In [0]:
file_location = "/FileStore/tables/yellow_tripdata_2018_01.parquet"
df =spark.read.parquet(file_location,header=True,inferSchema=True)
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2018-01-01 00:21:05|  2018-01-01 00:24:23|              1|          0.5|         1|                 N|          41|          24|           2|        4.5|  0.5|    0.5|       0.

In [0]:
df.schema

Out[2]: StructType(List(StructField(VendorID,LongType,true),StructField(tpep_pickup_datetime,TimestampType,true),StructField(tpep_dropoff_datetime,TimestampType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(RatecodeID,LongType,true),StructField(store_and_fwd_flag,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(payment_type,LongType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true)))

Query 1. - Add a column named as "Revenue" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'

In [0]:
import pyspark.sql.functions as F
df_new=df.withColumn("revenue",(F.col('Fare_amount')+F.col('Extra')+F.col('MTA_tax')+F.col('Improvement_surcharge')+F.col('Tip_amount')+F.col('Tolls_amount')+F.col('Total_amount')))

In [0]:
df_new.head()

Out[4]: Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 1, 1, 0, 21, 5), tpep_dropoff_datetime=datetime.datetime(2018, 1, 1, 0, 24, 23), passenger_count=1, trip_distance=0.5, RatecodeID=1, store_and_fwd_flag='N', PULocationID=41, DOLocationID=24, payment_type=2, fare_amount=4.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=5.8, congestion_surcharge=None, airport_fee=None, revenue=11.6)

Query 2. - Increasing count of total passengers in New York City by area

In [0]:
df.groupBy("PULocationID").agg(F.sum("passenger_count").alias("total_number"))
            .sort(F.col("total_number")).show()

+------------+------------+
|PULocationID|total_number|
+------------+------------+
|          44|           1|
|         204|           1|
|         184|           2|
|         187|           2|
|         176|           3|
|          46|           3|
|           5|           3|
|          58|           4|
|           2|           4|
|          30|           4|
|         199|           4|
|         245|           5|
|         221|           6|
|         214|           6|
|         105|           8|
|         156|           8|
|          84|           8|
|         109|           8|
|         251|           9|
|         115|          12|
+------------+------------+
only showing top 20 rows



Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

In [0]:
display(df_new.groupBy("VendorID").agg(F.sum("revenue")))

VendorID,sum(revenue)
1,116363894.09691691
2,155030930.7433578


Query 4. - Moving Count of payments made by each payment mode

In [0]:
display(df_new.groupBy("payment_type").count())

payment_type,count
1,6106416
3,43204
2,2599215
4,11852


Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

In [0]:
df_new=df_new.withColumn("date",F.to_date("tpep_pickup_datetime"))

In [0]:
temp_df=df_new.groupBy("VendorID","date")
             .agg(F.sum("revenue").alias("total_revenue"),
             F.sum("passenger_count").alias("total_number"),
             F.sum("trip_distance").alias("total_distance")).
             sort(F.desc("total_revenue"))

In [0]:
date="2018-1-25"
display(temp_df.filter(F.col("date")==date)
        .sort(F.desc("total_revenue"))
        .limit(2))

VendorID,date,total_revenue,total_number,total_distance
2,2018-01-25,5966751.749995339,351475,518394.0400000009
1,2018-01-25,4585064.500002595,172376,383772.2000000028


Query 6. - Most no of passenger between a route of two location.

In [0]:
display(df_new.groupBy("PULocationID","DOLocationID").agg(F.sum("passenger_count").alias("total_count"))
                                                          .sort(F.desc("total_count"))
                                                          .limit(1))

PULocationID,DOLocationID,total_count
264,264,186705


Query 7. - Get top pickup locations with most passengers in last 5/10 seconds

In [0]:
date="2018-01-01 00:21:05"
display(df.filter((F.col("tpep_pickup_datetime")<(F.col("tpep_pickup_datetime")+F.expr("INTERVAL 10 SECONDS")))  & (F.col("tpep_pickup_datetime")>(F.col("tpep_pickup_datetime")-F.expr("INTERVAL 10 SECONDS"))) ).groupby(F.col("PULocationID")).count().sort(F.desc("count")).limit(1))

PULocationID,count
237,361012
