# Building an ETL Pipeline
---

### Step 0: Install the required Python packages

In [1]:
pip install --upgrade sodapy

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install --upgrade db-dtypes

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install --upgrade pyarrow

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install --upgrade google-cloud-bigquery

Note: you may need to restart the kernel to use updated packages.


### Step 1: Set up your NYC Open Data variables (ACTION REQUIRED HERE)

In [5]:
# import libraries
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

In [6]:
# set up the host name for the API endpoint (the https:// part will be added automatically)
data_url = 'data.cityofnewyork.us'

In [7]:
# set up the data set at the API endpoint (Motor Vehicle Collisions - Crashes)
# For example: https://data.cityofnewyork.us/resource/h9gi-nx95.json
# would give us 'h9gi-nx95'
data_set = 'h9gi-nx95'

In [8]:
# Set up the App Token
app_token = 'cqU7HERSdQikK26O6yJuZLNAY'

In [9]:
# Set up your Socrata client, which connects Python to NYC Open Data
# create the client that points to the API endpoint
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

nyc open data client name is: <sodapy.socrata.Socrata object at 0x7fc9f17b7f40>
nyc open data client data type is: <class 'sodapy.socrata.Socrata'>


### Step 2: Set up your Google BigQuery variables (ACTION REQUIRED HERE)

If you did not create a key path in class on 3/30/22 (which created a json file on your computer), you must create one to continue:
1. Open BigQuery
2. On the top-left, click on the Navigation Menu
3. In the Navigation Menu, go to "IAM & Admin" -> "Service Accounts"
4. On the top of the page, click on "Create Service Account"
5. Account name: cis9440-spring2022
6. Click create and continue
7. Set Role to Owner
8. Click Continue
9. Click Done
10. In the new row for your Service Account, click on the 3 dots in the "Action" column. Select "Manage Keys"
11. Click "Add Key", then "Create New Key". Select the "JSON" radio button and click "Create"
12. In the next cell, set key_path to the exact file path of your new JSON file. For example, it will look like r'C:\Users\Downloads\cis9440-324315-70048a5e1138.json'

In [10]:
# CHANGE THIS TO YOUR FILE PATH
key_path = r'/Users/xiaozhongli/Downloads/CIS 9440/cis9440-final-project-39c452b60b6b.json'
#key_path = '/content/project1-342802-64b522eec5e5.json'


In [11]:
# run this cell without changing anything to set up your credentials
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x7fc9f17cc3a0>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


Now, you need to create your dataset id:
1. Go to BigQuery
2. Inside the "Explorer" window, click on the 3 dots to the right of your cis9440 project called "View Actions"
3. Select "Create dataset"
4. Leave the Project ID as it is, name your Dataset ID etl_dataset
5. Expand your cis9440 project with the triangle on its left-hand side so you can see your new etl_dataset dataset
6. On the right of your etl_dataset, click the 3 dots for "View Actions" -> "Open"
7. You should now see the "Dataset info". Copy the entire "Dataset ID" and paste it in the variable below

In [12]:
dataset_id = 'cis9440-final-project.etl_dataset'   # PASTE THIS DATASET ID FROM ABOVE STEPS

dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: cis9440-final-project.etl_dataset


### Step 3: Extract data

1. Connect to NYC Open Data with the app token
2. Pull the target dataset as a pandas dataframe
3. Look at the shape of the extracted data

#### sodapy client.get parameters
1. select
2. where
3. order
4. limit
5. group
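As a rough illustration of what these parameters become, the sketch below builds the request URL by hand. The `build_soda_url` helper is hypothetical and only for illustration; sodapy assembles the request itself, and the sketch assumes each keyword is sent as a `$`-prefixed query parameter.

```python
from urllib.parse import urlencode

def build_soda_url(host, dataset, **soql):
    """Return a SODA request URL for the given SoQL clauses.

    Hypothetical helper for illustration only; sodapy builds the request itself.
    """
    # each keyword (select, where, order, limit, group) becomes a "$"-prefixed parameter
    params = {f"${name}": value for name, value in soql.items() if value is not None}
    return f"https://{host}/resource/{dataset}.json?{urlencode(params)}"

url = build_soda_url("data.cityofnewyork.us", "h9gi-nx95",
                     select="borough, COUNT(*)",
                     where="crash_date>='2021-01-01'",
                     group="borough",
                     order="borough",
                     limit=10)
print(url)
```

The same keywords can be passed directly to `nyc_open_data_client.get(data_set, ...)`, which is what the cells below do with `select`, `where`, and `limit`.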

In [13]:
# Get the total number of records in the entire data set
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count}")

total records in h9gi-nx95: [{'COUNT': '1889526'}]


In [14]:
# Get the total number of records in our target data set
target_record_count = nyc_open_data_client.get(data_set,
                                               where = "crash_date>='2021-01-01'",
                                               select = "COUNT(*)")
print(f"target records in {data_set}: {target_record_count}")

target records in h9gi-nx95: [{'COUNT': '144729'}]


In [15]:
# Now, loop through the target data set to pull all rows in chunks (we cannot pull all rows at once)
# AGAIN, UPDATE THE WHERE FILTER INSIDE THE FUNCTION BELOW

import time

def pull_data_in_chunks(target_record_count):

    # measure the time this function takes
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # start with an empty result list
    record_count = int(target_record_count[0]["COUNT"])

    while True:

        # fetch the set of records starting at 'start'
        # (ordering by the :id system field keeps the paging stable between requests)
        results.extend(nyc_open_data_client.get(data_set,
                                                where = "crash_date>='2021-01-01'",
                                                order = ":id",
                                                offset = start,
                                                limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (we have reached record_count), exit loop
        # (>= avoids one extra empty request when record_count is a multiple of chunk_size)
        if start >= record_count:
            break

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

data = pull_data_in_chunks(target_record_count)

function took 61.1 seconds
the shape of your dataframe is: (144729, 29)
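The paging pattern can also be written without a precomputed record count. The sketch below is one possible variant, not the notebook's method: it stops as soon as a page comes back shorter than the chunk size. `fetch_all` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for `nyc_open_data_client.get` so the example runs offline.

```python
def fetch_all(fetch_page, chunk_size=2000):
    """Collect every row by requesting pages until a short page comes back."""
    rows, offset = [], 0
    while True:
        page = fetch_page(offset=offset, limit=chunk_size)
        rows.extend(page)
        if len(page) < chunk_size:   # a short (or empty) page means no rows remain
            break
        offset += chunk_size
    return rows

# stand-in for the Socrata client: serves slices of 4,500 fake records
fake_records = [{"collision_id": i} for i in range(4500)]

def fake_fetch(offset, limit):
    return fake_records[offset:offset + limit]

rows = fetch_all(fake_fetch, chunk_size=2000)
print(len(rows))
```

This variant costs one extra request when the total is an exact multiple of the chunk size, but it does not depend on the count staying unchanged while the pull is running.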


In [16]:
data.columns

Index(['crash_date', 'crash_time', 'latitude', 'longitude', 'location',
       'on_street_name', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code', 'off_street_name',
       'cross_street_name', 'contributing_factor_vehicle_3',
       'vehicle_type_code_3', 'contributing_factor_vehicle_4',
       'contributing_factor_vehicle_5', 'vehicle_type_code_4',
       'vehicle_type_code_5'],
      dtype='object')

### Step 4: Concatenate multiple columns into one single column

In [17]:
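contributing_factor_columns = ['contributing_factor_vehicle_1',
                               'contributing_factor_vehicle_2',
                               'contributing_factor_vehicle_3',
                               'contributing_factor_vehicle_4',
                               'contributing_factor_vehicle_5']

# join the non-null factors of each row into one comma-separated string
data['contributing_factors'] = data[contributing_factor_columns].apply(lambda x: ', '.join(x.dropna()), axis=1)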


In [18]:
data["contributing_factors"][0:10]

0                             Unspecified, Unspecified
1                                Following Too Closely
2                                          Fell Asleep
3                       Driver Inattention/Distraction
4                            Unsafe Speed, Unspecified
5                             Unspecified, Unspecified
6    Pedestrian/Bicyclist/Other Pedestrian Error/Co...
7                     Other Vehicular, Other Vehicular
8                     Passing Too Closely, Unspecified
9                             Unspecified, Unspecified
Name: contributing_factors, dtype: object

In [19]:
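vehicle_type_columns = ['vehicle_type_code1',
                        'vehicle_type_code2',
                        'vehicle_type_code_3',
                        'vehicle_type_code_4',
                        'vehicle_type_code_5']

# join the non-null vehicle types of each row into one comma-separated string
data['vehicle_types'] = data[vehicle_type_columns].apply(lambda x: ', '.join(x.dropna()), axis=1)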


In [20]:
data["vehicle_types"][0:10]

0                                  Sedan, Sedan
1                                         Sedan
2                                         Sedan
3                                         Sedan
4                                  Sedan, Sedan
5     Station Wagon/Sport Utility Vehicle, Taxi
6                                         Sedan
7    Sedan, Station Wagon/Sport Utility Vehicle
8                                         Sedan
9                                  Sedan, Sedan
Name: vehicle_types, dtype: object
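To see what the `dropna()` and `join` combination does on its own, here is a small sketch on made-up rows (the `toy` frame is illustrative and is not data from the collisions dataset). Note that a row with no values at all becomes an empty string rather than a null.

```python
import numpy as np
import pandas as pd

# made-up rows: two factors, one factor, and no factor at all
toy = pd.DataFrame({
    "contributing_factor_vehicle_1": ["Unsafe Speed", "Fell Asleep", np.nan],
    "contributing_factor_vehicle_2": ["Unspecified", np.nan, np.nan],
})

# drop the nulls in each row, then join whatever is left with ", "
toy["contributing_factors"] = toy.apply(lambda row: ", ".join(row.dropna()), axis=1)
print(toy["contributing_factors"].tolist())
```

Because empty rows become `""`, the concatenated columns will report zero null values in the profiling step even when every source column was empty.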

### Step 5: Data Profiling

1. Distinct values per column
2. Null values per column
3. Summary statistics per numeric column

In [21]:
# what are the columns in our dataframe?
data.columns

Index(['crash_date', 'crash_time', 'latitude', 'longitude', 'location',
       'on_street_name', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code', 'off_street_name',
       'cross_street_name', 'contributing_factor_vehicle_3',
       'vehicle_type_code_3', 'contributing_factor_vehicle_4',
       'contributing_factor_vehicle_5', 'vehicle_type_code_4',
       'vehicle_type_code_5', 'contributing_factors', 'vehicle_types'],
      dtype='object')

In [22]:
# create and run a function to create the data profiling dataframe

def create_data_profiling_df(data):

    # gather one dictionary of profiling information per column
    # (DataFrame.append was removed in pandas 2.0, so rows are collected in a list instead)
    profile_rows = []

    # loop through each column to add a row of profiling information
    for column in data.columns:

        info_dict = {}

        try:
            info_dict["column_name"] = column
            info_dict["column_type"] = data[column].dtypes
            info_dict["unique_values"] = len(data[column].unique())
            info_dict["duplicate_values"] = (data[column].shape[0] - data[column].isna().sum()) - len(data[column].unique())
            info_dict["null_values"] = data[column].isna().sum()
            info_dict["non_null_values"] = data[column].shape[0] - data[column].isna().sum()
            info_dict["percent_null"] = round((data[column].isna().sum()) / (data[column].shape[0]), 3)

        except Exception:
            # columns holding unhashable values (such as dictionaries) cannot be profiled
            print(f"unable to read column: {column}, you may want to drop this column")

        profile_rows.append(info_dict)

    data_profiling_df = pd.DataFrame(profile_rows,
                                     columns = ["column_name",
                                                "column_type",
                                                "unique_values",
                                                "duplicate_values",
                                                "null_values",
                                                "non_null_values",
                                                "percent_null"])

    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)

    return data_profiling_df

data_profiling_df = create_data_profiling_df(data)

unable to read column: location, you may want to drop this column


In [23]:
# ACTION REQUIRED
# If any of the above columns were unable to be read by your function, you may want to drop them
# To drop a column, update the column name in the line below and run this cell
data.drop(["location"], axis = 1, inplace = True)

In [24]:
# view your data profiling dataframe
data_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values,percent_null
16,collision_id,object,144711.0,18.0,0.0,144729.0,0.0
2,latitude,object,56595.0,76394.0,11740.0,132989.0,0.081
3,longitude,object,46070.0,86919.0,11740.0,132989.0,0.081
22,cross_street_name,object,34531.0,4620.0,105578.0,39151.0,0.729
5,on_street_name,object,7326.0,98250.0,39153.0,105576.0,0.271
21,off_street_name,object,5554.0,61398.0,77777.0,66952.0,0.537
30,vehicle_types,object,3198.0,141531.0,0.0,144729.0,0.0
29,contributing_factors,object,1595.0,143134.0,0.0,144729.0,0.0
1,crash_time,object,1440.0,143289.0,0.0,144729.0,0.0
18,vehicle_type_code2,object,497.0,95380.0,48852.0,95877.0,0.338
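When reading the profile, it helps to know that `len(data[column].unique())` counts a null as one distinct value, whereas `nunique()` ignores nulls by default. The sketch below shows the difference on a made-up series; the variable names are illustrative only.

```python
import numpy as np
import pandas as pd

s = pd.Series(["BRONX", "QUEENS", "BRONX", np.nan, np.nan])

distinct_including_null = len(s.unique())               # NaN is counted once as a value
distinct_non_null = s.nunique()                         # NaN is ignored by default
repeated_non_null = int(s.notna().sum()) - s.nunique()  # non-null rows beyond the first of each value

print(distinct_including_null, distinct_non_null, repeated_non_null)
```

So for a column that contains nulls, `unique_values` in the table above is one higher, and `duplicate_values` one lower, than a count based on non-null values alone.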


### Step 6: Data Cleansing

1. drop unneeded columns
2. drop duplicate rows
3. check for outliers

In [25]:
# Run this to look at a list of your columns
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 144729 entries, 0 to 144728
Data columns (total 30 columns):
 #   Column                         Non-Null Count   Dtype 
---  ------                         --------------   ----- 
 0   crash_date                     144729 non-null  object
 1   crash_time                     144729 non-null  object
 2   latitude                       132989 non-null  object
 3   longitude                      132989 non-null  object
 4   on_street_name                 105576 non-null  object
 5   number_of_persons_injured      144728 non-null  object
 6   number_of_persons_killed       144729 non-null  object
 7   number_of_pedestrians_injured  144729 non-null  object
 8   number_of_pedestrians_killed   144729 non-null  object
 9   number_of_cyclist_injured      144729 non-null  object
 10  number_of_cyclist_killed       144729 non-null  object
 11  number_of_motorist_injured     144729 non-null  object
 12  number_of_motorist_killed      144729 non-nu

In [26]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["crash_time",
               "on_street_name",
               "cross_street_name",
               "off_street_name"]

for column in drop_columns:
    try:
        data.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {data.columns}")

columns left in dataframe: Index(['crash_date', 'latitude', 'longitude', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code',
       'contributing_factor_vehicle_3', 'vehicle_type_code_3',
       'contributing_factor_vehicle_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_4', 'vehicle_type_code_5', 'contributing_factors',
       'vehicle_types'],
      dtype='object')


In [27]:
# find number of duplicate rows

print(f"number of duplicate rows: {len(data[data.duplicated()])}")

number of duplicate rows: 18


In [28]:
# drop duplicate rows based on entire row
data = data.drop_duplicates(keep = 'first')

# Or, based on a subset of columns, uncomment below and adjust accordingly
## data = data.drop_duplicates(subset = ["subset column"], keep = 'first')
## data = data.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(data)}")

number of rows after duplicates dropped: 144711
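Whole-row duplicates and key duplicates are not always the same thing. The sketch below uses made-up rows to compare the two counts, assuming `collision_id` is meant to identify a crash uniquely; the `toy` frame is illustrative only.

```python
import pandas as pd

toy = pd.DataFrame({
    "collision_id": ["1", "1", "2", "3", "3"],
    "borough": ["BRONX", "BRONX", "QUEENS", "BROOKLYN", None],
})

# rows that repeat an earlier row in every column
full_row_dupes = int(toy.duplicated().sum())

# rows that repeat an earlier collision_id, even if other columns differ
key_dupes = int(toy.duplicated(subset=["collision_id"]).sum())

deduped = toy.drop_duplicates(subset=["collision_id"], keep="first")
print(full_row_dupes, key_dupes, len(deduped))
```

If the two counts differ on your data, dropping whole-row duplicates alone will leave repeated keys in the fact table.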


### Step 7: Create Location Dimension

In [29]:
# first, copy the entire table
location_dim = data.copy()

In [30]:
location_dim.columns

Index(['crash_date', 'latitude', 'longitude', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code',
       'contributing_factor_vehicle_3', 'vehicle_type_code_3',
       'contributing_factor_vehicle_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_4', 'vehicle_type_code_5', 'contributing_factors',
       'vehicle_types'],
      dtype='object')

In [31]:
# second, subset for only the wanted columns in the dimension
location_dim = location_dim[["longitude",
                             "latitude",
                             "borough",
                             "zip_code"]]

In [32]:
location_dim

Unnamed: 0,longitude,latitude,borough,zip_code
0,-73.8243100,40.8249320,,
1,-73.9359200,40.7953100,,
2,,,,
3,,,,
4,-73.8302840,40.7428500,,
...,...,...,...,...
144724,0.0000000,0.0000000,BRONX,10459
144725,-73.73029,40.711876,QUEENS,11429
144726,-73.96057,40.5778,,
144727,-73.92966,40.765373,QUEENS,11106


In [33]:
# third, drop duplicate rows in dimension
location_dim = location_dim.drop_duplicates(subset = ["longitude","latitude"], keep = 'first')
location_dim = location_dim.reset_index(drop = True)
location_dim

Unnamed: 0,longitude,latitude,borough,zip_code
0,-73.8243100,40.8249320,,
1,-73.9359200,40.7953100,,
2,,,,
3,-73.8302840,40.7428500,,
4,-73.9786400,40.6379100,BROOKLYN,11218
...,...,...,...,...
72224,-73.880646,40.761833,QUEENS,11369
72225,-73.923134,40.828846,BRONX,10452
72226,-73.90615,40.641212,BROOKLYN,11236
72227,-73.90594,40.807285,,


In [34]:
# fourth, add location_id as a surrogate key
location_dim.insert(0, 'location_id', range(1000, 1000 + len(location_dim)))
location_dim.head()

Unnamed: 0,location_id,longitude,latitude,borough,zip_code
0,1000,-73.82431,40.824932,,
1,1001,-73.93592,40.79531,,
2,1002,,,,
3,1003,-73.830284,40.74285,,
4,1004,-73.97864,40.63791,BROOKLYN,11218.0


In [35]:
# fifth, add the location_id to the data table
data = data.merge(location_dim[['longitude', 'latitude', 'location_id']],
                  left_on = ['longitude', 'latitude'],
                  right_on = ['longitude', 'latitude'],
                  how = 'left')

data.head(2)

Unnamed: 0,crash_date,latitude,longitude,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,...,zip_code,contributing_factor_vehicle_3,vehicle_type_code_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,vehicle_type_code_4,vehicle_type_code_5,contributing_factors,vehicle_types,location_id
0,2021-01-01T00:00:00.000,40.824932,-73.82431,0,0,0,0,0,0,0,...,,,,,,,,"Unspecified, Unspecified","Sedan, Sedan",1000
1,2021-01-01T00:00:00.000,40.79531,-73.93592,1,0,1,0,0,0,0,...,,,,,,,,Following Too Closely,Sedan,1001
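The five steps above follow a pattern that repeats for every dimension, so they could be wrapped in one function. The sketch below is a simplified version on made-up coordinates: `add_dimension` is a hypothetical helper, and unlike the cells above it keeps only the key columns in the dimension. It also shows that pandas matches null keys to each other during the merge, which is why crashes without coordinates share a single `location_id`.

```python
import numpy as np
import pandas as pd

def add_dimension(fact, columns, key_name, start):
    """Return (fact_with_key, dimension) for the given natural-key columns."""
    # one dimension row per distinct combination of the key columns
    dim = fact[columns].drop_duplicates().reset_index(drop=True)
    # surrogate key: consecutive integers beginning at `start`
    dim.insert(0, key_name, range(start, start + len(dim)))
    # bring the surrogate key back onto every fact row
    fact = fact.merge(dim, on=columns, how="left")
    return fact, dim

toy = pd.DataFrame({
    "longitude": ["-73.82", "-73.93", np.nan, np.nan, "-73.82"],
    "latitude": ["40.82", "40.79", np.nan, np.nan, "40.82"],
})

toy, toy_location_dim = add_dimension(toy, ["longitude", "latitude"], "location_id", 1000)
print(toy["location_id"].tolist())
```

Because the coordinates are stored as text, values such as `-73.8243100` and `-73.82431` count as different locations; converting them to numbers first is one way to avoid that.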


### Step 8: Create Contributing Factor Dimension

In [36]:
# first, copy the entire table
contributingFactor_dim = data.copy()

In [37]:
contributingFactor_dim.columns

Index(['crash_date', 'latitude', 'longitude', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code',
       'contributing_factor_vehicle_3', 'vehicle_type_code_3',
       'contributing_factor_vehicle_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_4', 'vehicle_type_code_5', 'contributing_factors',
       'vehicle_types', 'location_id'],
      dtype='object')

In [38]:
# second, subset for only the wanted columns in the dimension
contributingFactor_dim = contributingFactor_dim[["contributing_factor_vehicle_1", "contributing_factors"]]

In [39]:
# third, drop duplicate rows in dimension
contributingFactor_dim = contributingFactor_dim.drop_duplicates(subset = ["contributing_factors"], keep = 'first')
contributingFactor_dim = contributingFactor_dim.reset_index(drop = True)
contributingFactor_dim.head()


Unnamed: 0,contributing_factor_vehicle_1,contributing_factors
0,Unspecified,"Unspecified, Unspecified"
1,Following Too Closely,Following Too Closely
2,Fell Asleep,Fell Asleep
3,Driver Inattention/Distraction,Driver Inattention/Distraction
4,Unsafe Speed,"Unsafe Speed, Unspecified"


In [40]:
# fourth, add contributingFactor_id as a surrogate key
contributingFactor_dim.insert(0, 'contributingFactor_id', range(2000, 2000 + len(contributingFactor_dim)))
contributingFactor_dim.head()

Unnamed: 0,contributingFactor_id,contributing_factor_vehicle_1,contributing_factors
0,2000,Unspecified,"Unspecified, Unspecified"
1,2001,Following Too Closely,Following Too Closely
2,2002,Fell Asleep,Fell Asleep
3,2003,Driver Inattention/Distraction,Driver Inattention/Distraction
4,2004,Unsafe Speed,"Unsafe Speed, Unspecified"


In [41]:
# fifth, add the contributingFactor_id to the Fact table
data = data.merge(contributingFactor_dim[['contributing_factors',
                                 'contributingFactor_id']],
                  left_on = ['contributing_factors'],
                  right_on = ['contributing_factors'],
                  how = 'left')

data.head()

Unnamed: 0,crash_date,latitude,longitude,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,...,contributing_factor_vehicle_3,vehicle_type_code_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,vehicle_type_code_4,vehicle_type_code_5,contributing_factors,vehicle_types,location_id,contributingFactor_id
0,2021-01-01T00:00:00.000,40.824932,-73.82431,0,0,0,0,0,0,0,...,,,,,,,"Unspecified, Unspecified","Sedan, Sedan",1000,2000
1,2021-01-01T00:00:00.000,40.79531,-73.93592,1,0,1,0,0,0,0,...,,,,,,,Following Too Closely,Sedan,1001,2001
2,2021-01-01T00:00:00.000,,,0,0,0,0,0,0,0,...,,,,,,,Fell Asleep,Sedan,1002,2002
3,2021-01-01T00:00:00.000,,,1,0,0,0,0,0,1,...,,,,,,,Driver Inattention/Distraction,Sedan,1002,2003
4,2021-01-01T00:00:00.000,40.74285,-73.830284,1,0,0,0,0,0,1,...,,,,,,,"Unsafe Speed, Unspecified","Sedan, Sedan",1003,2004


### Step 9: Create Vehicle Type Dimension


In [42]:
# first, copy the entire table
vehicle_type_dim = data.copy()

In [43]:
vehicle_type_dim.columns

Index(['crash_date', 'latitude', 'longitude', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code',
       'contributing_factor_vehicle_3', 'vehicle_type_code_3',
       'contributing_factor_vehicle_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_4', 'vehicle_type_code_5', 'contributing_factors',
       'vehicle_types', 'location_id', 'contributingFactor_id'],
      dtype='object')

In [44]:
# second, subset for only the wanted columns in the dimension
vehicle_type_dim = vehicle_type_dim[["vehicle_types"]]

In [45]:
# third, drop duplicate rows in dimension
vehicle_type_dim = vehicle_type_dim.drop_duplicates(subset = ["vehicle_types"], keep = 'first')
vehicle_type_dim = vehicle_type_dim.reset_index(drop = True)
vehicle_type_dim.head()


Unnamed: 0,vehicle_types
0,"Sedan, Sedan"
1,Sedan
2,"Station Wagon/Sport Utility Vehicle, Taxi"
3,"Sedan, Station Wagon/Sport Utility Vehicle"
4,Station Wagon/Sport Utility Vehicle


In [46]:
# fourth, add vehicletype_id as a surrogate key
vehicle_type_dim.insert(0, 'vehicletype_id', range(3000, 3000 + len(vehicle_type_dim)))
vehicle_type_dim.head()

Unnamed: 0,vehicletype_id,vehicle_types
0,3000,"Sedan, Sedan"
1,3001,Sedan
2,3002,"Station Wagon/Sport Utility Vehicle, Taxi"
3,3003,"Sedan, Station Wagon/Sport Utility Vehicle"
4,3004,Station Wagon/Sport Utility Vehicle


In [47]:
# fifth, add the vehicletype_id to the Fact table
data = data.merge(vehicle_type_dim[['vehicle_types',
                                 'vehicletype_id']],
                  left_on = ['vehicle_types'],
                  right_on = ['vehicle_types'],
                  how = 'left')

data.head(2)

Unnamed: 0,crash_date,latitude,longitude,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,...,vehicle_type_code_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,vehicle_type_code_4,vehicle_type_code_5,contributing_factors,vehicle_types,location_id,contributingFactor_id,vehicletype_id
0,2021-01-01T00:00:00.000,40.824932,-73.82431,0,0,0,0,0,0,0,...,,,,,,"Unspecified, Unspecified","Sedan, Sedan",1000,2000,3000
1,2021-01-01T00:00:00.000,40.79531,-73.93592,1,0,1,0,0,0,0,...,,,,,,Following Too Closely,Sedan,1001,2001,3001


### Step 10: Create Date Dimension

In [48]:
## ACTION REQUIRED: update the start and end date at the bottom of the sql_query variable to fit your needs

sql_query = """
            SELECT
              CONCAT (FORMAT_DATE("%Y",d),FORMAT_DATE("%m",d),FORMAT_DATE("%d",d)) as date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) as month_name,
              FORMAT_DATE('%Q', d) as fiscal_qtr,
              FORMAT_DATE('%Y', d) AS year,
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2021-01-01', '2023-01-01', INTERVAL 1 DAY)) AS d )
            """

# store extracted data in new dataframe
date_dim = bigquery_client.query(sql_query).to_dataframe()

# validate that > 0 rows have been extracted and return dataframe
if len(date_dim) > 0:
    print(f"date dimension created successfully, shape of dimension: {date_dim.shape}")
else:
    print("date dimension FAILED")

date dimension created successfully, shape of dimension: (731, 7)


In [49]:
# create date_id column in the Fact Table (parse the whole column at once instead of row by row)
data['date_id'] = pd.to_datetime(data['crash_date']).dt.strftime("%Y%m%d")

In [50]:
date_dim.head()

Unnamed: 0,date_id,full_date,week_day,day_name,month_name,fiscal_qtr,year
0,20210101,2021-01-01,5,Friday,January,1,2021
1,20210102,2021-01-02,6,Saturday,January,1,2021
2,20210103,2021-01-03,0,Sunday,January,1,2021
3,20210104,2021-01-04,1,Monday,January,1,2021
4,20210105,2021-01-05,2,Tuesday,January,1,2021
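As a local cross-check of the query, the same rows can be generated with the Python standard library. This is only a sketch: `build_date_rows` is a hypothetical helper, and it assumes `%Q` in the query means the calendar quarter, so the `fiscal_qtr` column holds calendar quarters.

```python
from datetime import date, timedelta

def build_date_rows(start, end):
    """Return one dictionary per day from start to end, inclusive."""
    rows = []
    d = start
    while d <= end:
        rows.append({
            "date_id": d.strftime("%Y%m%d"),           # e.g. 20210101
            "full_date": d,
            "week_day": d.strftime("%w"),              # 0 = Sunday ... 6 = Saturday
            "day_name": d.strftime("%A"),
            "month_name": d.strftime("%B"),
            "fiscal_qtr": str((d.month - 1) // 3 + 1), # calendar quarter, 1 to 4
            "year": d.strftime("%Y"),
        })
        d += timedelta(days=1)
    return rows

date_rows = build_date_rows(date(2021, 1, 1), date(2023, 1, 1))
print(len(date_rows), date_rows[0]["date_id"], date_rows[0]["week_day"])
```

The row count matches the shape reported above because the end date is included in the range.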


### Step 11: Creating Fact(s)

In [51]:
# Creating Accident Fact Table

# creating a copy of the data table
fact_table = data.copy()

# create measures
#fact_table["days_open"] = pd.to_datetime(fact_table['closed_date']) - pd.to_datetime(fact_table['created_date'])
#fact_table["days_open"] = fact_table["days_open"].apply(lambda x: x.days)

#fact_table["number_of_complaints"] = 1

In [52]:
# histogram of days_open
#fact_table["days_open"].hist(bins = 5)

In [53]:
fact_table.columns

Index(['crash_date', 'latitude', 'longitude', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1',
       'contributing_factor_vehicle_2', 'collision_id', 'vehicle_type_code1',
       'vehicle_type_code2', 'borough', 'zip_code',
       'contributing_factor_vehicle_3', 'vehicle_type_code_3',
       'contributing_factor_vehicle_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_4', 'vehicle_type_code_5', 'contributing_factors',
       'vehicle_types', 'location_id', 'contributingFactor_id',
       'vehicletype_id', 'date_id'],
      dtype='object')

In [54]:
# take a subset of fact_table for only the needed columns: which are keys and measures
fact_table = fact_table[["collision_id",
                         "date_id",
                         "location_id",
                         "contributingFactor_id",
                         "vehicletype_id",
                         "number_of_persons_injured", 
                         "number_of_persons_killed",
                        "number_of_pedestrians_injured", 
                        "number_of_pedestrians_killed"]]

fact_table.head()

Unnamed: 0,collision_id,date_id,location_id,contributingFactor_id,vehicletype_id,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed
0,4381298,20210101,1000,2000,3000,0,0,0,0
1,4383118,20210101,1001,2001,3001,1,0,1,0
2,4382769,20210101,1002,2002,3001,0,0,0,0
3,4381189,20210101,1002,2003,3001,1,0,0,0
4,4380754,20210101,1003,2004,3000,1,0,0,0
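Before loading, it can be worth checking that every key in the fact table has a matching row in its dimension. The sketch below runs that check on made-up frames; `missing_keys` is a hypothetical helper, not part of the notebook.

```python
import pandas as pd

def missing_keys(fact, dim, key):
    """Return the fact rows whose key has no match in the dimension."""
    return fact[~fact[key].isin(dim[key])]

toy_fact = pd.DataFrame({"collision_id": ["a", "b", "c"],
                         "location_id": [1000, 1001, 1005]})
toy_dim = pd.DataFrame({"location_id": [1000, 1001, 1002]})

orphans = missing_keys(toy_fact, toy_dim, "location_id")
print(orphans["collision_id"].tolist())
```

On the real tables the same call could be made for each key, for example `missing_keys(fact_table, date_dim, "date_id")`, which would flag crashes dated outside the range generated for the date dimension.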


### Step 12: Deliver Facts and Dimensions to Data Warehouse (BigQuery)

In [55]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                          table_name,
                          dataset_id):

    # dataset_id must be the full "project.dataset" string created in Step 2
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"

    load_job = bigquery_client.load_table_from_dataframe(df,
                                                upload_table_name,
                                                job_config = job_config)

    print(f"Starting job {load_job}")

    # wait for the load to finish; this raises an exception if the job failed
    load_job.result()

In [56]:
# load your first dimension to BigQuery

load_table_to_bigquery(df = location_dim,
                      table_name = "location_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-final-project, location=US, id=f516a36e-e665-413a-8bb5-ccb43ccce946>


In [57]:
# load your second dimension to BigQuery

load_table_to_bigquery(df = vehicle_type_dim,
                      table_name = "vehicle_type_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-final-project, location=US, id=24674fec-387a-47ff-9a39-aeea06721b85>


In [58]:
# load your third dimension to BigQuery

load_table_to_bigquery(df = date_dim,
                      table_name = "date_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-final-project, location=US, id=a8a1625e-d4d7-4214-bd72-55cf4630b152>


In [59]:
# load your fourth dimension to BigQuery

load_table_to_bigquery(df = contributingFactor_dim,
                      table_name = "contributingFactor_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-final-project, location=US, id=e8e6dd92-bb9a-41f1-9a39-6145e4669843>


In [60]:
# load your fact table to BigQuery

load_table_to_bigquery(df = fact_table,
                      table_name = "Collision fact",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-final-project, location=US, id=822472b7-04d5-4b20-8697-c7584dfa65c5>
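The table names above mix styles (snake_case, camelCase, and one name with a space). If you would rather keep them uniform, a small normalizer is one option. The sketch below is a hypothetical helper and assumes you want names limited to lowercase letters, digits, and underscores; that is a naming preference, not a statement of BigQuery's rules.

```python
import re

def to_table_name(label):
    """Lower-case a label and replace runs of other characters with underscores."""
    return re.sub(r"[^0-9a-z]+", "_", label.lower()).strip("_")

print(to_table_name("Collision fact"))
print(to_table_name("contributingFactor_dim"))
```

Names produced this way can be used in queries without quoting and stay consistent across the fact and dimension tables.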
