In [41]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql.functions import count, col, from_unixtime, lit, year, quarter, month, bround
from pyspark.sql.types import *

In [2]:
conf = SparkConf()
conf.setMaster("local").setAppName("bronze_layer")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

22/10/22 18:45:23 WARN Utils: Your hostname, TOBA_ASUS resolves to a loopback address: 127.0.1.1; using 172.19.216.97 instead (on interface eth0)
22/10/22 18:45:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/22 18:45:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
yellow_schema = StructType([
                    StructField('VendorID', IntegerType(), True),
                    StructField('tpep_pickup_datetime', TimestampType(), True),
                    StructField('tpep_dropoff_datetime', TimestampType(), True),
                    StructField('passenger_count', IntegerType(), True),
                    StructField('trip_distance', DoubleType(), True),
                    StructField('RatecodeID', IntegerType(), True),
                    StructField('store_and_fwd_flag', StringType(), True),
                    StructField('PULocationID', IntegerType(), True),
                    StructField('DOLocationID', IntegerType(), True),
                    StructField('payment_type', IntegerType(), True),
                    StructField('fare_amount', DoubleType(), True),
                    StructField('extra', DoubleType(), True),
                    StructField('mta_tax', DoubleType(), True),
                    StructField('tip_amount', DoubleType(), True),
                    StructField('tolls_amount', DoubleType(), True),
                    StructField('improvement_surcharge', DoubleType(), True),
                    StructField('total_amount', DoubleType(), True),
                    StructField('congestion_surcharge', DoubleType(), True),
                    StructField('airport_fee', DoubleType(), True),
                ])

green_schema = StructType([
                    StructField('VendorID', IntegerType(), True),
                    StructField('lpep_pickup_datetime', TimestampType(), True),
                    StructField('lpep_dropoff_datetime', TimestampType(), True),
                    StructField('store_and_fwd_flag', StringType(), True),
                    StructField('RatecodeID', IntegerType(), True),
                    StructField('PULocationID', IntegerType(), True),
                    StructField('DOLocationID', IntegerType(), True),
                    StructField('passenger_count', IntegerType(), True),
                    StructField('trip_distance', DoubleType(), True),
                    StructField('fare_amount', DoubleType(), True),
                    StructField('extra', DoubleType(), True),
                    StructField('mta_tax', DoubleType(), True),
                    StructField('tip_amount', DoubleType(), True),
                    StructField('tolls_amount', DoubleType(), True),
                    StructField('ehail_fee', DoubleType(), True),
                    StructField('improvement_surcharge', DoubleType(), True),
                    StructField('total_amount', DoubleType(), True),
                    StructField('payment_type', IntegerType(), True),
                    StructField('trip_type', IntegerType(), True),
                    StructField('congestion_surcharge', DoubleType(), True)
                ])

In [5]:
green_df = spark.read.parquet('../../nyc_taxis/data/green*.parquet', schema=green_schema)

                                                                                

In [6]:
yellow_df = spark.read.parquet('../../nyc_taxis/data/yellow*.parquet', schema=yellow_schema)

In [18]:
taxis_df = (yellow_df.select(
        col("VendorID").alias("vendor_id"),
        col("PULocationID").alias("pickup_location_id"),
        col("tpep_pickup_datetime").alias("pickup_ts"),
        col("DOLocationID").alias("dropoff_location_id"),
        col("tpep_dropoff_datetime").alias("dropoff_ts"),
        col("RatecodeID").alias("rate_code_id"),
        "passenger_count",
        "store_and_fwd_flag",
        "trip_distance",
        "total_amount",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "congestion_surcharge",
    )
    .withColumn("taxi_type", lit("yellow"))
).union(green_df.select(
        col("VendorID").alias("vendor_id"),
        col("PULocationID").alias("pickup_location_id"),
        col("lpep_pickup_datetime").alias("pickup_ts"),
        col("DOLocationID").alias("dropoff_location_id"),
        col("lpep_dropoff_datetime").alias("dropoff_ts"),
        col("RatecodeID").alias("rate_code_id"),
        "passenger_count",
        "store_and_fwd_flag",
        "trip_distance",
        "total_amount",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "congestion_surcharge",
    )
    .withColumn("taxi_type", lit("green"))
)

In [33]:
total_records = taxis_df.count()

In [19]:
taxis_df.show(5)

+---------+------------------+-------------------+-------------------+-------------------+------------+---------------+------------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+---------+
|vendor_id|pickup_location_id|          pickup_ts|dropoff_location_id|         dropoff_ts|rate_code_id|passenger_count|store_and_fwd_flag|trip_distance|total_amount|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|taxi_type|
+---------+------------------+-------------------+-------------------+-------------------+------------+---------------+------------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+---------+
|        1|               238|2019-12-31 21:28:15|                239|2019-12-31 21:33:03|         1.0|            1.0|                 N|          1.2|       11.27|        6.0|  3.0|    0.5|  

In [49]:
mt_count = taxis_df \
    .withColumn("pickup_yr", year(col("pickup_ts"))) \
    .withColumn("pickup_mt", month(col("pickup_ts"))) \
    .filter("pickup_yr >= 2020") \
    .filter("pickup_yr <= 2021") \
    .groupby("pickup_mt") \
    .count()

qt_count = taxis_df \
    .withColumn("pickup_yr", year(col("pickup_ts"))) \
    .withColumn("pickup_qt", quarter(col("pickup_ts"))) \
    .filter("pickup_yr >= 2020") \
    .filter("pickup_yr <= 2021") \
    .groupby("pickup_qt") \
    .count()

In [46]:
mt_count.withColumn("pct_total", bround(100*col("count")/total_records, 2)).show(24)



+---------+-------+---------+
|pickup_mt|  count|pct_total|
+---------+-------+---------+
|        1|8276749|    14.18|
|        6|3536274|     6.06|
|        3|5210792|     8.93|
|        5|2998074|     5.14|
|        4|2535633|     4.35|
|        7|3785073|     6.49|
|        2|8138797|    13.95|
|       12|4858776|     8.33|
|        9|4491785|      7.7|
|       10|5354484|     9.18|
|       11|5171880|     8.86|
|        8|3952741|     6.77|
+---------+-------+---------+



                                                                                

In [50]:
qt_count.withColumn("pct_total", bround(100*col("count")/total_records, 2)).show(24)



+---------+--------+---------+
|pickup_qt|   count|pct_total|
+---------+--------+---------+
|        1|21626338|    37.06|
|        3|12229599|    20.96|
|        2| 9069981|    15.54|
|        4|15385140|    26.36|
+---------+--------+---------+



                                                                                

In [53]:
taxis_df \
    .withColumn("pickup_yr", year(col("pickup_ts"))) \
    .withColumn("pickup_qt", quarter(col("pickup_ts"))) \
    .filter("pickup_yr >= 2020") \
    .filter("pickup_yr <= 2021") \
    .write.partitionBy("pickup_qt") \
    .saveAsTable("taxis_bronze", mode="overwrite")

                                                                                

In [52]:
taxis_df \
    .withColumn("pickup_yr", year(col("pickup_ts"))) \
    .withColumn("pickup_qt", quarter(col("pickup_ts"))) \
    .filter("pickup_yr >= 2020") \
    .filter("pickup_yr <= 2021") \
    .write.saveAsTable("taxis_brz", format="parquet", mode="overwrite")



22/10/22 19:38:46 WARN BasicWriteTaskStatsTracker: Expected 4 files, but only saw 3. This could be due to the output format not writing empty files, or files being not immediately visible in the filesystem.


                                                                                