In [0]:
# Query 2. - Increasing count of total passengers in New York City by area  // total count
# Query 3. - Realtime Average fare/total earning amount earned by 2 vendors
# Query 4. - Moving Count of payments made by each payment mode
# Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab
# Query 6. - Most no of passenger between a route of two location.
# Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.

In [0]:
df = spark.read.parquet("/FileStore/tables/yellow_tripdata_2018_01_1_.parquet")

In [0]:
display(df.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,


In [0]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [0]:
from pyspark.sql.functions import *

In [0]:
q1 = df.withColumn("Revenue",col("fare_amount")+col("extra")+col("mta_tax")+col("improvement_surcharge")+col("tip_amount")+col("tolls_amount")+col("total_amount"))

In [0]:
display(q1.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,date,Revenue
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,2018-01-01,11.6
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,,2018-01-01,30.6
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,,2018-01-01,16.6
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,,2018-01-01,69.6
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,,2018-01-01,33.1


In [0]:
max_revenue = q1.select(max(col('Revenue')))
display(max_revenue)

max(Revenue)
16033.6


In [0]:
# Query 2. - Increasing count of total passengers in New York City by area
q2 = df.groupBy("PULocationID","DOLocationID").agg(count("passenger_count").alias("total_passengers"))
display(q2.limit(5))

PULocationID,DOLocationID,total_passengers
79,116,344
246,249,3109
234,144,4360
161,193,57
231,261,3562


In [0]:
# Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

q3 = df.groupBy("VendorID").agg(round(avg("total_amount"),2),round(avg("fare_amount"),2))
display(q3.limit(5))

VendorID,"round(avg(total_amount), 2)","round(avg(fare_amount), 2)"
1,15.13,11.96
2,15.78,12.47


In [0]:
# Query 4. - Moving Count of payments made by each payment mode

q4 = df.groupBy("payment_type").agg(sum("passenger_count"))
display(q4.limit(5))

payment_type,sum(passenger_count)
1,9751892
3,53801
2,4255992
4,15052


In [0]:
q1.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- date: date (nullable = true)
 |-- Revenue: double (nullable = true)



In [0]:
# Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

q1 = q1.withColumn("date",to_date("tpep_pickup_datetime"))
q5 = q1.groupBy("VendorID","date").agg(sum("passenger_count").alias("total_passenger"),sum("trip_distance").alias("total_distance"),sum("Revenue").alias("total_revenue"))
display(q5.sort(desc("total_revenue")).limit(2))

VendorID,date,total_passenger,total_distance,total_revenue
2,2018-01-25,351475,518394.0400000009,5966751.749995339
2,2018-01-26,354497,513454.3299999982,5927691.89999487


In [0]:
# Query 6. - Most no of passenger between a route of two location.

q6 = df.groupBy("PULocationID","DOLocationID").agg(sum("passenger_count").alias("total_passengers"))
display(q6.sort(desc(col("total_passengers"))).limit(2))

PULocationID,DOLocationID,total_passengers
264,264,186705
237,236,86737


In [0]:
from datetime import timedelta

In [0]:
#Query-7 :Get top pickup locations with most passengers in last 5/10 seconds

last_date = df.agg(max(col("tpep_pickup_datetime"))).collect()
window_10sec = last_date[0][0] - timedelta(seconds = 10)
print(window_10sec)
q7 = df.filter(col('tpep_pickup_datetime') > window_10sec)
q7 = q7.groupBy("PuLocationID").agg(sum("passenger_count").alias("total_passenger"))
ans = q7.sort(col("total_passenger").desc()).limit(1)
display(ans)

2018-07-27 04:06:27


PuLocationID,total_passenger
48,2
