In [2]:
# Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
import psycopg2 as psy
import os 
from dotenv import load_dotenv
from pyspark.sql import functions as f

In [3]:

# Path to the PostgreSQL JDBC driver. Set the JDBC_DRIVER_PATH environment variable to
# override the author's local default so the notebook runs on other machines.
jdbc_driver_path = os.getenv(
    'JDBC_DRIVER_PATH',
    r'C:\Users\HP SPECTRE\Documents\Data Journey\10Alytics\ETL_Orchestration\class_case_study\New_York_Taxi_Trip_Record\postgresql-42.7.3.jar',
)

# Set up the Spark session with the JDBC driver jar on the classpath.
# NOTE(review): no master is set here; if this runs in local mode, executor memory is
# governed by the driver JVM (spark.driver.memory) — confirm the 4g setting has effect.
spark = (
    SparkSession.builder.appName('NY Trip Data')
    .config("spark.jars", jdbc_driver_path)
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)


In [4]:
# Source dataset. Set NY_TAXI_PARQUET_PATH to override the author's local default path.
file_path = os.getenv(
    'NY_TAXI_PARQUET_PATH',
    r"C:\Users\HP SPECTRE\Documents\Data Journey\10Alytics\ETL_Orchestration\class_case_study\yellow_tripdata_2009-01DATASET.parquet",
)
# The 'mode'='DROPMALFORMED' option is a parse mode for text sources (CSV/JSON); the Parquet
# reader ignores it, so it is omitted to avoid implying malformed rows are being dropped.
ny_df = spark.read.parquet(file_path)

In [5]:
print(f'Length of dataset is: {ny_df.count()}')

Length of dataset is: 14092413


In [6]:
ny_df.show(n=20, truncate=True)

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|Rate_Code|store_and_forward|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|     NULL|             NULL|        -73.993803|40.695922|        CASH|              8.9|      0.5|   NULL|    0.0| 

In [7]:
# Column names, read straight from the DataFrame's schema
ny_df.schema.names

['vendor_name',
 'Trip_Pickup_DateTime',
 'Trip_Dropoff_DateTime',
 'Passenger_Count',
 'Trip_Distance',
 'Start_Lon',
 'Start_Lat',
 'Rate_Code',
 'store_and_forward',
 'End_Lon',
 'End_Lat',
 'Payment_Type',
 'Fare_Amt',
 'surcharge',
 'mta_tax',
 'Tip_Amt',
 'Tolls_Amt',
 'Total_Amt']

In [8]:
#Check schema of data: prints every column's name, Spark type and nullability as a tree
ny_df.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Start_Lon: double (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Rate_Code: double (nullable = true)
 |-- store_and_forward: double (nullable = true)
 |-- End_Lon: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



In [9]:
# Work on a 20,000-row subset so the rest of the pipeline runs quickly.
# NOTE(review): limit() without orderBy() does not guarantee which rows are picked.
subset_size = 20000
ny_df_subset = ny_df.limit(subset_size)

In [10]:
#Checking new row count (limit() returns at most the requested number of rows)
ny_df_subset.count()

20000

In [11]:
ny_df_subset.show(20)

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|Rate_Code|store_and_forward|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|     NULL|             NULL|        -73.993803|40.695922|        CASH|              8.9|      0.5|   NULL|    0.0| 

In [12]:
# Checking for duplicate rows: group on every column and keep groups that occur more than once
duplicate_groups = ny_df_subset.groupBy(*ny_df_subset.columns).count()
duplicate_groups.filter(duplicate_groups['count'] > 1).show()

+-----------+--------------------+---------------------+---------------+-------------+---------+---------+---------+-----------------+-------+-------+------------+--------+---------+-------+-------+---------+---------+-----+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|Trip_Distance|Start_Lon|Start_Lat|Rate_Code|store_and_forward|End_Lon|End_Lat|Payment_Type|Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|count|
+-----------+--------------------+---------------------+---------------+-------------+---------+---------+---------+-----------------+-------+-------+------------+--------+---------+-------+-------+---------+---------+-----+
+-----------+--------------------+---------------------+---------------+-------------+---------+---------+---------+-----------------+-------+-------+------------+--------+---------+-------+-------+---------+---------+-----+



In [13]:
# Find the missing values in the dataset.
# All null counts are computed in ONE aggregation (a single Spark job) instead of running a
# separate filter().count() job per column.
null_count_row = ny_df_subset.select(
    [f.count(f.when(f.col(column).isNull(), 1)).alias(column) for column in ny_df_subset.columns]
).first()

for column in ny_df_subset.columns:
    print(column, null_count_row[column])


vendor_name 0
Trip_Pickup_DateTime 0
Trip_Dropoff_DateTime 0
Passenger_Count 0
Trip_Distance 0
Start_Lon 0
Start_Lat 0
Rate_Code 20000
store_and_forward 20000
End_Lon 0
End_Lat 0
Payment_Type 0
Fare_Amt 0
surcharge 0
mta_tax 20000
Tip_Amt 0
Tolls_Amt 0
Total_Amt 0


In [14]:
# Finding missing values as a list of (column, null_count) tuples.
# f.count() only counts non-null values, so counting when(isNull, 1) counts the nulls;
# every column is aggregated in a single Spark job rather than one job per column.
null_count_row = ny_df_subset.select(
    [f.count(f.when(f.col(c).isNull(), 1)).alias(c) for c in ny_df_subset.columns]
).first()
null_counts_2 = [(c, null_count_row[c]) for c in ny_df_subset.columns]
null_counts_2


[('vendor_name', 0),
 ('Trip_Pickup_DateTime', 0),
 ('Trip_Dropoff_DateTime', 0),
 ('Passenger_Count', 0),
 ('Trip_Distance', 0),
 ('Start_Lon', 0),
 ('Start_Lat', 0),
 ('Rate_Code', 20000),
 ('store_and_forward', 20000),
 ('End_Lon', 0),
 ('End_Lat', 0),
 ('Payment_Type', 0),
 ('Fare_Amt', 0),
 ('surcharge', 0),
 ('mta_tax', 20000),
 ('Tip_Amt', 0),
 ('Tolls_Amt', 0),
 ('Total_Amt', 0)]

In [15]:
# Getting columns to drop: those whose null count exceeds 10% of the rows.
NULL_RATIO_THRESHOLD = 0.1
# Count once; the original re-ran this Spark job for every column inside the comprehension.
row_total = ny_df_subset.count()
columns_to_drop = [column for column, count in null_counts_2 if count > NULL_RATIO_THRESHOLD * row_total]
columns_to_drop

['Rate_Code', 'store_and_forward', 'mta_tax']

In [16]:
# Dropping columns with more than 10% nulls by keeping every column not flagged above
columns_to_keep = [name for name in ny_df_subset.columns if name not in columns_to_drop]
ny_df_subset = ny_df_subset.select(*columns_to_keep)

ny_df_subset.columns

['vendor_name',
 'Trip_Pickup_DateTime',
 'Trip_Dropoff_DateTime',
 'Passenger_Count',
 'Trip_Distance',
 'Start_Lon',
 'Start_Lat',
 'End_Lon',
 'End_Lat',
 'Payment_Type',
 'Fare_Amt',
 'surcharge',
 'Tip_Amt',
 'Tolls_Amt',
 'Total_Amt']

In [17]:
ny_df_subset.show(20)

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+------------------+---------+------------+-----------------+---------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+------------------+---------+------------+-----------------+---------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|        -73.993803|40.695922|        CASH|              8.9|      0.5|    0.0|      0.0|      9.4|
|        VTS| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              3|              4.55|        -73.982102| 40.73629|    

In [18]:
# Data transformation: keep only trips with positive passengers, distance, fare and total,
# and non-negative tip, tolls and surcharge. Chained filters are ANDed by Spark, and a row
# whose condition evaluates to NULL is dropped, just like in a single combined predicate.
ny_df_subset = (
    ny_df_subset
    .filter(f.col('Passenger_Count') > 0.0)
    .filter(f.col('Trip_Distance') > 0.0)
    .filter(f.col("Fare_Amt") > 0.0)
    .filter(f.col("Total_Amt") > 0.0)
    .filter(f.col("Tip_Amt") >= 0.0)
    .filter(f.col("Tolls_Amt") >= 0.0)
    .filter(f.col("surcharge") >= 0.0)
)

In [19]:
ny_df_subset.count()

19835

In [20]:
# Checking data types as (column name, Spark SQL type name) pairs
[(field.name, field.dataType.simpleString()) for field in ny_df_subset.schema.fields]

[('vendor_name', 'string'),
 ('Trip_Pickup_DateTime', 'string'),
 ('Trip_Dropoff_DateTime', 'string'),
 ('Passenger_Count', 'bigint'),
 ('Trip_Distance', 'double'),
 ('Start_Lon', 'double'),
 ('Start_Lat', 'double'),
 ('End_Lon', 'double'),
 ('End_Lat', 'double'),
 ('Payment_Type', 'string'),
 ('Fare_Amt', 'double'),
 ('surcharge', 'double'),
 ('Tip_Amt', 'double'),
 ('Tolls_Amt', 'double'),
 ('Total_Amt', 'double')]

In [21]:
# Target Spark SQL types for the columns whose inferred type needs changing
columns_to_cast = dict(
    Trip_Pickup_DateTime="timestamp",
    Trip_Dropoff_DateTime="timestamp",
    Passenger_Count="integer",
)

In [22]:
# Changing data types in one projection: cast the mapped columns (keeping their names and
# positions) and pass every other column through unchanged.
ny_df_subset = ny_df_subset.select([
    f.col(name).cast(columns_to_cast[name]).alias(name) if name in columns_to_cast else f.col(name)
    for name in ny_df_subset.columns
])

In [23]:
# Confirm the casts took effect: the pickup/dropoff columns are now timestamp, Passenger_Count int
ny_df_subset.dtypes

[('vendor_name', 'string'),
 ('Trip_Pickup_DateTime', 'timestamp'),
 ('Trip_Dropoff_DateTime', 'timestamp'),
 ('Passenger_Count', 'int'),
 ('Trip_Distance', 'double'),
 ('Start_Lon', 'double'),
 ('Start_Lat', 'double'),
 ('End_Lon', 'double'),
 ('End_Lat', 'double'),
 ('Payment_Type', 'string'),
 ('Fare_Amt', 'double'),
 ('surcharge', 'double'),
 ('Tip_Amt', 'double'),
 ('Tolls_Amt', 'double'),
 ('Total_Amt', 'double')]

In [24]:
# Categorical columns = the string-typed ones
categorical_columns = [name for name, dtype in ny_df_subset.dtypes if dtype.startswith('string')]

categorical_columns

['vendor_name', 'Payment_Type']

In [25]:
# Distinct vendor names
ny_df_subset.select('vendor_name').dropDuplicates().show()

+-----------+
|vendor_name|
+-----------+
|        VTS|
|        DDS|
|        CMT|
+-----------+



In [26]:
# Distinct payment types (reveals inconsistent casing such as CASH vs Cash)
ny_df_subset.select('Payment_Type').dropDuplicates().show()

+------------+
|Payment_Type|
+------------+
|        CASH|
|      Credit|
|      CREDIT|
|        Cash|
|   No Charge|
|     Dispute|
+------------+



In [27]:
# Fixing the Payment_Type column: map the upper-case spellings onto the title-case ones;
# all other values (and nulls) are left untouched.
ny_df_subset = ny_df_subset.replace({'CASH': 'Cash', 'CREDIT': 'Credit'}, subset=['Payment_Type'])

In [28]:
ny_df_subset.select('Payment_Type').dropDuplicates().show()

+------------+
|Payment_Type|
+------------+
|        Cash|
|      Credit|
|   No Charge|
|     Dispute|
+------------+



Creating the Vendor, Payment, Location and Trip Tables

In [29]:
# Creating the vendor table: one row per trip (not deduplicated), keyed by a generated id.
# monotonically_increasing_id() is unique but only consecutive within a single partition;
# "+ 1" makes the ids start at 1.
vendors = ny_df_subset.select(
    (monotonically_increasing_id() + 1).alias('vendor_id'),
    'vendor_name',
)

vendors.show()

+---------+-----------+
|vendor_id|vendor_name|
+---------+-----------+
|        1|        VTS|
|        2|        VTS|
|        3|        VTS|
|        4|        DDS|
|        5|        DDS|
|        6|        DDS|
|        7|        DDS|
|        8|        VTS|
|        9|        CMT|
|       10|        CMT|
|       11|        CMT|
|       12|        CMT|
|       13|        DDS|
|       14|        CMT|
|       15|        CMT|
|       16|        CMT|
|       17|        CMT|
|       18|        CMT|
|       19|        DDS|
|       20|        CMT|
+---------+-----------+
only showing top 20 rows



In [30]:
# Row count of the vendor table; it is not deduplicated, so there is one row per filtered trip
vendors.count()

19835

In [31]:
# Creating the payments table: the fare breakdown of each trip plus a generated payment_id
# (same numbering scheme as the other tables: monotonically_increasing_id() + 1).
payment_columns = ['Payment_Type', 'Fare_Amt', 'surcharge', 'Tip_Amt', 'Tolls_Amt', 'Total_Amt']
payments = ny_df_subset.select(
    (monotonically_increasing_id() + 1).alias('payment_id'),
    *payment_columns,
)

payments.show(5)

+----------+------------+--------+---------+-------+---------+---------+
|payment_id|Payment_Type|Fare_Amt|surcharge|Tip_Amt|Tolls_Amt|Total_Amt|
+----------+------------+--------+---------+-------+---------+---------+
|         1|        Cash|     8.9|      0.5|    0.0|      0.0|      9.4|
|         2|      Credit|    12.1|      0.5|    2.0|      0.0|     14.6|
|         3|      Credit|    23.7|      0.0|   4.74|      0.0|    28.44|
|         4|      Credit|    14.9|      0.5|   3.05|      0.0|    18.45|
|         5|        Cash|     3.7|      0.0|    0.0|      0.0|      3.7|
+----------+------------+--------+---------+-------+---------+---------+
only showing top 5 rows



In [32]:
# One payment row per filtered trip
payments.count()

19835

In [33]:
# Creating the locations table: start/end coordinates of each trip plus a generated location_id
# (same numbering scheme as the other tables: monotonically_increasing_id() + 1).
coordinate_columns = ['Start_Lon', 'Start_Lat', 'End_Lon', 'End_Lat']
locations = ny_df_subset.select(
    (monotonically_increasing_id() + 1).alias('location_id'),
    *coordinate_columns,
)

locations.show(3)

+-----------+----------+---------+----------+---------+
|location_id| Start_Lon|Start_Lat|   End_Lon|  End_Lat|
+-----------+----------+---------+----------+---------+
|          1|-73.991957|40.721567|-73.993803|40.695922|
|          2|-73.982102| 40.73629| -73.95585| 40.76803|
|          3|-74.002587|40.739748|-73.869983|40.770225|
+-----------+----------+---------+----------+---------+
only showing top 3 rows



In [34]:
# One location row per filtered trip
locations.count()

19835

In [35]:
# Trip table: number each trip with the same id scheme as the other tables, then pick up the
# vendor/payment/location ids by left-joining on that generated id.
# NOTE(review): this relies on every table receiving identical generated ids for the same row;
# that holds only while all of them are evaluated with the same partitioning and row order.
trips_base = ny_df_subset.withColumn('trip_id', monotonically_increasing_id() + 1)

trips = (
    trips_base
    .join(vendors, trips_base['trip_id'] == vendors['vendor_id'], 'left')
    .join(payments, trips_base['trip_id'] == payments['payment_id'], 'left')
    .join(locations, trips_base['trip_id'] == locations['location_id'], 'left')
    .select('trip_id', 'Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime', 'Passenger_Count',
            'Trip_Distance', 'vendor_id', 'payment_id', 'location_id')
)

trips.show()

+-------+--------------------+---------------------+---------------+------------------+---------+----------+-----------+
|trip_id|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|vendor_id|payment_id|location_id|
+-------+--------------------+---------------------+---------------+------------------+---------+----------+-----------+
|      1| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        1|         1|          1|
|      2| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              3|              4.55|        2|         2|          2|
|      3| 2009-01-03 15:43:00|  2009-01-03 15:57:00|              5|             10.35|        3|         3|          3|
|      4| 2009-01-01 20:52:58|  2009-01-01 21:14:00|              1|               5.0|        4|         4|          4|
|      5| 2009-01-24 16:18:23|  2009-01-24 16:24:56|              1|               0.4|        5|         5|          5|
|      6| 2009-01-16 22:35:59|  

In [36]:
# Number of trip rows; the left joins above keep every trip
trips.count()

19835

In [37]:
# Load variables from a local .env file (if present) so the database password stays
# out of the notebook, then read it from the environment.
load_dotenv()
password = os.environ.get('PASSWORD')

In [38]:
# Creating a connection to the database
def get_connection(dbname='ny_taxi_trips', user='postgres', host='localhost', port=5432):
    """Open a new psycopg2 connection to the PostgreSQL database.

    The defaults point at the local ``ny_taxi_trips`` database. The password is taken from
    the module-level ``password`` variable read from the environment.

    Returns:
        A new psycopg2 connection. The caller is responsible for closing it.
    """
    return psy.connect(dbname=dbname, user=user, password=password, host=host, port=port)


# Smoke-test the connection, then release it instead of leaving it open (the original left
# this connection and cursor dangling; the next cell opens its own connection).
conn = get_connection()
cur = conn.cursor()
cur.close()
conn.close()

In [44]:
# Create tables in the database (dropping any existing copies first so the load starts clean)

create_table_query= '''
                    DROP TABLE IF EXISTS vendors CASCADE;
                    DROP TABLE IF EXISTS payments CASCADE;
                    DROP TABLE IF EXISTS locations CASCADE;
                    DROP TABLE IF EXISTS trips CASCADE;

                    CREATE TABLE IF NOT EXISTS vendors (
                        vendor_id INT PRIMARY KEY,
                        vendor_name VARCHAR(50)
                    );

                    CREATE TABLE locations (
                        location_id INT PRIMARY KEY,
                        start_lon FLOAT,
                        start_lat FLOAT,
                        end_lon FLOAT,
                        end_lat FLOAT
                    );

                    CREATE TABLE payments (
                        payment_id INT PRIMARY KEY,
                        payment_type VARCHAR(50),
                        fare_amt FLOAT,
                        surcharge FLOAT,
                        tip_amt FLOAT,
                        tolls_amt FLOAT,
                        total_amt FLOAT
                    );

                    CREATE TABLE trips (
                        trip_id INT PRIMARY KEY,
                        trip_pickup_datetime TIMESTAMP,
                        trip_dropoff_datetime TIMESTAMP,
                        passenger_count INT,
                        trip_distance FLOAT,
                        vendor_id INT,
                        payment_id INT,
                        location_id INT,
                        FOREIGN KEY (vendor_id) REFERENCES vendors(vendor_id),
                        FOREIGN KEY (payment_id) REFERENCES payments(payment_id),
                        FOREIGN KEY (location_id) REFERENCES locations(location_id)
                       );

    '''

conn = get_connection()
try:
    # `with conn` commits if the block succeeds and rolls back if execute() raises;
    # it does NOT close the connection, which is why the finally clause below exists.
    with conn:
        with conn.cursor() as cur:  # the cursor is closed when this block exits
            cur.execute(create_table_query)
finally:
    conn.close()



Loading Data into the PostgreSQL Database Using the JDBC Driver

In [45]:
# Connection settings for Spark's JDBC writer: target database URL plus the
# user/password/driver properties passed to every write.
jdbc_url = "jdbc:postgresql://localhost:5432/ny_taxi_trips"
jdbc_properties = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)

In [46]:
# (DataFrame, target table name) pairs, written in this order.
# The trips table has foreign keys to vendors, payments and locations, so those
# tables are listed (and therefore loaded) before trips.
dataframes_to_save = [
    (vendors, "vendors"),
    (locations, "locations"),
    (payments, "payments"),
    (trips, "trips")
]

In [42]:
# vendors.write.jdbc(url=jdbc_url, table='vendors',mode='overwrite', properties=jdbc_properties)

In [47]:
# Write each DataFrame into its PostgreSQL table through Spark's JDBC data source.
# options(**jdbc_properties) applies every connection property (user, password, driver) in one
# call. The original per-key loop reassigned a separate variable from `writer` on each pass and
# would fail with a NameError (or reuse a stale writer) if the properties dict were empty.
# NOTE(review): foreign keys require the parent tables to be written before `trips`; this loop
# follows the order of dataframes_to_save.
for dataframe, table_name in dataframes_to_save:
    (
        dataframe.write
        .format('jdbc')
        .option('url', jdbc_url)
        .option('dbtable', table_name)
        .options(**jdbc_properties)
        .mode('append')  # append into the pre-created tables so their keys/constraints are kept
        .save()
    )

Loading Data into the PostgreSQL Database Using SQLAlchemy

In [None]:
from urllib.parse import quote_plus

from sqlalchemy import create_engine

# Build the connection string from the password loaded from the environment instead of
# hardcoding credentials. quote_plus escapes characters such as '@', ':' or '/' that would
# otherwise break the URL.
# NOTE(review): this targets the `taxi_data` database, whereas the JDBC load uses
# `ny_taxi_trips` — confirm which database is intended.
connection_string = f'postgresql://postgres:{quote_plus(password or "")}@localhost/taxi_data'

engine = create_engine(connection_string)

In [None]:
# trips/vendors/payments/locations are Spark DataFrames, which have no `to_sql` method.
# Collect each one to pandas first (acceptable for this 20,000-row subset), then let pandas
# write it through the SQLAlchemy engine. The table order matches the original calls.
for table_name, spark_df in [
    ('trips', trips),
    ('vendors', vendors),
    ('payments', payments),
    ('locations', locations),
]:
    spark_df.toPandas().to_sql(table_name, engine, if_exists='replace', index=False)

Converting the Notebook to a Python Script

In [54]:
# Getting python script
!pip install nbconvert

!jupyter nbconvert --to script ny_data.ipynb

Collecting nbconvert
  Downloading nbconvert-7.16.4-py3-none-any.whl.metadata (8.5 kB)
Collecting beautifulsoup4 (from nbconvert)
  Downloading beautifulsoup4-4.12.3-py3-none-any.whl.metadata (3.8 kB)
Collecting bleach!=5.0.0 (from nbconvert)
  Downloading bleach-6.1.0-py3-none-any.whl.metadata (30 kB)
Collecting defusedxml (from nbconvert)
  Downloading defusedxml-0.7.1-py2.py3-none-any.whl.metadata (32 kB)
Collecting jinja2>=3.0 (from nbconvert)
  Downloading jinja2-3.1.4-py3-none-any.whl.metadata (2.6 kB)
Collecting jupyterlab-pygments (from nbconvert)
  Downloading jupyterlab_pygments-0.3.0-py3-none-any.whl.metadata (4.4 kB)
Collecting markupsafe>=2.0 (from nbconvert)
  Downloading MarkupSafe-2.1.5-cp311-cp311-win_amd64.whl.metadata (3.1 kB)
Collecting mistune<4,>=2.0.3 (from nbconvert)
  Downloading mistune-3.0.2-py3-none-any.whl.metadata (1.7 kB)
Collecting nbclient>=0.5.0 (from nbconvert)
  Downloading nbclient-0.10.0-py3-none-any.whl.metadata (7.8 kB)
Collecting nbformat>=5.7 (

[NbConvertApp] Converting notebook ny_data.ipynb to script
[NbConvertApp] Writing 5502 bytes to ny_data.py
