In [1]:
from typing import List, Optional

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Build (or reuse) the Spark session. The PostgreSQL JDBC driver jar is passed
# via 'spark.jars' so it is available to the session.
spark = (
    SparkSession.builder
    .appName('Batch-Processing')
    .config('spark.jars', 'driver/postgresql-42.6.0.jar')
    .getOrCreate()
)

In [2]:
import json
def read_data(spark, table_name: str, config_path: str = 'config/config.json',
              section: str = 'staging') -> DataFrame:
    """Read a PostgreSQL table into a Spark DataFrame over JDBC.

    Connection settings are taken from a JSON config file: the object stored
    under ``section`` must provide the keys ``host``, ``port``, ``database``,
    ``user`` and ``password``.

    Args:
        spark: Session object whose ``read.jdbc`` is used to load the table.
        table_name: Name of the table to read.
        config_path: Path to the JSON connection config. Defaults to
            ``'config/config.json'``.
        section: Top-level key of the config holding the connection settings.
            Defaults to ``'staging'``.

    Returns:
        Whatever ``spark.read.jdbc`` returns for the requested table.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        KeyError: If ``section`` or one of the required keys is missing.
    """
    with open(config_path, 'r') as config_file:
        connection = json.load(config_file)[section]

    url = 'jdbc:postgresql://{0}:{1}/{2}'.format(
        connection['host'], connection['port'], connection['database'])
    properties = {
        'user': connection['user'],
        'password': connection['password'],
        'driver': 'org.postgresql.Driver'}
    df = spark.read.jdbc(url=url, table=table_name, properties=properties)
    # Only reached when the JDBC call above returned without raising.
    print(f'Read data from {table_name} successfully!')
    return df

In [3]:
# Load the green-taxi trips table via read_data.
green_taxi = read_data(spark, table_name='green_taxi')

Read data from green_taxi successfully!


In [4]:
# Zone lookup table, trimmed to the id plus the two descriptive columns that
# are attached to each trip further down.
taxi_zone = (
    read_data(spark, 'taxi_zone_lookup')
    .select('LocationID', 'Zone', 'Borough')
)

Read data from taxi_zone_lookup successfully!


In [5]:
def join_data(left, right, left_on: str, right_on: str,
              cols_to_lookup: Optional[List[str]] = None, how: str = 'inner') -> DataFrame:
    """Join two DataFrames on one key column each and optionally project columns.

    Args:
        left: Left-hand DataFrame.
        right: Right-hand DataFrame.
        left_on: Name of the key column in ``left``.
        right_on: Name of the key column in ``right``.
        cols_to_lookup: Columns to keep from the joined result. ``None`` or an
            empty list keeps every column of the join.
        how: Join type passed through to ``DataFrame.join``. Defaults to
            ``'inner'``.

    Returns:
        The joined DataFrame, restricted to ``cols_to_lookup`` when given.
    """
    # The join condition is an equality between two column objects, not a
    # shared column name.
    joined = left.join(right, left[left_on] == right[right_on], how)
    if cols_to_lookup:
        joined = joined.select(cols_to_lookup)
    return joined

In [6]:
# Columns kept for the enriched green-taxi dataset, in output order.
cols = [
    'VendorID', 'RatecodeID',
    'pickup_datetime', 'pickup_borough', 'pickup_zone',
    'dropoff_datetime', 'dropoff_borough', 'dropoff_zone',
    'passenger_count', 'trip_distance_in_km',
    'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
    'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type',
]

# Step 1: look up the pickup location. The lookup's LocationID is dropped so
# the same lookup table can be joined a second time for the drop-off location.
join_data_green = (
    join_data(green_taxi, taxi_zone, 'PULocationID', 'LocationID', [])
    .withColumnRenamed('Zone', 'pickup_zone')
    .withColumnRenamed('Borough', 'pickup_borough')
    .drop('LocationID')
)

# Step 2: look up the drop-off location, keep only the reporting columns and
# derive the month used for the monthly aggregation.
# NOTE(review): month() yields only the month number, so trips from the same
# month of different years would share a revenue_month value — confirm the
# source data covers a single year.
join_data_green = (
    join_data(join_data_green, taxi_zone, 'DOLocationID', 'LocationID', [])
    .withColumnRenamed('Zone', 'dropoff_zone')
    .withColumnRenamed('Borough', 'dropoff_borough')
    .drop('LocationID')
    .select(cols)
    .withColumn('revenue_month', month('pickup_datetime'))
)
join_data_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- dropoff_borough: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance_in_km: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- revenue_month: integer (nullable = true)



In [7]:
# Monthly revenue totals and trip statistics per pickup zone.
# `sum`, `count` and `avg` are the Spark column functions brought in by the
# star import of pyspark.sql.functions (which shadows the builtin `sum`).
df = (
    join_data_green
    .groupBy('pickup_zone', 'revenue_month')
    .agg(
        sum('fare_amount').alias('revenue_monthly_fare'),
        sum('extra').alias('revenue_monthly_extra'),
        sum('mta_tax').alias('revenue_monthly_mta_tax'),
        sum('tip_amount').alias('revenue_monthly_tip_amount'),
        sum('tolls_amount').alias('revenue_monthly_tolls_amount'),
        sum('total_amount').alias('revenue_monthly_total_amount'),
        # NOTE(review): this sums 'mta_tax' again, so 'congestion_surcharge'
        # duplicates 'revenue_monthly_mta_tax'. join_data_green has no
        # congestion_surcharge column (it is not in `cols`), so the intended
        # source column needs to be confirmed and carried through upstream.
        sum('mta_tax').alias('congestion_surcharge'),
        count('*').alias('total_monthly_trips'),
        avg('passenger_count').alias('avg_monthly_passenger_count'),
        avg('trip_distance_in_km').alias('avg_monthly_trip_distance'),
    )
)
df.show()

+--------------------+-------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+----------------------------+--------------------+-------------------+---------------------------+-------------------------+
|         pickup_zone|revenue_month|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_total_amount|congestion_surcharge|total_monthly_trips|avg_monthly_passenger_count|avg_monthly_trip_distance|
+--------------------+-------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+----------------------------+--------------------+-------------------+---------------------------+-------------------------+
|       Port Richmond|            2|                16.0|                  0.5|                    0.5|                       2.0|                  

In [10]:
# Load the yellow-taxi trips table via read_data.
yellow_taxi = read_data(spark, table_name='yellow_taxi')

Read data from yellow_taxi successfully!


In [12]:
# Inspect the column names and types of the yellow-taxi table.
yellow_taxi.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- trip_time_in_mins: double (nullable = true)
 |-- trip_distance_in_km: double (nullable = true)
 |-- average_velocity: double (nullabl

In [None]:
# Write the aggregated DataFrame to the 'production' schema.
# NOTE(review): `load` and `name` are not defined in any cell visible here and
# this cell has no execution count; presumably they come from cells that are
# not shown — confirm this runs after Restart Kernel -> Run All.
load(df, schema='production', table_name=name)
