In [1]:
### Steps of a sample mini-project on creating a report on green and yellow taxi data
# 1. Reading data from pq folder (both yellow and green taxi data)
# 2. Picking the common columns from both yellow and green taxi data
# 3. Merging the yellow and green taxi data
# 4. Summarizing the merged yellow and green taxi data: number of records and sum of trip amounts

In [1]:
# Start (or reuse) a local Spark session for this notebook.
# NOTE: the CSV -> parquet conversion done in the following cells is exploratory
# and may not be required in the final code.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('testapp')
    .getOrCreate()
)

23/07/22 13:18:54 WARN Utils: Your hostname, MacBook-Air-2.local resolves to a loopback address: 127.0.0.1; using 192.168.29.110 instead (on interface en0)
23/07/22 13:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/22 13:18:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Read the gzipped green-taxi CSV; the first row is the header and column
# types are inferred from the data.
green_raw_csv = "../../data_files/green/raw/green_tripdata_2019-01.csv.gz"
green_df = (
    spark.read
    .option("header", "true")
    .option("inferschema", "true")
    .csv(green_raw_csv)
)

                                                                                

In [12]:
# Inspect the column names and the types Spark inferred for the green-taxi CSV.
green_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [13]:
# Write the green-taxi data as parquet, replacing whatever an earlier run
# left in the target folder.
green_pq_location = "../../data_files/green/pq/"
green_writer = green_df.write.mode('overwrite')
green_writer.parquet(green_pq_location)

                                                                                

In [15]:
# List the green data folder to check the parquet output written next to the raw CSV.
!tree "../../data_files/green"

[1;36m../../data_files/green[0m
├── [1;36mpq[0m
│   ├── _SUCCESS
│   └── part-00000-8389e094-fa72-49f5-a4c8-0f5051eb653c-c000.snappy.parquet
└── [1;36mraw[0m
    └── green_tripdata_2019-01.csv.gz

3 directories, 3 files


In [19]:
# Show the sizes of the files in the green parquet folder.
!ls -lh "../../data_files/green/pq"

total 28800
-rw-r--r--@ 1 sukumarsubudhi  staff     0B Jul 16 20:05 _SUCCESS
-rw-r--r--@ 1 sukumarsubudhi  staff    14M Jul 16 20:05 part-00000-8389e094-fa72-49f5-a4c8-0f5051eb653c-c000.snappy.parquet


In [2]:
# Taxi service types to process; each name is also used as the folder name
# under data_files/ and as the value of the servicetype column.
colors = ["green", "yellow"]

In [3]:
# Load every file in each service type's raw folder as CSV (header row,
# inferred column types) and keep the resulting DataFrames in a dict keyed by color.
df_dict = {}
for color in colors:
    raw_glob = f"../../data_files/{color}/raw/*"
    df_dict[color] = (
        spark.read
        .option("header", "true")
        .option("inferschema", "true")
        .format("csv")
        .load(raw_glob)
    )

                                                                                

In [4]:
from pyspark.sql import functions as f

# Tag every row with the service type it came from, so the source stays
# identifiable once the DataFrames are combined.
df_dict.update(
    {
        color: df_dict[color].withColumn("servicetype", f.lit(color))
        for color in colors
    }
)

In [5]:
# Confirm there is one DataFrame entry per service type.
df_dict.keys()

dict_keys(['green', 'yellow'])

In [6]:
# Schema of the green-taxi DataFrame, including the added servicetype column.
df_dict["green"].printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- servicetype: string (nullable = false)



In [6]:
# Schema of the yellow-taxi DataFrame, including the added servicetype column.
df_dict["yellow"].printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- servicetype: string (nullable = false)



In [10]:
# The pickup/dropoff timestamp columns carry a service-specific prefix
# (lpep_ for green, tpep_ for yellow); rename them to shared names so both
# DataFrames expose the same columns.
datetime_renames = {
    "green": {
        "lpep_pickup_datetime": "pickup_datetime",
        "lpep_dropoff_datetime": "dropoff_datetime",
    },
    "yellow": {
        "tpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
    },
}
for service, renames in datetime_renames.items():
    df_dict[service] = df_dict[service].withColumnsRenamed(renames)

In [9]:
# Number of columns in the green-taxi DataFrame.
len(df_dict["green"].columns)

21

In [10]:
# Number of columns in the yellow-taxi DataFrame.
len(df_dict["yellow"].columns)

19

In [7]:
# Columns selected from both the green and the yellow DataFrame before they
# are combined. The order is significant: it defines the column order of the
# selected DataFrames.
common_columns = [
    # vendor and trip timing
    "VendorID", "pickup_datetime", "dropoff_datetime",
    # trip characteristics
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag",
    # pickup / dropoff locations
    "PULocationID", "DOLocationID",
    # payment and amounts
    "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge",
    # source tag added in this notebook
    "servicetype",
]

In [8]:
# Number of columns that will be selected from each DataFrame.
print(len(common_columns))

19


In [11]:
# Restrict each service's DataFrame to the shared columns, in the same order,
# so the two results line up column-for-column. No merging happens here;
# this only prepares the inputs for the merge.
df_dict_to_merge = {
    color: df_dict[color].select(common_columns)
    for color in ["green", "yellow"]
}

In [12]:
# Preview the first 5 yellow-taxi rows restricted to the common columns.
df_dict_to_merge["yellow"].show(5)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|servicetype|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1|2019-01-01 00:46:40|2019-01-01 00:53:20|              1|          1.5|         1|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.65|         0

In [13]:
# Merge the green and yellow DataFrames into a single DataFrame.
# unionByName matches columns by name; unionAll/union match purely by position
# and would silently mix up columns if the two inputs ever differed in column
# order. Both inputs select the same column list, so the result is unchanged.
df_final = df_dict_to_merge["green"].unionByName(df_dict_to_merge["yellow"])

In [14]:
# Check the schema of the merged DataFrame.
df_final.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- servicetype: string (nullable = false)



In [15]:
# List the distinct servicetype values present in the merged DataFrame.
df_final.select("servicetype").distinct().show()



+-----------+
|servicetype|
+-----------+
|      green|
|     yellow|
+-----------+



                                                                                

In [16]:
# Register the merged DataFrame as a temporary view so it can be queried with spark.sql.
df_final.createOrReplaceTempView("df_sql_view")

In [17]:
# Revenue summary over the merged view (df_sql_view): one row per service type,
# pickup month and pickup location (PULocationID, exposed as "zone"), with the
# summed total, tip and fare amounts. "group by 1, 2, 3" refers to the first
# three select expressions by position.
df_summary = spark.sql("""

select 
    -- Grouping
    servicetype,
    DATE_TRUNC("month", pickup_datetime) as pickup_month,
    PULocationID as zone,
    
    -- Revenue columns
    sum(total_amount) as revenue_monthly_total_amt,
    sum(tip_amount) as revenue_monthly_tip_amt,
    sum(fare_amount) as revenue_monthly_fare_amt
from df_sql_view
group by 1, 2, 3
""")

In [18]:
# Preview the revenue summary.
df_summary.show()



+-----------+-------------------+----+-------------------------+-----------------------+------------------------+
|servicetype|       pickup_month|zone|revenue_monthly_total_amt|revenue_monthly_tip_amt|revenue_monthly_fare_amt|
+-----------+-------------------+----+-------------------------+-----------------------+------------------------+
|      green|2019-01-01 00:00:00| 116|       120681.19000001065|      7693.839999999978|      102836.90999999996|
|      green|2019-01-01 00:00:00| 213|         47872.0200000005|     151.00000000000003|       44194.31999999997|
|      green|2019-01-01 00:00:00|  34|       2696.8100000000036|     222.73999999999995|                 2251.15|
|      green|2019-01-01 00:00:00| 115|       2680.9100000000003|                  14.56|                 2185.39|
|      green|2019-01-01 00:00:00| 193|        29960.48999999926|     1697.1100000000006|       24610.22000000002|
|      green|2019-01-01 00:00:00| 190|        6770.530000000027|      418.2400000000001|

                                                                                

In [20]:
# Persist the revenue summary as parquet.
# Overwrite mode keeps this cell re-runnable: without an explicit mode the
# write raises an error when the target path already exists, so a second run
# of the notebook would fail here. This also matches how the green parquet
# output is written earlier in the notebook.
df_summary.write.mode("overwrite").parquet("../../data_files/report/report-2019/")

                                                                                

In [21]:
# List the whole data folder to check that the report parquet files were written.
!tree "../../data_files/"

[1;36m../../data_files/[0m
├── [1;36mgreen[0m
│   ├── [1;36mpq[0m
│   │   ├── _SUCCESS
│   │   └── part-00000-8389e094-fa72-49f5-a4c8-0f5051eb653c-c000.snappy.parquet
│   └── [1;36mraw[0m
│       └── green_tripdata_2019-01.csv.gz
├── [1;36mreport[0m
│   └── [1;36mreport-2019[0m
│       ├── _SUCCESS
│       └── part-00000-ecaffd6b-db6b-450b-8a10-f161f2801f95-c000.snappy.parquet
└── [1;36myellow[0m
    ├── [1;36mpq[0m
    │   ├── _SUCCESS
    │   ├── part-00000-c4f735d2-2be5-443d-90a5-b115f2d33d16-c000.snappy.parquet
    │   ├── part-00001-c4f735d2-2be5-443d-90a5-b115f2d33d16-c000.snappy.parquet
    │   ├── part-00002-c4f735d2-2be5-443d-90a5-b115f2d33d16-c000.snappy.parquet
    │   └── part-00003-c4f735d2-2be5-443d-90a5-b115f2d33d16-c000.snappy.parquet
    └── [1;36mraw[0m
        └── yellow_tripdata_2019-01.csv.gz

9 directories, 11 files
