In [0]:
import pyspark
from pyspark.sql.functions import *
import pandas
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql.functions import date_add,date_sub,trunc
# Obtain the SparkSession used by every cell below: getOrCreate() returns the
# already-active session if there is one, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

In [0]:
# 1. Read the tripdetail JSON file.
#    "multiline" lets a single JSON record span several lines of the file.
#    No explicit schema is passed, so Spark infers column types (see the
#    printSchema output further down).
df = (
    spark.read
    .option("multiline", "true")
    .json("/FileStore/tables/tripdetail_json-2.json")
)
df.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|          43|         142|         1|       1|                 2.5|  3.0|        8.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|        11.8|  2021-01-01 00:36:12|

In [0]:
# Print the schema Spark inferred from the JSON file. The two tpep_*_datetime
# columns and trip_distance are inferred as strings, so any time arithmetic on
# the datetimes needs an explicit to_timestamp parse first.
df.printSchema()

root
 |-- DOLocationID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- trip_distance: string (nullable = true)



In [0]:
# 2. Convert tpep_pickup_datetime and tpep_dropoff_datetime from Pacific time
#    to Indian Standard Time and add each as a separate column with an _IST
#    suffix.
#    The source columns are strings, so they are parsed with to_timestamp first.
#    Converting via UTC with IANA zone names makes Spark apply the correct offset
#    for each row's date: +13h30m while Pacific time is PST (UTC-8, e.g. this
#    January data) and +12h30m during PDT (UTC-7). The previous fixed
#    "INTERVAL 12 HOURS 30 minutes" was therefore an hour short for PST.
SOURCE_TZ = "America/Los_Angeles"
TARGET_TZ = "Asia/Kolkata"

df2 = df
for ts_col in ["tpep_pickup_datetime", "tpep_dropoff_datetime"]:
    # to_utc_timestamp: read the wall-clock value as Pacific time -> UTC;
    # from_utc_timestamp: render that UTC instant as IST wall-clock time.
    df2 = df2.withColumn(
        f"{ts_col}_IST",
        from_utc_timestamp(
            to_utc_timestamp(to_timestamp(col(ts_col)), SOURCE_TZ),
            TARGET_TZ,
        ),
    )
df2.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|tpep_pickup_datetime_IST|tpep_dropoff_datetime_IST|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+
|          43|         142|         1|       1|                 2.5|  

In [0]:
# 3. Add a TravelTime column: the gap between pickup and dropoff in whole
#    seconds. Each string timestamp is parsed, cast to epoch seconds (long),
#    and the pickup value is subtracted from the dropoff value.
dropoff_epoch = to_timestamp(col("tpep_dropoff_datetime")).cast("long")
pickup_epoch = to_timestamp(col("tpep_pickup_datetime")).cast("long")
df3 = df.withColumn("TravelTime", dropoff_epoch - pickup_epoch)
df3.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+----------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|TravelTime|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+----------+
|          43|         142|         1|       1|                 2.5|  3.0|        8.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|  

In [0]:
# 4. Write the DataFrames out as CSV (with a header row) in exactly 3 partitions.
#    repartition(3) fixes the partition count; Spark writes one part-* file per
#    non-empty partition, so each target path becomes a directory of part files.
#    Without it the file count just mirrored however the input happened to be
#    partitioned.
#    mode("overwrite") keeps the cell re-runnable: the default write mode raises
#    an error when the target path already exists.
CSV_PARTITIONS = 3
csv_outputs = [
    (df, "/FileStore/tables/tripdetail_json-2.csv"),
    (df2, "/FileStore/tables/tripdetail_json-3.csv"),
    (df3, "/FileStore/tables/tripdetail_json-4.csv"),
]
for frame, out_path in csv_outputs:
    (
        frame.repartition(CSV_PARTITIONS)
        .write.format("csv")
        .option("header", "true")
        .mode("overwrite")
        .save(out_path)
    )


In [0]:
# 5. Register the trip DataFrame as a session-scoped temporary view named
#    "Temp" so it can be queried with Spark SQL; an existing view of the same
#    name is replaced.
df.createOrReplaceTempView(name="Temp")

In [0]:
# Query the "Temp" view back through Spark SQL and display the rows.
temp_view_rows = spark.sql("select * from Temp")
temp_view_rows.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|          43|         142|         1|       1|                 2.5|  3.0|        8.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|        11.8|  2021-01-01 00:36:12|

In [0]:
# 6. Spark SQL aggregation: total fare_amount per VendorID over the "Temp" view.
#    The aggregate column is not aliased, so Spark names it "sum(fare_amount)".
fare_by_vendor_sql = "select VendorID,sum(fare_amount) from Temp group by VendorID"
df4 = spark.sql(fare_by_vendor_sql)
df4.show()

+--------+------------------+
|VendorID|  sum(fare_amount)|
+--------+------------------+
|       1|2230.1800000000003|
|       2|            4128.0|
+--------+------------------+



In [0]:
# 7. Write the output of step 6 (the per-vendor fare totals in df4) into a
#    single CSV file.
#    The previous version wrote `df5`, which is never defined in this notebook,
#    so the cell raised NameError on a fresh kernel.
#    coalesce(1) collapses the result to one partition so Spark emits a single
#    part-*.csv file inside the target directory (Spark always writes a
#    directory). The aggregate has only one row per vendor, so funnelling it
#    through one task is cheap.
#    mode("overwrite") keeps the cell re-runnable.
(
    df4.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("/FileStore/tables/tripdetail_json-6.csv")
)
