# Building an ETL Pipeline
---

### Step 0: Install the required python packages

In [None]:
pip install --upgrade sodapy --user

In [None]:
pip install --upgrade db-dtypes

In [None]:
pip install --upgrade pyarrow

In [None]:
pip install --upgrade google-cloud-bigquery

#### Now, on the top of your Notebook select "Kernel" -> "Restart and Clear Output"
Then, continue from the next cell

### Step 1: Setup your NYC Open Data variables (ACTION REQUIRED HERE)

In [1]:
# import libraries
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

In [2]:
# setup the host name for the API endpoint (the https:// part will be added automatically)
# only need to change this if you are not using NYC Open Data
data_url = 'data.cityofnewyork.us'

In [3]:
# setup the data set at the API endpoint (NYC Ferry Ridership data in this case)
# For example: https://data.cityofnewyork.us/resource/t5n6-gx8c.json
# would give us 't5n6-gx8c'
data_set = 't5n6-gx8c'

In [4]:
# Setup your App Token, which you created in Week 6
# You can find your app token by logging into: https://data.cityofnewyork.us/profile/edit/developer_settings
#app_token = 'your app token here'
app_token = ''

In [5]:
# run this cell to setup your Socrata client that connects python to NYC Open Data

# create the client that points to the API endpoint
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

nyc open data client name is: <sodapy.socrata.Socrata object at 0x0000017564B6E280>
nyc open data client data type is: <class 'sodapy.socrata.Socrata'>


### Step 2: Setup your Google BigQuery variables (ACTION REQUIRED HERE)

If you did not create a key path in class on 3/30/22 (which created a json file on your computer), you must create one to continue:
1. Open BigQuery
2. On the top-left, click on the Navigation Menu
3. In the Navigation Menu, go to "IAM & Admin" -> "Service Accounts"
4. On the top of the page, click on "Create Service Account"
5. Account name: cis9440-spring2022
6. Click create and continue
7. Set Role to Owner
8. Click Continue
9. Click Done
10. In the new row for your Service Account, click on the 3 dots in the "Action" column. Select "Manage Keys"
11. Click "Add Key", then "Create New Key". Select the "JSON" radio button and click "Create"
12. In the next cell, set key_path to the exact file path of your new JSON file. For example, it will look like r'C:\Users\Downloads\cis9440-324315-70048a5e1138.json'

In [6]:
# CHANGE THIS TO YOUR FILE PATH
key_path = r'C:\Users\xj438\Desktop\cis9440-dataWarehousing\lecture7\cis9440-361100-642072a7b126.json'

In [7]:
# run this cell without changing anything to setup your credentials
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x0000017564B6EB50>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


Now, you need to create your dataset id:
1. Go to bigquery
2. Inside the "Explorer" window, click on the 3 dots to the right of your cis9440 project called "View Actions"
3. Select "Create dataset"
4. Leave the Project ID as it is, name your Dataset ID etl_dataset
5. Expand your cis9440 project with the triangle on its left-hand side so you can see your new etl_dataset dataset
6. On the right of your etl_dataset, click the 3 dots for "View Actions" -> "Open"
7. You should now see the "Dataset info". Copy the entire "Dataset ID" and paste it in the variable below

In [8]:
dataset_id = 'cis9440-361100.etl_dataset'   # PASTE THIS DATASET ID FROM ABOVE STEPS

dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: cis9440-361100.etl_dataset


### Step 3: Extract data

1. connect to NYC Open Data with API Key
2. pull specific dataset as a pandas dataframe
3. Look at shape of extracted data

#### sodapy client.get parameters
1. select
2. where
3. order
4. limit
5. group

In [9]:
# Get the total number of records in the entire data set
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count[0]['COUNT']}")

total records in t5n6-gx8c: 1525502


In [10]:
# Get the total number of records in our target data set
# UPDATE YOUR WHERE FILTER HERE IF NEEDED, below is only an example
target_record_count = nyc_open_data_client.get(data_set,
                                               where = "Date > '2022-05-01'",
                                               select= "COUNT(*)")
print(f"target records in {data_set}: {int(target_record_count[0]['COUNT'])}")

target records in t5n6-gx8c: 61823


In [11]:
# Now, loop through target data set to pull all rows in chunks (we cannot pull all rows at once)
# AGAIN, UPDATE WHERE FILTER INSIDE BELOW FUNCTION

def extract_socrata_data(chunk_size = 2500,
                         data_set = data_set,
                         where = None):
    """Download a Socrata data set page by page and return it as a DataFrame.

    Parameters
    ----------
    chunk_size : int
        Number of rows requested per API call (sent as ``limit``). Must be > 0.
    data_set : str
        Socrata data set identifier. Defaults to the value the notebook-level
        ``data_set`` variable had when this cell was run.
    where : str or None
        Optional filter forwarded to the client as ``where``. When None, the
        whole data set is pulled.

    Returns
    -------
    pandas.DataFrame
        One row per record returned by the API.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not positive (the paging loop could never advance).
    """
    # measure time this function takes
    import time
    start_time = time.perf_counter()

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # only forward `where` when a filter was supplied, so the filtered and
    # unfiltered cases share the same request code
    filter_kwargs = {} if where is None else {"where": where}

    # get total number of records matching the (optional) filter
    total_records = int(nyc_open_data_client.get(data_set,
                                                 select = "COUNT(*)",
                                                 **filter_kwargs)[0]["COUNT"])

    results = []

    # request exactly the offsets 0, chunk_size, 2*chunk_size, ... below total_records;
    # this avoids the extra empty request the old `start > total_records` test made
    # whenever total_records was an exact multiple of chunk_size
    for start in range(0, total_records, chunk_size):
        # order by the system row id so consecutive pages neither overlap nor skip rows
        results.extend(nyc_open_data_client.get(data_set,
                                                order = ":id",
                                                offset = start,
                                                limit = chunk_size,
                                                **filter_kwargs))

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.perf_counter()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

In [12]:
# CREATE DATAFRAME data HERE

data = extract_socrata_data(chunk_size = 2500,
                            data_set = data_set,
                            where = "Date > '2022-05-01'")

function took 5.5 seconds
the shape of your dataframe is: (61823, 7)


### Step 4: Data Profiling

1. Distinct values per column
2. Null values per column
3. Summary statistics per numeric column

In [13]:
# what are the columns in our dataframe?
data.columns

Index(['date', 'hour', 'route', 'direction', 'stop', 'boardings', 'typeday'], dtype='object')

In [14]:
# create and run a function to create the data profiling dataframe

def create_data_profiling_df(data):
    """Build a one-row-per-column profile of ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        The frame to profile.

    Returns
    -------
    pandas.DataFrame
        Columns: ``column_name``, ``column_type`` (dtype), ``unique_values``
        (distinct values, a null counts as one value), ``duplicate_values``
        (non-null count minus distinct non-null values), ``null_values`` and
        ``non_null_values``. Rows are sorted by ``unique_values`` then
        ``non_null_values``, both descending; the index keeps each column's
        original position in ``data``.
    """
    profile_columns = ["column_name",
                       "column_type",
                       "unique_values",
                       "duplicate_values",
                       "null_values",
                       "non_null_values"]

    # collect one dict per column and build the frame once at the end;
    # DataFrame.append was removed in pandas 2.0 and was quadratic anyway
    records = []

    for column in data.columns:

        info_dict = {}

        try:
            info_dict["column_name"] = column
            info_dict["column_type"] = data[column].dtypes
            info_dict["unique_values"] = len(data[column].unique())
            info_dict["duplicate_values"] = data[column].count() - len(data[column].dropna().unique())
            info_dict["null_values"] = data[column].isna().sum()
            info_dict["non_null_values"] = data[column].count()

        # e.g. a column holding unhashable values cannot be passed to unique();
        # keep whatever was gathered so far and tell the user
        except Exception:
            print(f"unable to read column: {column}, you may want to drop this column")

        records.append(info_dict)

    # passing `columns` guarantees the expected layout even when `data` has no columns
    data_profiling_df = pd.DataFrame(records, columns = profile_columns)

    data_profiling_df = data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                                      ascending = [False, False])

    return data_profiling_df

In [15]:
# view your data profiling dataframe
# RUN DATA PROFILING FUNCTION HERE
# to create data_profiling_df
data_profiling_df = create_data_profiling_df(data = data)
data_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values
5,boardings,object,379,61443,2,61821
0,date,object,60,61763,0,61823
4,stop,object,25,61798,0,61823
1,hour,object,19,61804,0,61823
2,route,object,7,61816,0,61823
3,direction,object,3,61820,0,61823
6,typeday,object,2,61821,0,61823


In [16]:
data["direction"].unique()

array(['NB', 'SB', '(blank)'], dtype=object)

In [17]:
# ACTION REQUIRED
# If any of the above columns were unable to be read by your function, you may want to drop them
# To drop a column, update the column name in the line below and run this cell
# data.drop(["column name you would like to drop here"], axis = 1, inplace = True)

data.drop(["typeday"], axis = 1, inplace = True)

In [18]:
data

Unnamed: 0,date,hour,route,direction,stop,boardings
0,2022-05-02T00:00:00.000,5,RW,NB,Rockaway,14
1,2022-05-02T00:00:00.000,5,RW,NB,Sunset Park/BAT,4
2,2022-05-02T00:00:00.000,5,SV,NB,East 34th Street,1
3,2022-05-02T00:00:00.000,5,SV,NB,Stuyvesant Cove,0
4,2022-05-02T00:00:00.000,5,SV,NB,Wall St/Pier 11,0
...,...,...,...,...,...,...
61818,2022-06-30T00:00:00.000,22,ER,SB,Wall St/Pier 11,0
61819,2022-06-30T00:00:00.000,22,ER,SB,Dumbo/Fulton Ferry,18
61820,2022-06-30T00:00:00.000,22,RW,NB,Wall St/Pier 11,0
61821,2022-06-30T00:00:00.000,22,SG,NB,Midtown West/W 39th St-Pier 79,0


### Step 5: Data Cleansing

1. drop unneeded columns
2. drop duplicate rows
3. check for outliers

In [19]:
# Run this to look at a list of your columns
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 61823 entries, 0 to 61822
Data columns (total 6 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   date       61823 non-null  object
 1   hour       61823 non-null  object
 2   route      61823 non-null  object
 3   direction  61823 non-null  object
 4   stop       61823 non-null  object
 5   boardings  61821 non-null  object
dtypes: object(6)
memory usage: 2.8+ MB


In [20]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["drop columns here"]

for column in drop_columns:
    try:
        data.drop(column, axis = 1, inplace = True)
    # a column name that is not in the dataframe raises KeyError; report it and move on.
    # Any other exception is unexpected and is allowed to surface instead of being hidden.
    except KeyError:
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {data.columns}")

unable to drop drop columns here
columns left in dataframe: Index(['date', 'hour', 'route', 'direction', 'stop', 'boardings'], dtype='object')


In [21]:
# find number of duplicate rows

print(f"number of duplicate rows: {len(data[data.duplicated()])}")

number of duplicate rows: 0


In [22]:
# drop duplicate rows based on entire row
data = data.drop_duplicates(keep = 'first')

# Or, based on a subset of rows, uncomment below and adjust accordingly
## data = data.drop_duplicates(subset = ["subset column"], keep = 'first')
## data = data.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(data)}")

number of rows after duplicates dropped: 61823


In [23]:
# drop rows with null values, examples below

# .dropna(thresh = 2)
# .dropna(subset = ["route", "direction"], inplace=True)


# keep only rows that have at least 2 non-null values (thresh counts non-null values, not nulls)
data.dropna(thresh=2, inplace=True)
print(len(data))

61823


In [24]:
# example data cleaning, we want numeric values not strings here:
data["boardings"] = data["boardings"].astype(float)

In [25]:
# example if ferry has a capacity of 344 people
# (outlier check: boardings above the capacity are treated as bad records)
ferry_capacity = 344

# keep only the rows whose boardings value is at most ferry_capacity
# NOTE(review): rows where boardings is null also fail this comparison and are
# removed here — confirm that dropping them is intended
data = data[data["boardings"] <= ferry_capacity]

### Step 4: Create Location Dimension

In [26]:
# first, copy the entire table
location_dim = data.copy()

In [27]:
location_dim.columns

Index(['date', 'hour', 'route', 'direction', 'stop', 'boardings'], dtype='object')

In [28]:
# second, subset for only the wanted columns in the dimension
location_dim = location_dim[["route",
                             "stop",
                             "direction"]]

In [29]:
# third, keep one row per distinct (route, stop, direction) combination
# unique_row = ["unique rows here"]
unique_row = ["route", "stop","direction"]

# first occurrence wins; the index is rebuilt so it runs 0..n-1 again
location_dim = (
    location_dim
    .drop_duplicates(subset = unique_row, keep = 'first')
    .reset_index(drop = True)
)
location_dim

Unnamed: 0,route,stop,direction
0,RW,Rockaway,NB
1,RW,Sunset Park/BAT,NB
2,SV,East 34th Street,NB
3,SV,Stuyvesant Cove,NB
4,SV,Wall St/Pier 11,NB
...,...,...,...
72,GI,Gov. Island/Yankee Pier,SB
73,GI,Wall St/Pier 11,SB
74,ER,Dumbo/Fulton Ferry,(blank)
75,AS,Astoria,(blank)


In [30]:
# fourth, add location_id as a surrogate key
location_dim.insert(0, 'location_id', range(1, 1 + len(location_dim)))
location_dim

Unnamed: 0,location_id,route,stop,direction
0,1,RW,Rockaway,NB
1,2,RW,Sunset Park/BAT,NB
2,3,SV,East 34th Street,NB
3,4,SV,Stuyvesant Cove,NB
4,5,SV,Wall St/Pier 11,NB
...,...,...,...,...
72,73,GI,Gov. Island/Yankee Pier,SB
73,74,GI,Wall St/Pier 11,SB
74,75,ER,Dumbo/Fulton Ferry,(blank)
75,76,AS,Astoria,(blank)


In [31]:
# fifth, look up location_id for every row of the data table
# (left join on the columns that identify a location, so no data rows are lost)
data = pd.merge(data, location_dim, how = 'left', on = unique_row)

data.head(100)

Unnamed: 0,date,hour,route,direction,stop,boardings,location_id
0,2022-05-02T00:00:00.000,5,RW,NB,Rockaway,14.0,1
1,2022-05-02T00:00:00.000,5,RW,NB,Sunset Park/BAT,4.0,2
2,2022-05-02T00:00:00.000,5,SV,NB,East 34th Street,1.0,3
3,2022-05-02T00:00:00.000,5,SV,NB,Stuyvesant Cove,0.0,4
4,2022-05-02T00:00:00.000,5,SV,NB,Wall St/Pier 11,0.0,5
...,...,...,...,...,...,...,...
95,2022-05-02T00:00:00.000,7,SB,SB,Atlantic Ave/BBP Pier 6,8.0,57
96,2022-05-02T00:00:00.000,7,SB,SB,Bay Ridge,0.0,58
97,2022-05-02T00:00:00.000,7,SB,SB,Corlears Hook,2.0,38
98,2022-05-02T00:00:00.000,7,SB,SB,Gov. Island/Yankee Pier,0.0,59


### Step 5: Create Date Dimension

In [32]:
# parse the date strings and truncate each timestamp to midnight of its day
data["date"] = pd.to_datetime(data['date']).dt.floor('D')

In [33]:
data["date"]

0       2022-05-02
1       2022-05-02
2       2022-05-02
3       2022-05-02
4       2022-05-02
           ...    
61752   2022-06-30
61753   2022-06-30
61754   2022-06-30
61755   2022-06-30
61756   2022-06-30
Name: date, Length: 61757, dtype: datetime64[ns]

In [34]:
## ACTION REQUIRED: update the start and end date at the bottom of the sql_query variable to fit your needs

sql_query = """
            SELECT
              CONCAT (FORMAT_DATE("%Y",d),FORMAT_DATE("%m",d),FORMAT_DATE("%d",d)) as date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) as month_name,
              FORMAT_DATE('%Q', d) as fiscal_qtr,
              FORMAT_DATE('%Y', d) AS year,
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2020-01-01', '2024-01-01', INTERVAL 1 DAY)) AS d )
            """

# run sql_query on BigQuery and store the resulting rows in a new dataframe
date_dim = bigquery_client.query(sql_query).to_dataframe()

# validate that > 0 rows have been extracted and print the outcome
# (this only reports the result; nothing is returned or raised when no rows come back)
if len(date_dim) > 0:
    print(f"date dimension created successfully, shape of dimension: {date_dim.shape}")
else:
    print("date dimension FAILED")

date dimension created successfully, shape of dimension: (1462, 7)


In [35]:
# create date_id column in the Fact Table as a "YYYYMMDD" string, the same
# year/month/day layout the date dimension query builds for its date_id.
# data['date'] is already a datetime column at this point, so the vectorised
# .dt accessor replaces the per-row apply and its redundant pd.to_datetime call
data['date_id'] = data['date'].dt.strftime("%Y%m%d")

In [36]:
data.drop("date", axis = 1, inplace = True)

In [37]:
data

Unnamed: 0,hour,route,direction,stop,boardings,location_id,date_id
0,5,RW,NB,Rockaway,14.0,1,20220502
1,5,RW,NB,Sunset Park/BAT,4.0,2,20220502
2,5,SV,NB,East 34th Street,1.0,3,20220502
3,5,SV,NB,Stuyvesant Cove,0.0,4,20220502
4,5,SV,NB,Wall St/Pier 11,0.0,5,20220502
...,...,...,...,...,...,...,...
61752,22,ER,SB,Wall St/Pier 11,0.0,50,20220630
61753,22,ER,SB,Dumbo/Fulton Ferry,18.0,51,20220630
61754,22,RW,NB,Wall St/Pier 11,0.0,31,20220630
61755,22,SG,NB,Midtown West/W 39th St-Pier 79,0.0,63,20220630


### Create time_dim

In [38]:
# Build the time dimension: one row per minute of the day (24 * 60 rows).
time_ids = []
hours = []
minutes = []

for hour in range(0,24):
    for minute in range(0,60):
        # pad hour and minute separately to two digits each ("HHMM").
        # Concatenating first and padding afterwards made different times collide
        # (1:05 and 0:15 both became "0015") and turned 5:00 into "0050",
        # which can never match the "0500" style ids used in the fact table.
        time_ids.append(f"{hour:02d}{minute:02d}")
        hours.append(hour)
        minutes.append(minute)

time_dim_dict = {"time_id" : time_ids,
                "hour" : hours,
                "minute" : minutes}

time_dim = pd.DataFrame(data = time_dim_dict)

In [39]:
time_dim

Unnamed: 0,time_id,hour,minute
0,0000,0,0
1,0001,0,1
2,0002,0,2
3,0003,0,3
4,0004,0,4
...,...,...,...
1435,2355,23,55
1436,2356,23,56
1437,2357,23,57
1438,2358,23,58


In [40]:
# USE ZFILL AND PAD here to make time_id
# zfill(2) left-pads the hour string to two digits, then pad(..., side='right')
# appends zeros up to four characters, e.g. "5" -> "05" -> "0500"
# NOTE: this overwrites the "hour" column with the padded string
data["hour"] = data["hour"].str.zfill(2).str.pad(4, side='right', fillchar='0')

In [41]:
data.rename(columns = {'hour':'time_id'}, inplace = True)
data["time_id"]

0        0500
1        0500
2        0500
3        0500
4        0500
         ... 
61752    2200
61753    2200
61754    2200
61755    2200
61756    2200
Name: time_id, Length: 61757, dtype: object

### Step 5: Creating Fact(s)

In [42]:
data

Unnamed: 0,time_id,route,direction,stop,boardings,location_id,date_id
0,0500,RW,NB,Rockaway,14.0,1,20220502
1,0500,RW,NB,Sunset Park/BAT,4.0,2,20220502
2,0500,SV,NB,East 34th Street,1.0,3,20220502
3,0500,SV,NB,Stuyvesant Cove,0.0,4,20220502
4,0500,SV,NB,Wall St/Pier 11,0.0,5,20220502
...,...,...,...,...,...,...,...
61752,2200,ER,SB,Wall St/Pier 11,0.0,50,20220630
61753,2200,ER,SB,Dumbo/Fulton Ferry,18.0,51,20220630
61754,2200,RW,NB,Wall St/Pier 11,0.0,31,20220630
61755,2200,SG,NB,Midtown West/W 39th St-Pier 79,0.0,63,20220630


In [43]:
# take a subset of fact_table for only the needed columns: which are keys and measures
# SUBSET your fact_table

fact_table = data[["time_id", "date_id", "location_id", "boardings"]]
fact_table

Unnamed: 0,time_id,date_id,location_id,boardings
0,0500,20220502,1,14.0
1,0500,20220502,2,4.0
2,0500,20220502,3,1.0
3,0500,20220502,4,0.0
4,0500,20220502,5,0.0
...,...,...,...,...
61752,2200,20220630,50,0.0
61753,2200,20220630,51,18.0
61754,2200,20220630,31,0.0
61755,2200,20220630,63,0.0


### Step 6: Deliver Facts and Dimensions to Data Warehouse (BigQuery)

In [44]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                           table_name,
                           dataset_id):
    """Upload a dataframe to BigQuery, replacing the target table's contents.

    Parameters
    ----------
    df : pandas.DataFrame
        The table to upload.
    table_name : str
        Name of the destination table inside the dataset.
    dataset_id : str
        Dataset identifier; the destination is ``f"{dataset_id}.{table_name}"``.

    Raises
    ------
    Exception
        Whatever ``load_job.result()`` raises when the load job fails, so a
        failed upload is no longer reported as completed.
    """
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    # WRITE_TRUNCATE: overwrite any existing rows in the destination table
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"

    load_job = bigquery_client.load_table_from_dataframe(df,
                                                upload_table_name,
                                                job_config = job_config)

    # load_table_from_dataframe only starts the job; wait for it to finish so the
    # message below is true and any load error surfaces here
    load_job.result()

    print(f"completed job {load_job}")

In [45]:
# Load each of your tables to bigquery

'''
load_table_to_bigquery(df=fact_table, 
                       table_name="ferry_ridership_fact", 
                       dataset_id=dataset_id)
                       
'''

'\nload_table_to_bigquery(df=fact_table, \n                       table_name="ferry_ridership_fact", \n                       dataset_id=dataset_id)\n                       \n'

In [46]:
'''
load_table_to_bigquery(df=location_dim, 
                       table_name="location_dim", 
                       dataset_id=dataset_id)
'''

completed job LoadJob<project=cis9440-361100, location=US, id=89ab19e3-87a7-4f19-a350-3942d535bad1>


In [47]:
'''
load_table_to_bigquery(df=date_dim, 
                       table_name="date_dim", 
                       dataset_id=dataset_id)
'''

completed job LoadJob<project=cis9440-361100, location=US, id=afb516dc-882e-4bf3-8d25-8f3cd5208dc0>


In [48]:
'''
load_table_to_bigquery(df=time_dim, 
                       table_name="time_dim", 
                       dataset_id=dataset_id)
'''

completed job LoadJob<project=cis9440-361100, location=US, id=a3bd0e93-6827-4a59-8165-8fbb06937dbb>
