# ETL

A notebook for the ETL process. The same code is also available in *etl.py*. All the work is based on PostgreSQL and Python.

``` downloading the data from csv/json -> load into stage -> transform -> load into final db ```

 
The *idea* is to build an ETL pipeline for a company looking for the best place to open a business, based on travel and census data for NYC.

To run the code, first create the tables and the schemas using *create.py*. For more information about the datasets, refer to dict.md and write_up.md: the former lists all datasets and their fields, while the latter covers the schema, the tools and the advanced scenarios. Beyond those two files, the main README has more information.


### Road map:

* Staging - **Done**
    1. Staging census data
    2. Staging taxi data
    3. Staging grid data
    4. Create a new PK for each table
    5. Data quality part 1 - table records are not null
* Transform - **Done**
    1. Add geom field for all tables
    2. Add 2 geom fields for trips - for pickup and dropoff
    3. Data quality part 2 - check and fix invalid geometries
    4. Spatial Joins:
        * census with grid
        * census with taxi zones
        * trips pickup with grid
        * trips pickup with zones
        * trips dropoff with grid
        * trips dropoff with zones
    5. Create time table from trips table - needs to be done for pickup and dropoff
* Load - **Done**
    1. Load census data into fact table 1
    2. Load taxi data into fact table 2
    3. Load time dim table
    4. Load grid dim table
    5. Load taxi service zones dim table
    6. Data quality part 3 - tables are not empty
    


Notes:

* if using *shp2pgsql*, there is no need to create the table first

In [2]:
%load_ext sql

In [13]:
import configparser
import numpy
import os
import pandas as pd
import psycopg2
import psycopg2.extras as extras
from sqlalchemy import create_engine
import sys

## Staging The Data

###  Support Functions

In [25]:
def read_file(fpath, ftype, col="pk_id"):
    """
    Description: read a csv/json file to df and add id field
    Arguments:
        fpath: a file path
        ftype: file type - json/csv
        col: a name for the new id 
    Returns:
        df: a pandas data frame
    """
    if ftype == "json":
        df = pd.read_json(fpath)
    elif ftype == "csv":
        df = pd.read_csv(fpath)
    else:
        # fail loudly instead of silently returning None
        raise ValueError(f"unsupported file type: {ftype}")
    # sequential 1-based id used as the new primary key
    df[col] = range(1, 1 + len(df))
    return df

In [5]:
def check_null(df):
    """
    Description: check for null in pandas df. print results
    Arguments:
        df: a pandas dataframe
    Returns:
        None
    """
    totalCount = df.shape[0]
    print(f"Total rows count is {totalCount}")
    listOfColumnNames = list(df)
    for col in listOfColumnNames:
        nullCount = df[col].isnull().sum()
        if nullCount > 0:
            nullCountPrcnt = nullCount / totalCount * 100
            print(col, nullCount, nullCountPrcnt)

In [6]:
def get_param(config_file):
    """
    Description: extract postgres connection parameters from config file
    Arguments:
        config_file: path to file
    Returns:
        a dict of parameters
    """
    
    config = configparser.ConfigParser()
    config.read(config_file)
    
    host = config.get('postgres', 'host')
    database = config.get('postgres', 'database')
    username = config.get('postgres', 'user')
    password = config.get('postgres', 'password')
    port = config.get('postgres', 'port')
    
    param_dic = {
    "host"      : host,
    "database"  : database,
    "user"      : username,
    "password"  : password,
    "port"      : port
    }
    
    return param_dic 

param_dic = get_param("../finalData/config.cfg") 
print(param_dic)

{'host': 'localhost', 'database': 'gis', 'user': 'ziv', 'password': 'password', 'port': '5433'}


In [23]:
def get_files_path(config_file):
    """
    Description: extract csv/json files path from config file
    Arguments:
        config_file: path to file
    Returns:
        a dict of paths
    """
    config = configparser.ConfigParser()
    # read the file that was passed in rather than a hard-coded path
    config.read(config_file)
    
    files_loc = {
        "trips"            : config.get('data', 'trips'),
        "nyc_grid"         : config.get('data', 'nyc_grid'),
        "census_tracts"    : config.get('data', 'census_tracts'),
        "census_block_loc" : config.get('data', 'census_block_loc'),
        "taxi_zones"       : config.get('data', 'taxi_zones')
    }
    
    return files_loc


loc = get_files_path("../finalData/config.cfg")
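
For reference, the two helpers above expect a config file with a `postgres` section and a `data` section. The following is a minimal sketch of that layout, parsed from an inline string so it runs without touching disk. The section and key names come from `get_param` and `get_files_path`, the connection values mirror the output printed above, and the file paths under `data` are placeholders rather than the project's real paths.

```python
import configparser

# Hypothetical contents of config.cfg. Section and key names follow
# get_param() and get_files_path(); the data paths are placeholders.
EXAMPLE_CFG = """
[postgres]
host = localhost
database = gis
user = ziv
password = password
port = 5433

[data]
trips = data/trips.csv
nyc_grid = data/nyc_grid.json
census_tracts = data/census_tracts.csv
census_block_loc = data/census_block_loc.csv
taxi_zones = data/taxi_zones.csv
"""

config = configparser.ConfigParser()
config.read_string(EXAMPLE_CFG)

# configparser returns every value as a string, including the port.
pg_params = dict(config["postgres"])
data_paths = dict(config["data"])
print(pg_params)
print(data_paths["trips"])
```

Because all values come back as strings, the port is `'5433'` and not an integer, which matches the printed `param_dic` above.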

In [22]:
def connect(params_dic):
    """
    Description: Connect to the PostgreSQL database server
    Arguments:
        params_dic: a dict of the parameters to connect to postgres
    Returns:
        conn: a connection to postgres db
    """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) 
    print("Connection successful")
    return conn

In [12]:
conn = connect(param_dic)

Connecting to the PostgreSQL database...
Connection successful


In [8]:
def execute_values(conn, df, table, truncate=True):
    """
    Description: Using psycopg2.extras.execute_values() to insert the dataframe into postgres
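    Arguments:
        conn: a connection to postgres db
        df: pandas df to load 
        table: postgres table to insert the data into 
        truncate: empty the table before inserting if True
    Returns:
        None on success, 1 if the insert failed and was rolled back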
    """
    
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        if truncate:
            truncate_query  = f"TRUNCATE TABLE {table};"
            cursor.execute(truncate_query)
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()
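
To make the query template in `execute_values` easier to follow: the `%` formatting fills in the table and the column list and leaves a single `%s` placeholder, which `extras.execute_values` later expands with the row tuples. The sketch below reproduces only the string-building step. The `build_insert_query` helper and the identifier check are assumptions added for illustration, not part of *etl.py*; the check is there because table and column names are interpolated directly into the SQL.

```python
import re

# Conservative pattern for unquoted identifiers, optionally schema-qualified.
# This check is an illustrative addition; it is not in the notebook's code.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")

def build_insert_query(table, columns):
    """Return the INSERT template in the shape execute_values() expects."""
    for name in [table, *columns]:
        if not _IDENT.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    cols = ",".join(columns)
    # %%s survives the formatting as a literal %s placeholder for the rows.
    return "INSERT INTO %s(%s) VALUES %%s" % (table, cols)

query = build_insert_query("staging.trips", ["trip_id", "passenger_count"])
print(query)
```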

In [9]:
def merge_census(census, blocks, county="County", census_key="CensusTract"):
    """
    Description: a helper function to merge census data with coordinates
    Arguments:
        census: a pandas census dataframe
        blocks: a pandas census point location dataframe
        county: a field in blocks used to filter the nyc boros
        census_key: census primary key 
    Returns:
        pandas data frame with location data
    """
    boros = ['Bronx','Kings','New York','Queens','Richmond']
    blocks_df = blocks[blocks[county].isin(boros)]
    blocks_df['tract'] = blocks_df.BlockCode // 10000
    census_with_xy = blocks_df.merge(census, left_on='tract', right_on=census_key)
    return census_with_xy
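
The `// 10000` step in `merge_census` relies on the layout of a 15-digit census block code: state (2 digits), county (3), tract (6), block (4). Dropping the last four digits leaves the 11-digit tract id, which is what the merge matches against `CensusTract`. A small sketch of that arithmetic follows; the `block_to_tract` helper is hypothetical and the block code is an illustrative value following that layout, not a row taken from the dataset.

```python
def block_to_tract(block_code: int) -> int:
    """Drop the 4-digit block suffix from a 15-digit census block code."""
    return block_code // 10000

# Illustrative block code: state 36 (New York), county 005 (Bronx),
# tract 000100, block 1000.
example_block = 360050001001000
tract = block_to_tract(example_block)
print(tract)
```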

In [17]:
def check_empty(conn, table):
    """
    Description: data quality function, check if postgres table is empty
    Arguments:
        conn: psycopg2 connection to postgres
        table: table name to check
    Returns:
        string - whether the table is empty or not
    """
    cursor = conn.cursor()
    sql = f"SELECT COUNT(*) FROM {table}"
    print(sql)
    cursor.execute(sql)
    res = cursor.fetchall()[0][0]
    if res > 0:
        return f"table {table} is not empty, count: {res}"
    else:
        return f"table {table} is empty"
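
`check_empty` reports its result as a string, which suits a notebook. For an unattended run it may be preferable for the check to raise instead. The sketch below shows one way to do that under stated assumptions: `assert_not_empty` is a hypothetical helper, and an in-memory `sqlite3` database stands in for the Postgres connection so the snippet runs without a server.

```python
import sqlite3

def assert_not_empty(conn, table):
    """Return the row count of `table`, raising if the table is empty."""
    cursor = conn.cursor()
    # The table name is interpolated as in check_empty(), so it must come
    # from trusted code, never from user input.
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    count = cursor.fetchone()[0]
    cursor.close()
    if count == 0:
        raise ValueError(f"table {table} is empty")
    return count

# sqlite3 stands in for the Postgres connection purely for illustration.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE trips (trip_id INTEGER)")
demo_conn.executemany("INSERT INTO trips VALUES (?)", [(1,), (2,), (3,)])
row_count = assert_not_empty(demo_conn, "trips")
print(row_count)
```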

### Reading and writing files

In [26]:
trips_path = loc.get("trips")
census_block_loc_path = loc.get("census_block_loc")
census_tracts_path = loc.get("census_tracts")
nyc_grid_path = loc.get("nyc_grid")
taxi_zones_path = loc.get("taxi_zones")

trips = read_file(trips_path, "csv", "trip_id")
census_block_loc = read_file(census_block_loc_path, "csv", "census_block_id")
census_tracts = read_file(census_tracts_path, "csv", "census_id")
nyc_grid = read_file(nyc_grid_path, "json", "grid_id")
taxi_zones = read_file(taxi_zones_path, "csv", "taxi_zone_id")

In [12]:
# load what we can already
execute_values(conn, trips, "staging.trips")

execute_values() done


In [13]:
execute_values(conn, taxi_zones, "staging.taxi_zones")

execute_values() done


In [14]:
execute_values(conn, nyc_grid, "staging.grid_250m")

execute_values() done


In [15]:
# prepare census for loading
census_data = merge_census(census_tracts, census_block_loc)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  blocks_df['tract'] = blocks_df.BlockCode // 10000
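
The warning above comes from assigning a new column to `blocks_df`, which pandas treats as a possible view of `blocks`. One common way to avoid it, shown here as a sketch and not as the notebook's code, is to take an explicit copy of the filtered rows before adding `tract`. The tiny frame below is made up for the example.

```python
import pandas as pd

# Made-up sample rows; only the column names follow the notebook.
blocks = pd.DataFrame({
    "County": ["Bronx", "Albany", "Kings"],
    "BlockCode": [360050001001000, 360010001001000, 360470001001000],
})
boros = ["Bronx", "Kings", "New York", "Queens", "Richmond"]

# .copy() makes blocks_df an independent frame, so the assignment below
# cannot be mistaken for a write into a slice of `blocks`.
blocks_df = blocks[blocks["County"].isin(boros)].copy()
blocks_df["tract"] = blocks_df["BlockCode"] // 10000
print(blocks_df)
```

The original `blocks` frame is left untouched, and the result is the same as before apart from the missing warning.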


In [16]:
census_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 18052 entries, 0 to 18051
Data columns (total 44 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Latitude         18052 non-null  float64
 1   Longitude        18052 non-null  float64
 2   BlockCode        18052 non-null  int64  
 3   County_x         18052 non-null  object 
 4   State            18052 non-null  object 
 5   census_block_id  18052 non-null  int32  
 6   tract            18052 non-null  int64  
 7   CensusTract      18052 non-null  int64  
 8   County_y         18052 non-null  object 
 9   Borough          18052 non-null  object 
 10  TotalPop         18052 non-null  int64  
 11  Men              18052 non-null  int64  
 12  Women            18052 non-null  int64  
 13  Hispanic         12999 non-null  float64
 14  White            12999 non-null  float64
 15  Black            12999 non-null  float64
 16  Native           12999 non-null  float64
 17  Asian       

In [17]:
execute_values(conn, census_data, "staging.census")

execute_values() done


### Check loading

In [18]:
check_empty(conn, "staging.taxi_zones")

SELECT COUNT(*) FROM staging.taxi_zones


'table staging.taxi_zones is not empty, count: 263'

In [19]:
check_empty(conn, "staging.grid_250m")

SELECT COUNT(*) FROM staging.grid_250m


'table staging.grid_250m is not empty, count: 14058'

In [20]:
check_empty(conn, "staging.trips")

SELECT COUNT(*) FROM staging.trips


'table staging.trips is not empty, count: 1458644'

In [21]:
check_empty(conn, "staging.census")

SELECT COUNT(*) FROM staging.census


'table staging.census is not empty, count: 18052'

## Transform The Data

In this part we have a few tasks:

1. Add geom field for all tables - **Done**
2. Add 2 geom fields for trips - for pickup and dropoff - **Done**
3. Spatial Join: - **Done**
    * census with grid
    * census with taxi zones
    * trips pickup with grid
    * trips pickup with zones
    * trips dropoff with grid
    * trips dropoff with zones
4. Create time table from trips table - needs to be done for pickup and dropoff - **Done**

In [22]:
def create_time_table(conn):
    """
    Description: Create time table from trips table 
    Arguments:
        conn: a connection to postgres db
    Returns:
        None
    """
    
    query = """
    BEGIN;
    DROP TABLE IF EXISTS staging.trips_datetime;
    
    select 
        trip_id,
        EXTRACT(YEAR from pickup_datetime) as pickup_year,
        EXTRACT(MONTH from pickup_datetime) as pickup_month,
        EXTRACT(DAY from pickup_datetime) as pickup_day,
        EXTRACT(HOUR from pickup_datetime) as pickup_hour,
        EXTRACT(MINUTE from pickup_datetime) as pickup_minute,
        EXTRACT(YEAR from dropoff_datetime) as dropoff_year,
        EXTRACT(MONTH from dropoff_datetime) as dropoff_month,
        EXTRACT(DAY from dropoff_datetime) as dropoff_day,
        EXTRACT(HOUR from dropoff_datetime) as dropoff_hour,
        EXTRACT(MINUTE from dropoff_datetime) as dropoff_minute
    into 
        staging.trips_datetime
    from 
        staging.trips;
        
    COMMIT;
    """
    cursor = conn.cursor()
    
    print("Create time table for trips table")
    cursor.execute(query)
    
    print("Commit...")
    conn.commit()
    print("Create table completed...")

In [23]:
create_time_table(conn)

Create time table for trips table
Commit...
Create table completed...


In [24]:
check_empty(conn, "staging.trips_datetime")

SELECT COUNT(*) FROM staging.trips_datetime


'table staging.trips_datetime is not empty, count: 1458644'
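
Each `EXTRACT` call in `create_time_table` pulls one calendar component out of a timestamp. As a rough Python analogue, `datetime` exposes the same components as attributes. The sketch below mirrors the column naming of the query; `split_datetime` is a hypothetical helper and the timestamp is an illustrative value, not a row from the trips table.

```python
from datetime import datetime

def split_datetime(ts, prefix):
    """Mirror the EXTRACT(...) columns for a single timestamp."""
    return {
        f"{prefix}_year": ts.year,
        f"{prefix}_month": ts.month,
        f"{prefix}_day": ts.day,
        f"{prefix}_hour": ts.hour,
        f"{prefix}_minute": ts.minute,
    }

# Illustrative pickup time; seconds are dropped, as in the SQL above.
pickup = datetime(2016, 3, 14, 17, 24, 55)
parts = split_datetime(pickup, "pickup")
print(parts)
```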

In [25]:
def geom_from_wtl(conn, table, wkt_col, geom_col, epsg):
    """
    Description: Create geom column from wkt in postgres table 
    Arguments:
        conn: a connection to postgres db
        table: table to create new geom column 
        wkt_col: name of the column that holds the wkt info
        geom_col: name for the new column where the geom will be created
        epsg: spatial reference system code
    Returns:
        None
    """
    
    query = f"""
    ALTER TABLE {table} 
    DROP COLUMN IF EXISTS {geom_col};
    
    ALTER TABLE {table} ADD COLUMN {geom_col} GEOMETRY;

    UPDATE {table}
    SET {geom_col} = GeomFromText(({wkt_col}), {epsg});
    """
    
    print(query)
    cursor = conn.cursor()
    
    print("Creating geom field")
    cursor.execute(query)
    
    print("Commit...")
    conn.commit()
    print("completed...")
    

def geom_from_lat_lon(conn, table, lon_col, lat_col, geom_col, epsg):
    """
    Description: Create geom column from lat and lon columns in postgres table 
    Arguments:
        conn: a connection to postgres db
        table: table to create new geom column 
        lon_col: name of the column that holds the lon coordinates
        lat_col: name of the column that holds the lat coordinates
        geom_col: name for the new column where the geom will be created
        epsg: spatial reference system code
    Returns:
        None
    """
    
    query = f"""
    ALTER TABLE {table} 
    DROP COLUMN IF EXISTS {geom_col};
    
    ALTER TABLE {table} ADD COLUMN {geom_col} GEOMETRY;

    UPDATE {table}
    SET {geom_col} = GeomFromText(CONCAT('POINT(' ,{lon_col},' ', {lat_col}, ')'), {epsg});
    """
    
    print(query)
    cursor = conn.cursor()
    
    print("Creating geom field")
    cursor.execute(query)
    
    print("Commit...")
    conn.commit()
    print("completed...")

In [26]:
geom_from_wtl(conn,'staging.taxi_zones', 'wkt', 'geom', '4326')


    ALTER TABLE staging.taxi_zones 
    DROP COLUMN IF EXISTS geom;
    
    ALTER TABLE staging.taxi_zones ADD COLUMN geom GEOMETRY;

    UPDATE staging.taxi_zones
    SET geom = GeomFromText((wkt), 4326);
    
Creating geom field
Commit...
completed...


In [27]:
geom_from_wtl(conn,'staging.grid_250m', 'wkt', 'geom', '4326')


    ALTER TABLE staging.grid_250m 
    DROP COLUMN IF EXISTS geom;
    
    ALTER TABLE staging.grid_250m ADD COLUMN geom GEOMETRY;

    UPDATE staging.grid_250m
    SET geom = GeomFromText((wkt), 4326);
    
Creating geom field
Commit...
completed...


In [28]:
geom_from_lat_lon(conn, 'staging.census', 'longitude', 'latitude', 'geom', '4326')


    ALTER TABLE staging.census 
    DROP COLUMN IF EXISTS geom;
    
    ALTER TABLE staging.census ADD COLUMN geom GEOMETRY;

    UPDATE staging.census
    SET geom = GeomFromText(CONCAT('POINT(' ,longitude,' ', latitude, ')'), 4326);
    
Creating geom field
Commit...
completed...


In [29]:
geom_from_lat_lon(conn, 'staging.trips', 'pickup_longitude', 'pickup_latitude', 'pickup_geom', '4326')


    ALTER TABLE staging.trips 
    DROP COLUMN IF EXISTS pickup_geom;
    
    ALTER TABLE staging.trips ADD COLUMN pickup_geom GEOMETRY;

    UPDATE staging.trips
    SET pickup_geom = GeomFromText(CONCAT('POINT(' ,pickup_longitude,' ', pickup_latitude, ')'), 4326);
    
Creating geom field
Commit...
completed...


In [30]:
geom_from_lat_lon(conn, 'staging.trips', 'dropoff_longitude', 'dropoff_latitude', 'dropoff_geom', '4326')


    ALTER TABLE staging.trips 
    DROP COLUMN IF EXISTS dropoff_geom;
    
    ALTER TABLE staging.trips ADD COLUMN dropoff_geom GEOMETRY;

    UPDATE staging.trips
    SET dropoff_geom = GeomFromText(CONCAT('POINT(' ,dropoff_longitude,' ', dropoff_latitude, ')'), 4326);
    
Creating geom field
Commit...
completed...
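
Both helpers hand a WKT string to PostGIS. For points, WKT puts x (longitude) before y (latitude), which is why the longitude column comes first inside `CONCAT`. The sketch below builds the same kind of string in Python; `point_wkt` is a hypothetical helper, the range check is an addition for illustration, and the coordinates are illustrative values.

```python
def point_wkt(lon, lat):
    """Build the WKT text for a point; WKT order is x (lon) then y (lat)."""
    if not -180 <= lon <= 180 or not -90 <= lat <= 90:
        raise ValueError(f"coordinates out of range: lon={lon}, lat={lat}")
    return f"POINT({lon} {lat})"

wkt = point_wkt(-73.98, 40.75)
print(wkt)
```

Note that the range check cannot catch every swapped pair: a latitude passed as longitude is still a valid longitude, so the argument order has to be right at the call site.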


In [79]:
def spatial_join(conn, schema, table, geom_point, new_table_name, fields, grid_pk_alias, zone_pk_alias):
    """
    Description: Join and create new table using spatial join
    Arguments:
        conn: a connection to postgres db
        schema: the schema to save and read the new table
        table: point data table
        geom_point: geom field for point data table
        new_table_name: the name for the new table
        fields: the fields the user wants to keep from the point data table
        grid_pk_alias: the name for grid pk in the created table
        zone_pk_alias: the name for taxi service zone pk in the created table
    Returns:
        None
    """
    
    # qualify every requested field with its table, then add the grid pk
    tmp_lst = [f"{schema}.{table}.{x}" for x in fields]
    tmp_lst.append(f"{schema}.grid_250m.grid_id AS {grid_pk_alias}")
    fields_str = ", ".join(tmp_lst)
    
    query=f"""    
    DROP TABLE IF EXISTS {schema}.{new_table_name};
    
    create table {schema}.{new_table_name} as with cte_sj as (
    select
        {fields_str}
    from
        {schema}.{table}
    join {schema}.grid_250m on
        ST_Contains({schema}.grid_250m.geom, {schema}.{table}.{geom_point}) )

    select
        cte_sj.*, {schema}.taxi_zones.taxi_zone_id AS {zone_pk_alias}
    from 
        cte_sj
    join {schema}.taxi_zones on
        ST_Contains({schema}.taxi_zones.geom, cte_sj.{geom_point});
    """
    
    cursor = conn.cursor()
    
    print("Spatial Join tables")
    print("Creating new table may take a while")
    cursor.execute(query)
    
    print("Commit...")
    conn.commit()
    print("Create table completed...")

In [80]:
census_fields = ["totalpop", "men", "women", "hispanic", "white", "black", "native", "asian", "citizen", "income", "poverty",
"childpoverty", "professional", "service", "office", "construction", "production", "drive", "carpool", "transit", "walk",
"othertransp", "workathome", "meancommute", "employed", "privatework", "publicwork", "selfemployed", "familywork",
"unemployment", "census_id", "geom"]
trips_fields = ["passenger_count", "trip_duration", "trip_id", "pickup_geom", "dropoff_geom"]

In [81]:
spatial_join(conn, "staging", "census", "geom", "census_sj", census_fields, "grid_id", "taxi_zone_id")

Spatial Join tables
Creating new table may take a while
Commit...
Create table completed...


In [82]:
spatial_join(conn, "staging", "trips", "pickup_geom", "pickup_sj", trips_fields, "pickup_grid_id", "pickup_taxi_zone_id")

Spatial Join tables
Creating new table may take a while
Commit...
Create table completed...


In [86]:
pickup_fields = ["*"]
spatial_join(conn, "staging", "pickup_sj", "dropoff_geom", "trips_sj", pickup_fields, "dropoff_grid_id", "dropoff_taxi_zone_id")

Spatial Join tables
Creating new table may take a while
Commit...
Create table completed...


In [84]:
check_empty(conn, "staging.trips_sj")

SELECT COUNT(*) FROM staging.trips_sj


'table staging.trips_sj is not empty, count: 1454643'

In [85]:
check_empty(conn, "staging.census_sj")

SELECT COUNT(*) FROM staging.census_sj


'table staging.census_sj is not empty, count: 11703'
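
The counts above are lower than the staging counts (1454643 of 1458644 trips, 11703 of 18052 census points). That is consistent with both joins in `spatial_join` being inner joins on `ST_Contains`: a point that falls outside every grid cell or taxi zone has no match and is dropped. The sketch below illustrates the idea with a plain ray-casting test standing in for `ST_Contains`. It is a conceptual stand-in only, since PostGIS uses its own implementation and spatial indexes, and the square cell and points are made up.

```python
def point_in_polygon(x, y, ring):
    """Ray-casting test: is (x, y) inside the polygon ring?

    `ring` is a list of (x, y) vertices; points lying exactly on an edge
    are not handled specially in this sketch.
    """
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Does a horizontal ray going right from the point cross this edge?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

# One made-up unit-square cell and two made-up points.
cell = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
points = {"a": (0.5, 0.5), "b": (2.0, 0.5)}

# Inner-join behaviour: only points contained in the cell are kept.
joined = {name: "cell_1" for name, (px, py) in points.items()
          if point_in_polygon(px, py, cell)}
print(joined)
```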

In [87]:
# QA to run before spatial join
def fix_geom(conn, table, geom_field):
    """
    Description: data quality function, check for invalid geom and try to fix them
    Arguments:
        conn: psycopg2 connection to postgres
        table: table name to check
        geom_field: geom field name
    Returns:
        None
    """
    
    count_query=f"""
    select count(*) from {table} where st_isvalid({geom_field})=False;
    """
    fix_query=f"""
    update {table} 
    set {geom_field} = ST_MakeValid({geom_field}) 
    where st_isvalid({geom_field})=False;
    """

    cursor = conn.cursor()

    cursor.execute(count_query)
    res = cursor.fetchall()[0][0]
    if res > 0:
        print(f"Fixing table {table}")
        # run the repair query, not the count query again
        cursor.execute(fix_query)
    else:
        print(f"{table} does not have invalid geom")
    
    
    print("Commit...")
    conn.commit()
    print("completed...")

In [88]:
fix_geom(conn, "staging.grid_250m", "geom")

staging.grid_250m does not have invalid geom
Commit...
completed...


In [89]:
fix_geom(conn, "staging.taxi_zones", "geom")

staging.taxi_zones does not have invalid geom
Commit...
completed...


In [90]:
fix_geom(conn, "staging.trips", "dropoff_geom")

staging.trips does not have invalid geom
Commit...
completed...


In [91]:
fix_geom(conn, "staging.trips", "pickup_geom")

staging.trips does not have invalid geom
Commit...
completed...


In [92]:
fix_geom(conn, "staging.census", "geom")

staging.census does not have invalid geom
Commit...
completed...


## Loading the data

In [11]:
def insert_into_table(conn, old_table, new_table, truncate=True):
    """
    Description: Insert all rows from one postgres table into another
    Arguments:
        conn: a connection to postgres db
        old_table: from where to select the data
        new_table: where to insert the data
        truncate: clear the table if true
    Returns:
        None
    """

    query = f"""
    INSERT INTO {new_table}
    SELECT * FROM {old_table}
    """
    
    # the cursor must exist before the optional truncate uses it
    cursor = conn.cursor()
    
    if truncate:
        truncate_query  = f"TRUNCATE TABLE {new_table};"
        cursor.execute(truncate_query)
    
    print(query)
    
    print("Insert data to new table")
    cursor.execute(query)
    
    conn.commit()
    print("completed...")
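
`INSERT INTO ... SELECT *` relies on the staging table and the target table having the same columns in the same order. A more defensive variant names the columns on both sides. The sketch below shows that idea under stated assumptions: `copy_columns` is a hypothetical helper, the table and column names are made up, and an in-memory `sqlite3` database stands in for Postgres so the snippet runs without a server.

```python
import sqlite3

def copy_columns(conn, old_table, new_table, columns, truncate=True):
    """Copy the named columns from old_table into new_table."""
    col_list = ", ".join(columns)
    cursor = conn.cursor()
    if truncate:
        # SQLite has no TRUNCATE; DELETE stands in for it in this demo.
        cursor.execute(f"DELETE FROM {new_table}")
    cursor.execute(
        f"INSERT INTO {new_table} ({col_list}) "
        f"SELECT {col_list} FROM {old_table}"
    )
    copied = cursor.rowcount
    conn.commit()
    cursor.close()
    return copied

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE staging_zones (zone_id INTEGER, name TEXT)")
# The target deliberately declares its columns in a different order.
demo.execute("CREATE TABLE final_zones (name TEXT, zone_id INTEGER)")
demo.executemany("INSERT INTO staging_zones VALUES (?, ?)", [(1, "A"), (2, "B")])

n_copied = copy_columns(demo, "staging_zones", "final_zones", ["zone_id", "name"])
rows = demo.execute(
    "SELECT zone_id, name FROM final_zones ORDER BY zone_id"
).fetchall()
print(n_copied, rows)
```

With explicit column names the rows land in the right columns even though the two tables declare them in a different order.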

In [12]:
insert_into_table(conn, "staging.census_sj", "data.nyc_census")


    INSERT INTO data.nyc_census
    SELECT * FROM staging.census_sj
    
Insert data to new table
completed...


In [13]:
insert_into_table(conn, "staging.grid_250m", "data.nyc_grid_250m")


    INSERT INTO data.nyc_grid_250m
    SELECT * FROM staging.grid_250m
    
Insert data to new table
completed...


In [14]:
insert_into_table(conn, "staging.trips_sj", "data.nyc_taxi_trips")


    INSERT INTO data.nyc_taxi_trips
    SELECT * FROM staging.trips_sj
    
Insert data to new table
completed...


In [15]:
insert_into_table(conn, "staging.taxi_zones", "data.nyc_taxi_zones")


    INSERT INTO data.nyc_taxi_zones
    SELECT * FROM staging.taxi_zones
    
Insert data to new table
completed...


In [16]:
insert_into_table(conn, "staging.trips_datetime", "data.nyc_trips_datetime")


    INSERT INTO data.nyc_trips_datetime
    SELECT * FROM staging.trips_datetime
    
Insert data to new table
completed...


In [18]:
check_empty(conn, "data.nyc_trips_datetime")

SELECT COUNT(*) FROM data.nyc_trips_datetime


'table data.nyc_trips_datetime is not empty, count: 1458644'

In [19]:
check_empty(conn, "data.nyc_taxi_zones")

SELECT COUNT(*) FROM data.nyc_taxi_zones


'table data.nyc_taxi_zones is not empty, count: 263'

In [20]:
check_empty(conn, "data.nyc_taxi_trips")

SELECT COUNT(*) FROM data.nyc_taxi_trips


'table data.nyc_taxi_trips is not empty, count: 1454643'

In [21]:
check_empty(conn, "data.nyc_grid_250m")

SELECT COUNT(*) FROM data.nyc_grid_250m


'table data.nyc_grid_250m is not empty, count: 14058'

In [22]:
check_empty(conn, "data.nyc_census")

SELECT COUNT(*) FROM data.nyc_census


'table data.nyc_census is not empty, count: 11703'