## Building a Data Pipeline and Creating Reporting Tables using NYC Yellow Taxi Dataset

In [1]:
#Importing the necessary libraries
import pandas as pd                   #for dataframe
from sqlalchemy import create_engine  #to connect to sql data warehouse
from time import time                 #for time

In [2]:
#Connecting to database
engine = create_engine('postgresql://postgres:9876@localhost:5432/dw')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f7f737b9ac0>

In [3]:
#Defining an ETL Function with arguments chunk_size and connection)

def etl_nyc_taxi(chunk_size, connection):
    
    df_data = pd.read_csv('yellow_tripdata_2016-02.csv',chunksize=chunk_size)
    count = 0
    overall_start_time = time()
    for chunk in df_data:
        if count < 10:
            t_start = time()
            chunk['tpep_pickup_datetime'] = pd.to_datetime(chunk['tpep_pickup_datetime'])
            chunk['tpep_dropoff_datetime'] = pd.to_datetime(chunk['tpep_dropoff_datetime'])
            chunk['tpep_dropoff_datetime'] = pd.to_datetime(chunk['tpep_dropoff_datetime'])
            chunk['VendorID'] = chunk['VendorID'].map(str)
            chunk.to_sql(name='nyc_yellow_taxi',con=connection,if_exists='append')            #this also create sql table
            count +=1
            t_end = time()
            print(f'{count}) loaded next chunk {t_end - t_start:.3f} seconds')

    overall_end_time = time() - overall_start_time
    print(f'finished loading a total of {chunk_size*count:,} records in {overall_end_time} seconds')

In [4]:
etl_nyc_taxi(100000,engine)

1) loaded next chunk 12.167 seconds
2) loaded next chunk 10.934 seconds
3) loaded next chunk 10.981 seconds
4) loaded next chunk 10.808 seconds
5) loaded next chunk 10.747 seconds
6) loaded next chunk 10.573 seconds
7) loaded next chunk 10.641 seconds
8) loaded next chunk 11.128 seconds
9) loaded next chunk 10.648 seconds
10) loaded next chunk 10.640 seconds
finished loading a total of 1,000,000 records in 147.53504085540771 seconds


### SQL Queries

In [5]:
#Overview

data_overview = '''
SELECT * FROM nyc_yellow_taxi LIMIT 4

'''
pd.read_sql(data_overview,engine)

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,0,2,2016-02-25 17:24:20,2016-02-25 17:27:20,2,0.7,-73.94725,40.763771,1,N,-73.992012,40.73539,2,5.0,0.0,0.5,0.0,0.0,0.3,5.8
1,1,2,2016-02-25 23:10:50,2016-02-25 23:31:50,2,5.52,-73.983017,40.750992,1,N,-73.988586,40.758839,2,20.0,0.5,0.5,0.0,0.0,0.3,21.3
2,2,2,2016-02-01 00:00:01,2016-02-01 00:10:52,6,1.99,-73.99234,40.758202,1,N,-73.964355,40.757977,1,9.5,0.5,0.5,0.7,0.0,0.3,11.5
3,3,1,2016-02-01 00:00:04,2016-02-01 00:05:16,1,1.5,-73.981453,40.749722,1,N,-73.982323,40.763985,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8


#### operations_and_performance queries

In [6]:
#1. How many trips were recorded in the dataset?
total_trips='''
SELECT COUNT(trip_distance) AS tt_taxi_trips from nyc_yellow_taxi
WHERE trip_distance > 0

'''
pd.read_sql(total_trips,engine)

Unnamed: 0,tt_taxi_trips
0,994264


In [7]:
#2. What is the average trip distance for all trips?
avg_trip_distance ='''
SELECT AVG(trip_distance) AS avg_taxi_trip from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_trip_distance,engine)

Unnamed: 0,avg_taxi_trip
0,2.767276


In [8]:
#2. What is the average trip distance for all trips?
avg_trip_distance ='''
SELECT AVG(trip_distance) AS avg_taxi_trip from nyc_yellow_taxi

'''

pd.read_sql(avg_trip_distance,engine)

Unnamed: 0,avg_taxi_trip
0,2.751403


In [9]:
#3. Which Vendor has the highest number of trips?
highest_trip_vendor='''

SELECT "VendorID", sum(trip_distance) as total_trips
from nyc_yellow_taxi
WHERE trip_distance > 0
Group by "VendorID"
Order by total_trips Desc
limit 1

'''

pd.read_sql(highest_trip_vendor,engine)

Unnamed: 0,VendorID,total_trips
0,2,1494029.24


In [10]:
#3. Which Vendor has the highest number of trips?
highest_trip_vendor='''

SELECT "VendorID", sum(trip_distance) as total_trips
from nyc_yellow_taxi
Group by "VendorID"
Order by total_trips Desc
limit 1

'''

pd.read_sql(highest_trip_vendor,engine)

Unnamed: 0,VendorID,total_trips
0,2,1494029.24


In [11]:
#4. Which Vendor has the lowest number of trips? 
lowest_trip_vendor='''

SELECT "VendorID", sum(trip_distance) as total_trips
from nyc_yellow_taxi
WHERE trip_distance > 0
Group by "VendorID"
Order by total_trips Asc
limit 1

'''
pd.read_sql(lowest_trip_vendor,engine)

Unnamed: 0,VendorID,total_trips
0,1,1257374.1


In [12]:
#5. What is the average passenger count per trip?

avg_passenger_count ='''
SELECT AVG(passenger_count) AS passenger_count_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_passenger_count,engine)


Unnamed: 0,passenger_count_avg
0,1.632863


In [13]:
#5. What is the average passenger count per trip?

avg_passenger_count ='''
SELECT AVG(passenger_count) AS passenger_count_avg from nyc_yellow_taxi

'''

pd.read_sql(avg_passenger_count,engine)

Unnamed: 0,passenger_count_avg
0,1.631115


#### customer_demographics_and_preferences queries

In [14]:
#1. What is the average trip amount given by passengers?
avg_tip_amount ='''
SELECT AVG(tip_amount) AS tip_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_tip_amount,engine)

Unnamed: 0,tip_amount_avg
0,1.819865


In [15]:
#1. What is the average trip amount given by passengers?
avg_tip_amount ='''
SELECT AVG(tip_amount) AS tip_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_tip_amount,engine)

Unnamed: 0,tip_amount_avg
0,1.819865


In [16]:
#2. What is the average trip distance by passengers?
avg_trip_distance_by_passenger ='''
SELECT AVG(trip_distance) AS trip_distance_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_trip_distance_by_passenger,engine)

Unnamed: 0,trip_distance_avg
0,2.767276


In [17]:
#2. What is the average trip distance by passengers?
avg_trip_distance_by_passenger ='''
SELECT AVG(trip_distance) AS trip_distance_avg from nyc_yellow_taxi

'''

pd.read_sql(avg_trip_distance_by_passenger,engine)

Unnamed: 0,trip_distance_avg
0,2.751403


In [18]:
#3. How many trips were flagged as 'store and forward'?
store_and_forward_trips ='''
SELECT count(*) as count_trips_flagged_storeandforward FROM nyc_yellow_taxi
WHERE trip_distance > 0
and store_and_fwd_flag like 'Y'
'''
pd.read_sql(store_and_forward_trips,engine)

Unnamed: 0,count_trips_flagged_storeandforward
0,5720


In [19]:
#3. How many trips were flagged as 'store and forward'?
store_and_forward_trips ='''
SELECT count(*) as count_trips_flagged_storeandforward FROM nyc_yellow_taxi
where store_and_fwd_flag like 'Y'
'''
pd.read_sql(store_and_forward_trips,engine)

Unnamed: 0,count_trips_flagged_storeandforward
0,5918


In [20]:
#4. How many trips were shared rides (passenger count > 1)?
shared_ride_count ='''
SELECT count(*) AS shared_ride_count from nyc_yellow_taxi
WHERE trip_distance > 0
and passenger_count > 1

'''

pd.read_sql(shared_ride_count,engine)

Unnamed: 0,shared_ride_count
0,266145


In [21]:
#4. How many trips were shared rides (passenger count > 1)?
shared_ride_count ='''
SELECT count(*) AS shared_ride_count from nyc_yellow_taxi
where passenger_count > 1

'''

pd.read_sql(shared_ride_count,engine)

Unnamed: 0,shared_ride_count
0,267128


#### financial_performance queries

In [22]:
#1. What is the average fare amount per trip?
avg_fare_amount  ='''
SELECT AVG(fare_amount) AS fare_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_fare_amount ,engine)

Unnamed: 0,fare_amount_avg
0,12.295774


In [23]:
#1. What is the average fare amount per trip?
avg_fare_amount  ='''
SELECT AVG(fare_amount) AS fare_amount_avg from nyc_yellow_taxi

'''

pd.read_sql(avg_fare_amount ,engine)

Unnamed: 0,fare_amount_avg
0,12.343774


In [24]:
#2. How much revenue was generated from tolls and surcharges combined?
tolls_and_surcharges_revenue  ='''

select sum(Tolls_and_surcharges_revenue) as tolls_surcharges_rev
from
(SELECT tolls_amount + improvement_surcharge as Tolls_and_surcharges_revenue
from nyc_yellow_taxi) subquery


'''
pd.read_sql(tolls_and_surcharges_revenue, engine)

Unnamed: 0,tolls_surcharges_rev
0,598262.799997


In [25]:
#3. What is the average total amount paid by passengers?
avg_total_amount  ='''
SELECT AVG(total_amount) AS total_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0

'''

pd.read_sql(avg_total_amount ,engine)

Unnamed: 0,total_amount_avg
0,15.523111


In [26]:
#3. What is the average total amount paid by passengers?
avg_total_amount  ='''
SELECT AVG(total_amount) AS total_amount_avg from nyc_yellow_taxi

'''

pd.read_sql(avg_total_amount ,engine)

Unnamed: 0,total_amount_avg
0,15.578133


## Reports

In [27]:
#operations_and_performance_report

operations_report= '''
SELECT CURRENT_DATE AS ingestion_date, * 
FROM

(SELECT COUNT(trip_distance) AS tt_taxi_trips from nyc_yellow_taxi
WHERE trip_distance > 0) as m,
(SELECT AVG(trip_distance) AS avg_taxi_trip from nyc_yellow_taxi
WHERE trip_distance > 0) as n,
(SELECT "VendorID", sum(trip_distance) as total_trips
from nyc_yellow_taxi
WHERE trip_distance > 0
Group by "VendorID"
Order by total_trips Desc
limit 1) as o,
(SELECT "VendorID", sum(trip_distance) as total_trips
from nyc_yellow_taxi
WHERE trip_distance > 0
Group by "VendorID"
Order by total_trips Asc
limit 1) as p,
(SELECT AVG(passenger_count) AS passenger_count_avg from nyc_yellow_taxi
WHERE trip_distance > 0) as q

'''
pd.read_sql(operations_report,engine)
#ingestion_date|total_trips|avg_trip_distance|highest_trip_vendor|lowest_trip_vendor|avg_passenger_count

Unnamed: 0,ingestion_date,tt_taxi_trips,avg_taxi_trip,VendorID,total_trips,VendorID.1,total_trips.1,passenger_count_avg
0,2023-09-21,994264,2.767276,2,1494029.24,1,1257374.1,1.632863


In [28]:
#customer_demographics_and_preferences_report

demographic_and_preferences_report= '''
SELECT CURRENT_DATE AS ingestion_date, * 
FROM

(SELECT AVG(tip_amount) AS tip_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0) as f,
(SELECT AVG(trip_distance) AS trip_distance_avg from nyc_yellow_taxi
WHERE trip_distance > 0) as g,
(SELECT count(*) as count_trips_flagged_storeandforward FROM nyc_yellow_taxi
WHERE trip_distance > 0
and store_and_fwd_flag like 'Y') as h,
(SELECT count(*) AS shared_ride_count from nyc_yellow_taxi
WHERE trip_distance > 0
and passenger_count > 1) as i

'''
pd.read_sql(demographic_and_preferences_report,engine)
#ingestion_date | avg_tip_amount | avg_trip_distance_by_passenger | store_and_forward_trips | shared_ride_count

Unnamed: 0,ingestion_date,tip_amount_avg,trip_distance_avg,count_trips_flagged_storeandforward,shared_ride_count
0,2023-09-21,1.819865,2.767276,5720,266145


In [29]:
#financial_performance_report

financial_performance_report= '''
SELECT CURRENT_DATE AS ingestion_date, * 
FROM

(SELECT AVG(fare_amount) AS fare_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0) as j,
(select sum(Tolls_and_surcharges_revenue) as tolls_surcharges_rev
from
(SELECT tolls_amount + improvement_surcharge as Tolls_and_surcharges_revenue
from nyc_yellow_taxi) subquery) as k,
(SELECT AVG(total_amount) AS total_amount_avg from nyc_yellow_taxi
WHERE trip_distance > 0) as l

'''
pd.read_sql(financial_performance_report,engine)
#ingestion_date | avg_fare_amount | tolls_and_surcharges_revenue | avg_total_amount

Unnamed: 0,ingestion_date,fare_amount_avg,tolls_surcharges_rev,total_amount_avg
0,2023-09-21,12.295774,598262.799997,15.523111


#### Building function to Transform report into a Dataframe; and to load Dataframe into a warehouse 

In [30]:
def transform_data(query,connection):
    try:
        report_query = pd.read_sql(query,connection)
        print('successfully transformed the data')
        return report_query      
    except:
        print('Encountered error while transforming')

In [31]:
def load_report_to_warehouse(dataframe,table,connection):
    try:
        dataframe.to_sql(table, con=connection,if_exists='append')
        print(f'successfully updated {table} table')
    except:
        print(f'Could not update {table}')

#### Running the Functions

In [32]:
#RUNNING THE ENTIRE PROCESSES
df_operations_report = transform_data(operations_report,engine)
df_demographic_and_preferences_report = transform_data(demographic_and_preferences_report,engine)
df_financial_performance_report = transform_data(financial_performance_report,engine)

load_report_to_warehouse(df_operations_report,'operations_report_table',engine)
load_report_to_warehouse(df_demographic_and_preferences_report,'demographic_and_preferences_table',engine)
load_report_to_warehouse(df_financial_performance_report,'financial_performance_table',engine)

successfully transformed the data
successfully transformed the data
successfully transformed the data
successfully updated operations_report_table table
successfully updated demographic_and_preferences_table table
successfully updated financial_performance_table table
