# Get the Data

Run `download_data.sh` in a Git MINGW64 terminal to get all the data files

# Start the Spark Session

In [10]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# create the SparkSession locally with as many CPUs cores as possible
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

**Open the Spark UI at http://localhost:4040/**

# Load the Data

In [22]:
# load in Jan. 2020 green taxi data CSV into a pandas DataFrame
df_green_pd = pd.read_csv('./data/raw/green/2020/01/green_tripdata_2020_01.csv.gz', nrows=1000, compression='gzip')

df_green_pd.head(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-12-18 15:52:30,2019-12-18 15:54:39,N,1,264,264,5,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1,1,0.0
1,2,2020-01-01 00:45:58,2020-01-01 00:56:39,N,5,66,65,2,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1,2,0.0
2,2,2020-01-01 00:41:38,2020-01-01 00:52:49,N,1,181,228,1,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1,1,0.0
3,1,2020-01-01 00:52:46,2020-01-01 01:14:21,N,1,129,263,2,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2,1,2.75
4,1,2020-01-01 00:19:57,2020-01-01 00:30:56,N,1,210,150,1,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1,1,0.0


In [25]:
df_green_pd.dtypes

VendorID                   int64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID                 int64
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type               int64
trip_type                  int64
congestion_surcharge     float64
dtype: object

# Convert to Spark

In [26]:
# Turn pandas DataFrame into a Spark DataFrame
spark.createDataFrame(df_green_pd).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [28]:
from pyspark.sql import types

# Define the green taxi schema
green_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True),
    types.StructField('lpep_pickup_datetime', types.TimestampType(), True),
    types.StructField('lpep_dropoff_datetime', types.TimestampType(), True),
    types.StructField('store_and_fwd_flag', types.StringType(), True),
    types.StructField('RatecodeID', types.IntegerType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('passenger_count', types.IntegerType(), True),
    types.StructField('trip_distance', types.DoubleType(), True),
    types.StructField('fare_amount', types.DoubleType(), True),
    types.StructField('extra', types.DoubleType(), True),
    types.StructField('mta_tax', types.DoubleType(), True),
    types.StructField('tip_amount', types.DoubleType(), True),
    types.StructField('tolls_amount', types.DoubleType(), True),
    types.StructField('ehail_fee', types.DoubleType(), True),
    types.StructField('improvement_surcharge', types.DoubleType(), True),
    types.StructField('total_amount', types.DoubleType(), True),
    types.StructField('payment_type', types.IntegerType(), True),
    types.StructField('trip_type', types.IntegerType(), True),
    types.StructField('congestion_surcharge', types.DoubleType(), True)
])

In [29]:
df_green_spark = spark.read \
                .option('header', 'true') \
                .schema(green_schema) \
                .csv('./data/raw/green/2020/01')

In [31]:
df_green_spark.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [32]:
# Do the same for yellow taxi data

# Define the yellow taxi schema
yellow_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True),
    types.StructField('tpep_pickup_datetime', types.TimestampType(), True),
    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True),
    types.StructField('passenger_count', types.IntegerType(), True),
    types.StructField('trip_distance', types.DoubleType(), True),
    types.StructField('RatecodeID', types.IntegerType(), True),
    types.StructField('store_and_fwd_flag', types.StringType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('payment_type', types.IntegerType(), True),
    types.StructField('fare_amount', types.DoubleType(), True),
    types.StructField('extra', types.DoubleType(), True),
    types.StructField('mta_tax', types.DoubleType(), True),
    types.StructField('tip_amount', types.DoubleType(), True),
    types.StructField('tolls_amount', types.DoubleType(), True),
    types.StructField('improvement_surcharge', types.DoubleType(), True),
    types.StructField('total_amount', types.DoubleType(), True),
    types.StructField('congestion_surcharge', types.DoubleType(), True)
])

In [33]:
df_yellow_spark = spark.read \
                .option('header', 'true') \
                .schema(yellow_schema) \
                .csv('./data/raw/yellow/2020/01')

In [34]:
df_yellow_spark.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# Write to Parquet Files

**We do this so that the Parquet files store the schema that we have written above**

In [37]:
# Repartition the data file into 4 partitions into Parquet files
year = 2021

for month in range(1, 13):
    
    print(f'Processing data for {year}/{month:02d}')
    
    input_path = f'./data/raw/green/{year}/{month:02d}/'
    output_path = f'./data/pq/green/{year}/{month:02d}/'
    
    df_green_spark = spark.read \
                    .option('header', 'true') \
                    .schema(green_schema) \
                    .csv(input_path)    
    
    df_green_spark.repartition(4).write.parquet(output_path)

Processing data for 2021/01
Processing data for 2021/02
Processing data for 2021/03
Processing data for 2021/04
Processing data for 2021/05
Processing data for 2021/06
Processing data for 2021/07
Processing data for 2021/08


AnalysisException: Path does not exist: file:/C:/Users/nimz/Documents/de_zoomcamp/week5_batch_processing/data/raw/green/2021/08

In [38]:
# Repartition the data file into 4 partitions into Parquet files
year = 2020

for month in range(1, 13):
    
    print(f'Processing data for {year}/{month:02d}')
    
    input_path = f'./data/raw/green/{year}/{month:02d}/'
    output_path = f'./data/pq/green/{year}/{month:02d}/'
    
    df_green_spark = spark.read \
                    .option('header', 'true') \
                    .schema(green_schema) \
                    .csv(input_path)    
    
    df_green_spark.repartition(4).write.parquet(output_path)

Processing data for 2020/01
Processing data for 2020/02
Processing data for 2020/03
Processing data for 2020/04
Processing data for 2020/05
Processing data for 2020/06
Processing data for 2020/07
Processing data for 2020/08
Processing data for 2020/09
Processing data for 2020/10
Processing data for 2020/11
Processing data for 2020/12


In [39]:
# Repartition the data file into 4 partitions into Parquet files
year = 2021

for month in range(1, 13):
    
    print(f'Processing data for {year}/{month:02d}')
    
    input_path = f'./data/raw/yellow/{year}/{month:02d}/'
    output_path = f'./data/pq/yellow/{year}/{month:02d}/'
    
    df_green_spark = spark.read \
                    .option('header', 'true') \
                    .schema(green_schema) \
                    .csv(input_path)    
    
    df_green_spark.repartition(4).write.parquet(output_path)

Processing data for 2021/01
Processing data for 2021/02
Processing data for 2021/03
Processing data for 2021/04
Processing data for 2021/05
Processing data for 2021/06
Processing data for 2021/07
Processing data for 2021/08


AnalysisException: Path does not exist: file:/C:/Users/nimz/Documents/de_zoomcamp/week5_batch_processing/data/raw/yellow/2021/08

In [40]:
# Repartition the data file into 4 partitions into Parquet files
year = 2020

for month in range(1, 13):
    
    print(f'Processing data for {year}/{month:02d}')
    
    input_path = f'./data/raw/yellow/{year}/{month:02d}/'
    output_path = f'./data/pq/yellow/{year}/{month:02d}/'
    
    df_green_spark = spark.read \
                    .option('header', 'true') \
                    .schema(green_schema) \
                    .csv(input_path)    
    
    df_green_spark.repartition(4).write.parquet(output_path)

Processing data for 2020/01
Processing data for 2020/02
Processing data for 2020/03
Processing data for 2020/04
Processing data for 2020/05
Processing data for 2020/06
Processing data for 2020/07
Processing data for 2020/08
Processing data for 2020/09
Processing data for 2020/10
Processing data for 2020/11
Processing data for 2020/12


# Use the Python Script

Instead of doing all of the above, run `download_data.sh` in a Git MINGW64 terminal to get all the data files, and then in a `zoom` Conda environment, run `create_taxi_schema.py` to get all the data into the right format

In [41]:
!python create_taxi_schema.py

24/02/27 20:53:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/02/27 20:53:54 WARN ZlibFactory: Failed to load/initialize native-zlib library
24/02/27 20:53:59 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
Creating the SparkSession...
Creating the schemas...
Processing data for yellow/2020/1
Reading data/raw/yellow/2020/01/
Partitioning and saving data/raw/yellow/2020/01/ to data/parquet/yellow/2020/01/
Processing data for yellow/2020/2
Reading data/raw/yellow/2020/02/
Partitioning and saving data/raw/yellow/2020/02/ to data/parquet/yellow/2020/02/
Processing data for yellow/2020/3
Reading data/raw/yellow/2020/03/
Partitioning and saving data/raw/yellow/2020/03/ to data/parquet/yellow/2020/03/
Processing data for yellow/2020/4
Reading data/raw/yellow/2020/04/
Partitioning and saving data/raw/yellow/2020/04/ to data/parquet/yellow/2020/04/
Processing data for yellow/2020

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

[Stage 0:>                                                          (0 + 1) / 1]

[Stage 2:>                                                          (0 + 4) / 4]

                                                                                

[Stage 3:>                                                          (0 + 1) / 1]

[Stage 5:>                                                          (0 + 4) / 4]



                                                                                

[Stage 6:>                                                          (0 + 1) / 1]

[Stage 8:>                                                          (0 + 4) / 4]



                                                                                

[Stage 9:>                                                          (0 + 1) / 1]

                                             

In [42]:
# Load in all green data into a Spark DataFrame
df_green = spark.read.parquet('./data/parquet/green/*/*')

# Rename some columns via PySpark function to match up with yellow (will also be renamed)
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_green.head()

Row(VendorID=2, pickup_datetime=datetime.datetime(2020, 1, 22, 13, 18, 32), dropoff_datetime=datetime.datetime(2020, 1, 22, 13, 45, 58), store_and_fwd_flag='N', RatecodeID=1, PULocationID=244, DOLocationID=41, passenger_count=1, trip_distance=5.22, fare_amount=22.0, extra=0.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=22.8, payment_type=1, trip_type=1, congestion_surcharge=0.0)

In [43]:
# Load in all yellow data into a Spark DataFrame
df_yellow = spark.read.parquet('./data/parquet/yellow/*/*')

# Rename some columns via PySpark function to match up with the green taxi data above
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

df_yellow.head()

Row(VendorID=1, pickup_datetime=datetime.datetime(2020, 1, 1, 19, 45, 57), dropoff_datetime=datetime.datetime(2020, 1, 1, 20, 6, 13), passenger_count=1, trip_distance=6.1, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=87, payment_type=1, fare_amount=21.0, extra=2.5, mta_tax=0.5, tip_amount=4.85, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=29.15, congestion_surcharge=2.5)

#### We want to eventually combine these two datasets into one large dataset

In [44]:
print(df_green.count())
print(df_yellow.count())

2304517
39649199


In [45]:
# get the set of unique columns from the yellow data
yellow_columns = set(df_yellow.columns)
print(yellow_columns)

{'PULocationID', 'fare_amount', 'total_amount', 'VendorID', 'RatecodeID', 'DOLocationID', 'tip_amount', 'tolls_amount', 'store_and_fwd_flag', 'trip_distance', 'dropoff_datetime', 'payment_type', 'mta_tax', 'congestion_surcharge', 'pickup_datetime', 'improvement_surcharge', 'extra', 'passenger_count'}


In [46]:
# Create a list to hold all common columns between the two datasets
common_columns = []

# Get all columns from the yellow dataset that are also in the green dataset
#     and preserve order of cols
for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)
        
print(common_columns)

['VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'payment_type', 'congestion_surcharge']


In [47]:
# Import the built-in PySpark functions
from pyspark.sql import functions as F

**`.withColumn()` is a Transformation function of a Spark DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more**
- https://sparkbyexamples.com/pyspark/pyspark-withcolumn/

**`pyspark.sql.functions.lit` is used to add a new column to DataFrame by assigning a literal or constant value**
- https://sparkbyexamples.com/pyspark/pyspark-lit-add-literal-constant/

In [48]:
# Get all columns from green that are a part of the common columns
#   and then add a new service_type column with value of 'green'
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

# Get all columns from yellow that are a part of the common columns
#   and then add a new service_type column with value of 'yellow'
df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

In [49]:
# Union the datasets together
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [50]:
# Inspect the data
df_trips_data.groupBy('service_type').count().show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [51]:
# View the columns
df_trips_data.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

# Spark SQL

**In order to use Spark SQL, we have to tell Spark that our DataFrame is a table**

In [52]:
# Tell Spark that our DataFrame is a table
# df_trips_data.registerTempTable('trips_data')  # deprecated
df_trips_data.createOrReplaceTempView('trips_data')

In [54]:
# Do some Spark SQL on the data
spark.sql("""
    SELECT
        service_type,
        count(1) AS count
    FROM
        trips_data
    GROUP BY 
        service_type
""").show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [55]:
# Save a SQL result to a NEW DataFrame, similar to our dbt models

# Get all revenue for each pickup zone/revenue location, for each service type, for each month
df_result = spark.sql("""
    SELECT 
        -- Reveneue grouping 
        PULocationID AS revenue_zone,
        date_trunc('month', pickup_datetime) AS revenue_month, 
        service_type, 

        -- Revenue calculation 
        SUM(fare_amount) AS revenue_monthly_fare,
        SUM(extra) AS revenue_monthly_extra,
        SUM(mta_tax) AS revenue_monthly_mta_tax,
        SUM(tip_amount) AS revenue_monthly_tip_amount,
        SUM(tolls_amount) AS revenue_monthly_tolls_amount,
        SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
        SUM(total_amount) AS revenue_monthly_total_amount,
        SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

        -- Additional calculations
        AVG(passenger_count) AS avg_montly_passenger_count,
        AVG(trip_distance) AS avg_montly_trip_distance
    FROM
        trips_data
    GROUP BY
        1, 2, 3
""")

In [57]:
df_result.show(5)

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         218|2020-

In [58]:
df_result.head(5)

[Row(revenue_zone=218, revenue_month=datetime.datetime(2020, 1, 1, 0, 0), service_type='green', revenue_monthly_fare=24689.350000000126, revenue_monthly_extra=1561.75, revenue_monthly_mta_tax=121.5, revenue_monthly_tip_amount=18.7, revenue_monthly_tolls_amount=735.9000000000004, revenue_monthly_improvement_surcharge=246.0000000000036, revenue_monthly_total_amount=27375.149999999958, revenue_monthly_congestion_surcharge=0.0, avg_montly_passenger_count=1.0754716981132075, avg_montly_trip_distance=6.732052451539335),
 Row(revenue_zone=85, revenue_month=datetime.datetime(2020, 1, 1, 0, 0), service_type='green', revenue_monthly_fare=19874.88000000006, revenue_monthly_extra=1476.5, revenue_monthly_mta_tax=187.5, revenue_monthly_tip_amount=165.89999999999998, revenue_monthly_tolls_amount=354.6800000000002, revenue_monthly_improvement_surcharge=233.70000000000314, revenue_monthly_total_amount=22320.9099999999, revenue_monthly_congestion_surcharge=11.0, avg_montly_passenger_count=1.334975369458

**`.coalesce(*cols)` returns the first column that is not null and is *used to decrease the number of partitions in an efficient way***

**Can go to the Spark UI after running the next cell to see jobs/tasks running**

In [59]:
# use just 1 partition via .coalesce()
df_result.coalesce(1) \
    .write.parquet('data/report/revenue/', mode='overwrite')

In [60]:
!dir data\report\revenue\

 Volume in drive C has no label.
 Volume Serial Number is 08A3-CF2D

 Directory of C:\Users\nimz\Documents\de_zoomcamp\week5_batch_processing\data\report\revenue

02/27/2024  09:11 PM    <DIR>          .
02/27/2024  09:11 PM    <DIR>          ..
02/27/2024  09:11 PM             4,112 .part-00000-596e3dca-5394-4d49-9d44-c8d07b1cacb2-c000.snappy.parquet.crc
02/27/2024  09:11 PM                 8 ._SUCCESS.crc
02/27/2024  09:11 PM           524,936 part-00000-596e3dca-5394-4d49-9d44-c8d07b1cacb2-c000.snappy.parquet
02/27/2024  09:11 PM                 0 _SUCCESS
               4 File(s)        529,056 bytes
               2 Dir(s)  240,145,195,008 bytes free
