# 1. ETL


In this notebook, the NY_taxi_data saved under the 'data' folder is extracted and transformed into a form suitable for loading into a database. A PostGIS database named 'carto' was created inside a Docker container prior to this step (the setup of the Docker environment is described in the README.md of this repository).

The workflow I follow in this notebook is:

1. Connect to the database (postgis)
2. Create tables for each month's data ('taxi_jan', 'taxi_apr', 'taxi_jul')
3. Convert the original text data and insert the values into the corresponding tables

In [1]:
# Import libraries
import geopandas as gpd
import pandas as pd
from geoalchemy2 import Geometry, WKTElement
from sqlalchemy import create_engine
import psycopg2
import glob



### Data preparation before connecting to the database

* Prepare a list of all the NY_taxi_data file names available.
* Separate the files depending on the corresponding month (Jan, Apr, Jul).
    -> to distribute the data into different tables (due to limited memory space)

In [2]:
# Read the names of NY taxi data and save them in a list
filenames = sorted(glob.glob('../data/NY_taxi_data/*')) # in ascending order
print(len(filenames))
filenames[:3]

77


['../data/NY_taxi_data/yellow_tripdata_2015-01_00',
 '../data/NY_taxi_data/yellow_tripdata_2015-01_01',
 '../data/NY_taxi_data/yellow_tripdata_2015-01_02']

In [3]:
# Separate Jan, Apr, Jul data
Jan = []
Apr = []
Jul = []
for name in filenames:
    month = name[-5:-3]  # two-digit month in '..._2015-MM_NN'
    if month == '01':
        Jan.append(name)
    elif month == '04':
        Apr.append(name)
    else:
        Jul.append(name)

print(len(Jan)) # list of data files from 2015-01
print(len(Apr)) # 2015-04
print(len(Jul)) # 2015-07

26
27
24
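
The slicing above depends on every file name having exactly the layout `yellow_tripdata_<year>-<month>_<chunk>`, and anything that is not January or April ends up in the July list. As a hedged alternative, the sketch below parses the month with a regular expression and ignores files that do not match. The helper name `group_by_month` and the pattern are assumptions made for illustration, not part of the original notebook.

```python
import re
from collections import defaultdict

# Assumed file name layout: yellow_tripdata_<year>-<month>_<chunk>
MONTH_PATTERN = re.compile(r'_(\d{4})-(\d{2})_\d+$')

def group_by_month(paths):
    """Return a dict mapping a two-digit month string to its sorted file paths."""
    groups = defaultdict(list)
    for path in paths:
        match = MONTH_PATTERN.search(path)
        if match is None:
            # Do not guess a month for files that do not follow the layout
            continue
        groups[match.group(2)].append(path)
    return {month: sorted(files) for month, files in groups.items()}

# Made-up paths that follow the layout shown in the output of cell 2
example = [
    '../data/NY_taxi_data/yellow_tripdata_2015-01_00',
    '../data/NY_taxi_data/yellow_tripdata_2015-04_00',
    '../data/NY_taxi_data/yellow_tripdata_2015-07_00',
    '../data/NY_taxi_data/yellow_tripdata_2015-01_01',
]
by_month = group_by_month(example)
print({month: len(files) for month, files in by_month.items()})
```

With this grouping, `by_month['01']` would play the role of the `Jan` list, and a new month would get its own key without changing the code.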


### Connect to the database

In [42]:
# Define a function to try connection to the database 'carto'
# In case the connecting attempt fails, print the error message and re-raise
def connect_to_db():
    try:
        # conn for connection
        conn = psycopg2.connect(dbname='carto', user='carto', password='carto', host='postgis', port='5432')
    except psycopg2.DatabaseError as err:
        print(f"I am unable to connect to the database: {err}")
        raise  # without this, 'return conn' would fail on an unbound variable
    return conn

In [4]:
# Prepare the table names to be created in the database
tablenames = ['taxi_jan', 'taxi_apr', 'taxi_jul']

In [7]:
# Function to create a table called 'taxi_x' inside the postgis database
def create_table(tablename):
    # connect to the db
    conn = connect_to_db()

    # Prepare a query to create table for NY taxi data
    q_create_table = f"""
                    create table {tablename}
                    (
                        vendorID int,
                        tpep_pickup_datetime timestamp,
                        tpep_dropoff_datetime timestamp,
                        passenger_count int,
                        trip_distance numeric,
                        pickup_longitude numeric,
                        pickup_latitude numeric,
                        RateCodeID int,
                        store_and_fwd_flag char(1),
                        dropoff_longitude numeric,
                        dropoff_latitude numeric,
                        payment_type int,
                        fare_amount numeric,
                        extra numeric,
                        mta_tax numeric,
                        tip_amount numeric,
                        tolls_amount numeric,
                        improvement_surcharge numeric,
                        total_amount numeric
                    )
                    """

    try:
        cur = conn.cursor()  # initiate cursor (communication with db)
        cur.execute(q_create_table)  # execute the query
        conn.commit()
        print(f'{tablename} created')

    except psycopg2.DatabaseError as err: # print error if fails
        print(f"Failed to create the table: {err}")

    # Close the communication & connection with the postgis
    finally:
        cur.close()
        conn.close ()

In [7]:
# Create tables for Jan, Apr, Jul
for tablename in tablenames:
    create_table(tablename)

taxi_jan created
taxi_apr created
taxi_jul created


### Read the files, transform the data and load it to the db

In [8]:
# Function to fill the previously created tables
def fill_table_with_data(filename, tablename):
    # connect to the db
    conn = connect_to_db()

    # A query to insert data (row by row) to the table taxi
    q_insert_data = f'''insert into {tablename}
                    (
                                vendorID,
                                tpep_pickup_datetime,
                                tpep_dropoff_datetime,
                                passenger_count,
                                trip_distance,
                                pickup_longitude,
                                pickup_latitude,
                                RateCodeID,
                                store_and_fwd_flag,
                                dropoff_longitude,
                                dropoff_latitude,
                                payment_type,
                                fare_amount,
                                extra,
                                mta_tax,
                                tip_amount,
                                tolls_amount,
                                improvement_surcharge,
                                total_amount
                        )
                        values (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, %s,%s,%s,%s,%s, %s,%s,%s,%s)'''

    try:
        cur = conn.cursor()
        total = 0  # count how many rows are inserted
        
        with open(filename, 'r') as file:
            # rows: a list of tuples (blank lines are skipped)
            rows = [tuple(line.strip().split(',')) for line in file if line.strip()]
            
            # a row: a tuple of 19 string values
            # Fill the table by inserting the data row by row
            for row in rows:
                
                # Skip header if exists
                if row[0].isalpha(): 
                    pass
                
                # Skip if no geo data
                elif row[5:7] == ('0','0') or row[9:11] == ('0','0'): 
                    pass
                
                else:
                    cur.execute(q_insert_data, row)
                    total += 1

            # All rows inserted -> commit the changes to the db
            conn.commit()
            print(f'{total} rows inserted out of {len(rows)}')

    # Print error message if query fails
    except psycopg2.DatabaseError as err:
        print(f"Failed to copy data to the table: {err}")

    finally:
        cur.close()
        conn.close ()

**Note**

To insert data into the tables inside the db, each file was read and iterated over line by line, because lines with incorrect data types (such as a header row) and rows without coordinates had to be filtered out.
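
The filtering rules in `fill_table_with_data` can also be written as a single predicate, which makes them easy to check without a database. This is a minimal sketch: the name `is_loadable` and the column-count check are additions for illustration, and the sample row contains made-up values.

```python
# Column positions in the 19-column layout used by the insert query
PICKUP = slice(5, 7)     # pickup_longitude, pickup_latitude
DROPOFF = slice(9, 11)   # dropoff_longitude, dropoff_latitude
N_COLUMNS = 19

def is_loadable(row):
    """Return True if a split line should be inserted into the table."""
    if len(row) != N_COLUMNS:   # extra check, not in the original function
        return False
    if row[0].isalpha():        # header line starting with a column name
        return False
    if row[PICKUP] == ('0', '0') or row[DROPOFF] == ('0', '0'):
        return False            # pickup or drop-off coordinates are missing
    return True

# Made-up sample rows for illustration only
good = ('2', '2015-01-15 19:05:39', '2015-01-15 19:23:42', '1', '1.59',
        '-73.99', '40.75', '1', 'N', '-73.97', '40.75',
        '1', '12', '1', '0.5', '3.25', '0', '0.3', '17.05')
header = ('VendorID',) + ('x',) * 18
no_pickup = good[:5] + ('0', '0') + good[7:]

for row in (good, header, no_pickup):
    print(is_loadable(row))
```

Inside the loading loop, `if is_loadable(row): cur.execute(q_insert_data, row)` would replace the `if`/`elif`/`else` chain.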

In [9]:
# Use the function to fill the 'taxi_jan' table
for filename in Jan:
    fill_table_with_data(filename, 'taxi_jan')

489619 rows inserted out of 500000
489654 rows inserted out of 500000
489574 rows inserted out of 500000
489456 rows inserted out of 500000
489867 rows inserted out of 500000
489604 rows inserted out of 500000
489581 rows inserted out of 500000
489417 rows inserted out of 500000
489577 rows inserted out of 500000
489493 rows inserted out of 500000
489664 rows inserted out of 500000
489508 rows inserted out of 500000
489887 rows inserted out of 500000
489905 rows inserted out of 500000
489556 rows inserted out of 500000
489652 rows inserted out of 500000
489618 rows inserted out of 500000
489624 rows inserted out of 500000
489529 rows inserted out of 500000
489676 rows inserted out of 500000
489647 rows inserted out of 500000
489721 rows inserted out of 500000
489660 rows inserted out of 500000
489608 rows inserted out of 500000
489424 rows inserted out of 500000
243813 rows inserted out of 248987


In [11]:
# Load all the NY taxi data from 2015-Apr to the database
for filename in Apr:
    fill_table_with_data(filename, 'taxi_apr')

490624 rows inserted out of 500000
490729 rows inserted out of 500000
490839 rows inserted out of 500000
490676 rows inserted out of 500000
490739 rows inserted out of 500000
490844 rows inserted out of 500000
490963 rows inserted out of 500000
490871 rows inserted out of 500000
490740 rows inserted out of 500000
490836 rows inserted out of 500000
490870 rows inserted out of 500000
490717 rows inserted out of 500000
490856 rows inserted out of 500000
490639 rows inserted out of 500000
490806 rows inserted out of 500000
490667 rows inserted out of 500000
490719 rows inserted out of 500000
490925 rows inserted out of 500000
490799 rows inserted out of 500000
490681 rows inserted out of 500000
490700 rows inserted out of 500000
490798 rows inserted out of 500000
490891 rows inserted out of 500000
490781 rows inserted out of 500000
490679 rows inserted out of 500000
490704 rows inserted out of 500000
70505 rows inserted out of 71790


In [12]:
# Load all the NY taxi data from 2015-Jul to the database
for filename in Jul:
    fill_table_with_data(filename, 'taxi_jul')

492271 rows inserted out of 500000
492293 rows inserted out of 500000
492249 rows inserted out of 500000
492168 rows inserted out of 500000
492148 rows inserted out of 500000
492519 rows inserted out of 500000
492174 rows inserted out of 500000
491991 rows inserted out of 500000
492203 rows inserted out of 500000
492355 rows inserted out of 500000
492647 rows inserted out of 500000
492037 rows inserted out of 500000
492405 rows inserted out of 500000
492104 rows inserted out of 500000
492033 rows inserted out of 500000
492356 rows inserted out of 500000
492418 rows inserted out of 500000
492063 rows inserted out of 500000
492692 rows inserted out of 500000
492394 rows inserted out of 500000
492548 rows inserted out of 500000
492169 rows inserted out of 500000
490694 rows inserted out of 500000
61091 rows inserted out of 62784



# More data: GeoJSON to PostGIS

In the next step (notebook 2.EDA+model), the NY_taxi data will be queried and read as a GeoDataFrame. To query the data more efficiently, I would like to use a spatial join such as 'ST_Contains' in the query. For this to work, a second geometric attribute, the polygons of the census block groups, must be present in the same database.
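
To make the goal concrete, here is a hedged sketch of the kind of query this enables: counting pickups per census block group with `ST_Contains`. The helper name `pickups_per_block_query` is hypothetical, and the query assumes the `census_blocks` table with `geoid` and `geometry` columns that is created later in this notebook. The pickup point is built on the fly with `ST_MakePoint`, because the taxi tables store longitude and latitude as plain numeric columns.

```python
ALLOWED_TAXI_TABLES = {'taxi_jan', 'taxi_apr', 'taxi_jul'}

def pickups_per_block_query(taxi_table):
    """Build a SQL string that counts pickups per census block group."""
    # Table names cannot be passed as query parameters, so validate them here
    if taxi_table not in ALLOWED_TAXI_TABLES:
        raise ValueError(f'unknown taxi table: {taxi_table}')
    return f"""
        SELECT c.geoid, COUNT(*) AS pickups
        FROM census_blocks AS c
        JOIN {taxi_table} AS t
          ON ST_Contains(
                 c.geometry,
                 ST_SetSRID(
                     ST_MakePoint(t.pickup_longitude::float8,
                                  t.pickup_latitude::float8),
                     4326))
        GROUP BY c.geoid
    """

query = pickups_per_block_query('taxi_jan')
print(query)
```

The resulting string could be passed to `cur.execute(query)` on a connection from `connect_to_db()`. On tables of this size, such a join is likely to be slow without a spatial index on the polygon column.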

### Prepare data

In [58]:
# Read NYC census block group geometries file as a table
census = gpd.read_file('../data/nyc_cbg_geoms.geojson')
print(census.crs)
census.head()

epsg:4326


|   | geoid | geometry |
|---|-------|----------|
| 0 | 360050001001 | POLYGON ((-73.89277 40.79284, -73.89261 40.792... |
| 1 | 360050002001 | POLYGON ((-73.86285 40.81267, -73.86191 40.812... |
| 2 | 360050002002 | POLYGON ((-73.86708 40.81444, -73.86332 40.812... |
| 3 | 360050002003 | POLYGON ((-73.85856 40.80665, -73.85848 40.806... |
| 4 | 360050004001 | POLYGON ((-73.85972 40.81527, -73.85956 40.815... |


In [59]:
# Use GeoAlchemy's WKTElement to create a geometric attribute with SRID
def create_wkt_element(geom):
    return WKTElement(geom.wkt, srid=4326)

# Convert the geometry column to WKTElement objects, stored with dtype object (if not, data cannot be queried)
census['geometry'] = census['geometry'].apply(create_wkt_element)
census.dtypes



geoid       object
geometry    object
dtype: object

### Prepare db (create a new table for the census data)

In [60]:
# Create a table called census_blocks inside the postgis database
def create_census_block_table(tablename):
    # connect to the db
    conn = connect_to_db()

    # Prepare a query to create table for census_blocks
    q_create_table = f"""
                    create table {tablename}
                    (
                        geoid text,  -- 12-digit identifier, too large for int
                        geometry geometry(Polygon,4326)
                    )
                    """

    try:
        cur = conn.cursor()  # initiate cursor (communication with db)
        cur.execute(q_create_table)  # execute the query
        conn.commit()
        print(f'{tablename} created')

    except psycopg2.DatabaseError as err:
        print(f"Failed to create the table: {err}")

    # Close the communication & connection with the postgis
    finally:
        cur.close()
        conn.close ()
        
create_census_block_table('census_blocks')

census_blocks created


### Read the file, transform the data and load it to the db

In [61]:
# Create SQL connection engine
from sqlalchemy import text  # raw SQL strings must be wrapped in text() on SQLAlchemy 2.x

engine = create_engine('postgresql://carto:carto@postgis:5432/carto')
print('Engine created')

# Connect to database using a context manager
with engine.connect() as conn, conn.begin():
    print('Connected to the db')
    census.to_sql('census_blocks',
                  engine,
                  if_exists='replace',
                  index=False,
                  dtype={'geometry':Geometry})
    
    print('Table created')
    
    # Make sure the geometry column is typed as Geometry with SRID 4326
    sql = """ALTER TABLE census_blocks
             ALTER COLUMN geometry TYPE Geometry
             USING ST_SetSRID(geometry::Geometry, 4326)"""
    conn.execute(text(sql))
    
    print('Data type converted successfully')
    
# The context manager has already closed the connection at this point
print('Connection to the database closed')

Engine created
Connected to the db
Table created
Data type converted successfully
Connection to the database closed



# Summary:

1. Data from Jan, Apr, and Jul 2015 were loaded into a PostGIS database. Their geometric attributes have not yet been assigned a proper geometry data type.


2. Census block geometries were also loaded into the same database for future spatial queries.


**3. How can this process be scaled for larger data?**

One can create more tables partitioned by time, for instance one table for each day's taxi trips. To handle the given data, I already partitioned it by month.
    
Another way is to process the incoming data incrementally. For example, the function I wrote loads the data file by file into a per-month table. Therefore, when more data arrives, the previously loaded data does not need to be reprocessed.
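
Both ideas can be combined in a small driver, sketched below under stated assumptions: the table naming scheme `taxi_<year>_<month>` and the helpers `table_for`, `pending_files` and `load_incrementally` are hypothetical and differ from the `taxi_jan` style used above. The `load` argument stands in for a function such as `fill_table_with_data`.

```python
import re

def table_for(path):
    """Derive a per-month table name such as 'taxi_2015_01' from a file path."""
    match = re.search(r'_(\d{4})-(\d{2})_\d+$', path)
    if match is None:
        raise ValueError(f'unexpected file name: {path}')
    year, month = match.groups()
    return f'taxi_{year}_{month}'

def pending_files(all_files, processed):
    """Return the files that have not been loaded yet, in sorted order."""
    return sorted(set(all_files) - set(processed))

def load_incrementally(all_files, processed, load):
    """Load only new files and record them in the 'processed' set."""
    for path in pending_files(all_files, processed):
        load(path, table_for(path))
        processed.add(path)
    return processed

# Usage with a stand-in loader that only records what it was asked to do
calls = []
processed = {'../data/NY_taxi_data/yellow_tripdata_2015-01_00'}
files = ['../data/NY_taxi_data/yellow_tripdata_2015-01_00',
         '../data/NY_taxi_data/yellow_tripdata_2015-10_00']
load_incrementally(files, processed, lambda path, table: calls.append((path, table)))
print(calls)
```

In practice the `processed` set would have to be persisted between runs, for example in a small bookkeeping table in the same database, so that a restart does not reload files.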

## End of basic ETL

-----------------------------------------------------------

## Next step: EDA and training a baseline model
