## Overview

This notebook covers the first task.

Note: These queries may not be perfect; I am still learning.

TASK:
Load the NYC taxi data into Data Lake / Blob Storage / Databricks and read it into a DataFrame in the notebook.
Run the following queries using PySpark.

Query 1. - Add a column named "Revenue" to the DataFrame, which is the sum of the following columns: 'Fare_amount', 'Extra', 'MTA_tax', 'Improvement_surcharge', 'Tip_amount', 'Tolls_amount', 'Total_amount'

Query 2. - Increasing count of total passengers in New York City by area

Query 3. - Real-time average fare / total earnings for the 2 vendors

Query 4. - Moving count of payments made by each payment mode

Query 5. - The two highest-earning vendors on a particular date, with number of passengers and total distance by cab

Query 6. - The highest number of passengers on a route between two locations

Query 7. - Get the top pickup locations with the most passengers in the last 5/10 seconds


Note: The file is uploaded directly to DBFS rather than through a Blob Storage or Data Lake integration.

In [0]:
from pyspark.sql.functions import *

In [0]:
# File location and type
file_location = "/FileStore/tables/yellow_tripdata_2018_01__1_.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)


In [0]:
# Query 1 - Add a "Revenue" column that is the sum of the columns below.
# Note: fare_amount from the task list is not part of this sum.
from pyspark.sql import functions as F

df = df.withColumn(
    "Revenue",
    F.col("Extra") + F.col("MTA_tax") + F.col("Improvement_surcharge")
    + F.col("Tip_amount") + F.col("Tolls_amount") + F.col("Total_amount")
)

In [0]:
df.limit(5).display()

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,Revenue
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,7.1
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,,16.6
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,,10.6
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,,36.1
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,,20.6
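
As a sanity check on the output above, the first displayed row can be recomputed by hand. The sketch below is plain Python (no Spark session needed) with values copied from that row; the variable names are illustrative only. It suggests that the displayed Revenue of 7.1 is the sum of the six columns used in the cell, and shows what the row would give if `fare_amount` were added as the task statement lists.

```python
# Values copied from the first displayed row (pickup 2018-01-01 00:21:05).
row = {
    "fare_amount": 4.5,
    "extra": 0.5,
    "mta_tax": 0.5,
    "tip_amount": 0.0,
    "tolls_amount": 0.0,
    "improvement_surcharge": 0.3,
    "total_amount": 5.8,
}

# Columns summed by the Query 1 cell (fare_amount is not among them).
summed_in_cell = [
    "extra", "mta_tax", "improvement_surcharge",
    "tip_amount", "tolls_amount", "total_amount",
]

# Round to 2 decimals to hide floating-point noise.
revenue_as_computed = round(sum(row[c] for c in summed_in_cell), 2)
revenue_with_fare = round(revenue_as_computed + row["fare_amount"], 2)

print(revenue_as_computed)  # 7.1, matches the Revenue column shown above
print(revenue_with_fare)    # 11.6, if fare_amount were included
```

Since `total_amount` normally already contains the other charges, either version counts some amounts twice; that follows from the task definition rather than from the code.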


In [0]:
#Query 2. - Increasing count of total passengers in New York City by area
df.groupBy("PULocationID",'DOLocationID').sum("passenger_count").show()

+------------+------------+--------------------+
|PULocationID|DOLocationID|sum(passenger_count)|
+------------+------------+--------------------+
|          79|         116|                 580|
|         246|         249|                5073|
|         234|         144|                7047|
|         161|         193|                 100|
|         231|         261|                5812|
|         143|         211|                 260|
|          90|         231|                6447|
|         142|         144|                 971|
|          90|         142|                2835|
|         249|         225|                 400|
|         170|         179|                 432|
|          48|         232|                 783|
|         137|          37|                 197|
|          87|          33|                 853|
|          25|         148|                  91|
|         179|         260|                 128|
|         195|           4|                   4|
|          25|      

In [0]:
#Query 3. - Realtime Average fare/total earning amount earned by 2 vendors
df.groupBy("VendorID").avg("total_amount").show()

+--------+------------------+
|VendorID| avg(total_amount)|
+--------+------------------+
|       1|15.127384289902137|
|       2|15.775723474073514|
+--------+------------------+



In [0]:
#Query 4. - Moving Count of payments made by each payment mode
display(df.groupBy("payment_type").count())

payment_type,count
1,6106416
3,43204
2,2599215
4,11852
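
The cell above gives the overall count per payment type. "Moving count" could also be read as a running count that grows as trips arrive in time order. The sketch below illustrates that reading in plain Python on made-up events; the `trips` list and the `moving_counts` helper are hypothetical and not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (pickup time, payment_type) pairs; not taken from the dataset.
trips = [
    (datetime(2018, 1, 1, 0, 0, 5), 1),
    (datetime(2018, 1, 1, 0, 0, 9), 2),
    (datetime(2018, 1, 1, 0, 1, 30), 1),
    (datetime(2018, 1, 1, 0, 2, 10), 1),
    (datetime(2018, 1, 1, 0, 2, 45), 2),
]


def moving_counts(events):
    """Return (time, payment_type, running count for that type) per event."""
    running = defaultdict(int)
    out = []
    # Process events in time order so each count reflects all earlier trips.
    for ts, payment_type in sorted(events):
        running[payment_type] += 1
        out.append((ts, payment_type, running[payment_type]))
    return out


result = moving_counts(trips)
for ts, payment_type, n in result:
    print(ts.time(), payment_type, n)
```

In PySpark the same idea is usually expressed with a window partitioned by `payment_type` and ordered by pickup time, which is not shown here.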


In [0]:
# Query 5 - Two highest-earning vendors on a particular date, with number of passengers and total distance by cab
df.select("VendorID", "tpep_pickup_datetime", "passenger_count", "trip_distance") \
  .sort(df.trip_distance.desc(), df.passenger_count.desc()) \
  .limit(2).show()

+--------+--------------------+---------------+-------------+
|VendorID|tpep_pickup_datetime|passenger_count|trip_distance|
+--------+--------------------+---------------+-------------+
|       2| 2018-01-30 11:41:02|              1|    189483.84|
|       1| 2018-01-08 19:44:54|              0|        830.8|
+--------+--------------------+---------------+-------------+
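
The cell above ranks individual trips by distance. Another possible reading of the task is to total earnings, passengers, and distance per vendor for one date and then keep the two vendors with the highest earnings. The sketch below shows that aggregation in plain Python on made-up rows; `top_vendors_on`, the sample data, and the use of `total_amount` as "earnings" are all assumptions for illustration.

```python
from collections import defaultdict
from datetime import date, datetime

# Hypothetical rows: (VendorID, pickup time, passenger_count, trip_distance, total_amount).
trips = [
    (1, datetime(2018, 1, 1, 0, 21), 1, 0.5, 5.0),
    (1, datetime(2018, 1, 1, 0, 44), 1, 2.5, 15.0),
    (2, datetime(2018, 1, 1, 1, 5), 3, 4.0, 20.5),
    (2, datetime(2018, 1, 2, 9, 0), 2, 1.5, 9.5),   # different date, ignored below
    (3, datetime(2018, 1, 1, 7, 30), 1, 1.0, 6.0),  # third vendor is made up
]


def top_vendors_on(rows, day, n=2):
    """Aggregate per vendor for one date and return the n highest earners."""
    totals = defaultdict(lambda: {"earnings": 0.0, "passengers": 0, "distance": 0.0})
    for vendor, pickup, passengers, distance, amount in rows:
        if pickup.date() == day:
            totals[vendor]["earnings"] += amount
            totals[vendor]["passengers"] += passengers
            totals[vendor]["distance"] += distance
    ranked = sorted(totals.items(), key=lambda kv: kv[1]["earnings"], reverse=True)
    return ranked[:n]


top2 = top_vendors_on(trips, date(2018, 1, 1))
for vendor, stats in top2:
    print(vendor, stats)
```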



In [0]:
# Query 6 - Highest number of passengers on a route between two locations
q6 = (df.groupBy("PULocationID",'DOLocationID').agg(sum("passenger_count").alias("Most_no_of_passenger")))
q6_final = q6.orderBy(col('Most_no_of_passenger').desc())
display(q6_final.limit(1))

PULocationID,DOLocationID,Most_no_of_passenger
264,264,186705


#### Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.
Two approaches:
1. If the search is anchored on the latest timestamp in the data, a window covering the preceding 10 seconds is used.
2. If the user supplies a timestamp, a window of +/- 10 seconds around it is created and the rows are then grouped by pickup location.
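
The two approaches can be sketched in plain Python to make the window boundaries explicit. The rows and the `top_pickup` helper below are hypothetical; the strict `<` and `>` comparisons mirror the filters used in the cells that follow.

```python
from collections import Counter
from datetime import datetime, timedelta

# Hypothetical (pickup time, PULocationID, passenger_count) rows.
trips = [
    (datetime(2018, 1, 1, 0, 20, 15), 43, 2),
    (datetime(2018, 1, 1, 0, 20, 20), 48, 1),
    (datetime(2018, 1, 1, 0, 20, 25), 43, 3),
    (datetime(2018, 1, 1, 0, 20, 40), 48, 4),
    (datetime(2018, 1, 1, 0, 20, 45), 79, 1),
]


def top_pickup(rows, start, end=None):
    """Pickup location with the most passengers for start < time (< end, if given)."""
    totals = Counter()
    for ts, location, passengers in rows:
        if ts > start and (end is None or ts < end):
            totals[location] += passengers
    return totals.most_common(1)


window = timedelta(seconds=10)

# Approach 1: the 10 seconds before the latest pickup time in the data.
latest = max(ts for ts, _, _ in trips)
last_10s = top_pickup(trips, latest - window)

# Approach 2: +/- 10 seconds around a user-supplied timestamp.
center = datetime(2018, 1, 1, 0, 20, 22)
around = top_pickup(trips, center - window, center + window)

print(last_10s)
print(around)
```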

In [0]:
# Using the latest pickup time available in the data
date = df.agg(max("tpep_pickup_datetime")).collect()[0][0]
display(df.filter((col("tpep_pickup_datetime")>(date-expr("INTERVAL 10 SECONDS"))))
       .groupby(F.col("PULocationID")).agg(F.sum("passenger_count").alias("count")).sort(F.desc("count")).limit(1))

PULocationID,count
48,2


In [0]:
# with a static user input
date="2018-01-01T00:20:22.000+0000"
display(df.filter((col("tpep_pickup_datetime")<(date+expr("INTERVAL 10 SECONDS"))) & (col("tpep_pickup_datetime")>(date-expr("INTERVAL 10 SECONDS"))))
       .groupby(col("PULocationID")).agg(sum("passenger_count").alias("count")).sort(desc("count")).limit(1))

PULocationID,count
43,19
