In [101]:

# Imports 
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import time

from pyspark.sql import SQLContext
from pyspark.sql import types 
from pyspark.sql.functions import isnan, when, count, col, round 
from pyspark import SparkContext
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *
from pyspark.sql.functions import *


%matplotlib inline
plt.style.use('ggplot')

In [6]:
data = pd.read_csv("D:\Freelancing\Exact\yellow_tripdata_2020-01.csv",low_memory=False)

In [7]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [8]:
from pyspark.sql import Row

# Load data into Spark DataFrame
trips_df = spark.read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv("D:\Freelancing\Exact\yellow_tripdata_2020-01.csv")

In [7]:
trips_df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
trips_df.registerTempTable("trips")


In [9]:
distinctDF = trips_df.distinct()


df_filtered_Total_Fare count: 6232809


In [36]:
spark.read.schema(customSchema).parquet('trip_data_01 copy.parquet')
df_parquet = spark.read.parquet('trip_data_01 copy.parquet')

df_parquet = df_parquet.select(
    col('VendorID').cast(ShortType()),
    col('tpep_pickup_datetime').cast(TimestampType()),
    col('tpep_dropoff_datetime').cast(TimestampType()),
    col('passenger_count').cast(ShortType()),
    col('trip_distance').cast(DecimalType()),
    col('RatecodeID').cast(ShortType()),
    col('store_and_fwd_flag').cast(DecimalType()),
    col('PULocationID').cast(ShortType()),
    col('DOLocationID').cast(ShortType()),
    col('payment_type').cast(ShortType()),
    col('fare_amount').cast(DecimalType()),
    col('extra').cast(DecimalType()),
    col('mta_tax').cast(DecimalType()),
    col('tip_amount').cast(DecimalType()),
    col('tolls_amount').cast(DecimalType()),
    col('improvement_surcharge').cast(DecimalType()),
    col('total_amount').cast(DecimalType()),
    col('congestion_surcharge').cast(DecimalType()),
    
)

df_parquet.printSchema()

root
 |-- VendorID: short (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: short (nullable = true)
 |-- trip_distance: decimal(10,0) (nullable = true)
 |-- RatecodeID: short (nullable = true)
 |-- store_and_fwd_flag: decimal(10,0) (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: short (nullable = true)
 |-- fare_amount: decimal(10,0) (nullable = true)
 |-- extra: decimal(10,0) (nullable = true)
 |-- mta_tax: decimal(10,0) (nullable = true)
 |-- tip_amount: decimal(10,0) (nullable = true)
 |-- tolls_amount: decimal(10,0) (nullable = true)
 |-- improvement_surcharge: decimal(10,0) (nullable = true)
 |-- total_amount: decimal(10,0) (nullable = true)
 |-- congestion_surcharge: decimal(10,0) (nullable = true)



In [217]:
df_parquet.createOrReplaceTempView("parquetTable")
spark.sql("SELECT b as Quarter , MAX(d) as DROPOff_Location_ID, CAST(MAX(a)*100 as Numeric(4,2) ) as AVERAGE_TIP_Percentage  from (SELECT DOLocationID as d, QUARTER(tpep_dropoff_datetime) as b ,AVG(tip_amount/total_amount) as a  FROM parquetTable GROUP BY  d, b ORDER BY a DESC) GROUP BY b ORDER BY b ASC ;") .show()
#b is the QUARTER
#d is the Location_ID
#a is the Average Percentage Amount Calculated

+-------+-------------------+----------------------+
|Quarter|DROPOff_Location_ID|AVERAGE_TIP_Percentage|
+-------+-------------------+----------------------+
|      1|                265|                 13.37|
|      2|                246|                 22.73|
|      3|                263|                 22.22|
|      4|                265|                 20.00|
+-------+-------------------+----------------------+



In [73]:
spark.sql("select distinct MONTH(tpep_pickup_datetime) , QUARTER(tpep_pickup_datetime) as a from parquetTable ORDER BY a DESC" ).show()

+-----------------------------------------+---+
|month(CAST(tpep_pickup_datetime AS DATE))|  a|
+-----------------------------------------+---+
|                                       12|  4|
|                                        7|  3|
|                                        5|  2|
|                                        6|  2|
|                                        4|  2|
|                                        1|  1|
|                                        3|  1|
|                                        2|  1|
+-----------------------------------------+---+



In [193]:
#Getting the Difference in Time in SECs and Maximum Distance in Meters and Sorting them Descendingly

spark.sql("SELECT(EXTRACT (DAY FROM (tpep_dropoff_datetime-tpep_pickup_datetime))*24*60*60+EXTRACT (HOUR FROM (tpep_dropoff_datetime-tpep_pickup_datetime))*60*60+ EXTRACT (MINUTE FROM (tpep_dropoff_datetime-tpep_pickup_datetime))*60+EXTRACT (SECOND FROM (tpep_dropoff_datetime-tpep_pickup_datetime)))/60 as diff_minutes, ABS(trip_distance*1000) as distance FROM parquetTable ORDER BY distance DESC").show()

+-------------+---------+
| diff_minutes| distance|
+-------------+---------+
| 65.000000000|210240000|
| 60.000000000| 57051000|
| 60.000000000| 50770000|
| 54.000000000| 45799000|
| 12.000000000| 43700000|
| 29.000000000| 17137000|
|364.766666667|   370000|
|367.066666667|   275000|
|270.600000000|   263000|
|271.833333333|   259000|
|276.433333333|   242000|
|190.316666667|   212000|
|215.266666667|   207000|
|227.016666667|   207000|
|168.283333333|   168000|
|168.816666667|   166000|
|375.233333333|   164000|
|228.800000000|   161000|
|226.933333333|   154000|
|175.016666667|   151000|
+-------------+---------+
only showing top 20 rows



In [118]:
df_parquet.printSchema()

root
 |-- VendorID: short (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: short (nullable = true)
 |-- trip_distance: decimal(10,0) (nullable = true)
 |-- RatecodeID: short (nullable = true)
 |-- store_and_fwd_flag: decimal(10,0) (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: short (nullable = true)
 |-- fare_amount: decimal(10,0) (nullable = true)
 |-- extra: decimal(10,0) (nullable = true)
 |-- mta_tax: decimal(10,0) (nullable = true)
 |-- tip_amount: decimal(10,0) (nullable = true)
 |-- tolls_amount: decimal(10,0) (nullable = true)
 |-- improvement_surcharge: decimal(10,0) (nullable = true)
 |-- total_amount: decimal(10,0) (nullable = true)
 |-- congestion_surcharge: decimal(10,0) (nullable = true)



In [210]:
spark.sql("SELECT EXTRACT(HOUR FROM tpep_dropoff_datetime ) as h, trip_distance *1.6 /((EXTRACT (MINUTE FROM (tpep_dropoff_datetime - tpep_pickup_datetime))/60 + EXTRACT (SECOND FROM (tpep_dropoff_datetime - tpep_pickup_datetime))/3600)) as speed FROM parquetTable; ").show(10)

+---+------------------+
|  h|             speed|
+---+------------------+
|  0|20.000000000833335|
|  0|12.943820225184497|
|  0|15.525606468333127|
|  1|19.793814432173452|
|  0|               0.0|
|  0|               0.0|
|  0|               0.0|
| 15|               0.0|
| 15|               0.0|
|  0| 8.384279475982533|
+---+------------------+
only showing top 10 rows



In [218]:
spark.sql("SELECT h , CAST(AVG(speed) as NUMERIC(5,2)) as AVERAGE_SPEED from(SELECT EXTRACT(HOUR FROM tpep_dropoff_datetime ) as h, trip_distance *1.6 /((EXTRACT (MINUTE FROM (tpep_dropoff_datetime - tpep_pickup_datetime))/60 + EXTRACT (SECOND FROM (tpep_dropoff_datetime - tpep_pickup_datetime))/3600)) as speed FROM parquetTable ) GROUP BY h ORDER by AVERAGE_SPEED DESC ").show(24)

+---+-------------+
|  h|AVERAGE_SPEED|
+---+-------------+
|  5|        34.66|
| 16|        31.44|
|  4|        31.40|
|  8|        30.93|
| 17|        30.21|
|  6|        29.18|
|  0|        28.94|
|  3|        28.29|
| 15|        27.84|
|  1|        27.54|
| 18|        26.24|
|  2|        26.19|
|  7|        26.10|
| 23|        25.79|
| 19|        24.55|
| 22|        24.44|
| 21|        23.77|
| 20|        23.52|
|  9|        23.28|
| 14|        22.31|
| 10|        21.31|
| 13|        21.20|
| 11|        20.51|
| 12|        20.33|
+---+-------------+



In [219]:
spark.sql("SELECT h , CAST(AVG(speed) as NUMERIC(5,2)) as AVERAGE_SPEED from(SELECT EXTRACT(HOUR FROM tpep_dropoff_datetime ) as h, trip_distance *1.6 /(( (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime))/3600 )) as speed FROM parquetTable ) GROUP BY h ORDER by AVERAGE_SPEED DESC ").show(24)

+---+-------------+
|  h|AVERAGE_SPEED|
+---+-------------+
|  5|        34.22|
|  4|        31.09|
|  6|        28.70|
|  0|        27.64|
|  3|        27.59|
|  1|        26.93|
|  2|        25.62|
| 23|        24.97|
| 22|        23.90|
|  7|        23.04|
| 21|        23.00|
| 20|        21.88|
|  8|        19.60|
| 13|        19.38|
| 19|        19.28|
| 16|        19.08|
| 10|        18.85|
| 12|        18.82|
| 17|        18.81|
| 15|        18.75|
| 14|        18.71|
| 11|        18.64|
|  9|        18.49|
| 18|        17.79|
+---+-------------+



In [None]:
spark.sql("create temp view new_table1 as SELECT DOLocationID as DROPOff_Location_ID, QUARTER(tpep_dropoff_datetime) as Quarter ,AVG(tip_amount/total_amount)as AVERAGE_TIP_Percentage FROM parquetTable GROUP BY   Quarter,DROPOff_Location_ID order by AVERAGE_TIP_Percentage DESC ").show()

In [None]:
spark.sql("create temp view new_table4 as select max(AVERAGE_TIP_Percentage) as a ,Quarter from new_table1 GROUP BY new_table1.Quarter").show()

In [None]:
#modified 1st Query

spark.sql("SELECT new_table4.a , new_table4.Quarter , new_table1.DROPOff_Location_ID from new_table4 , new_table1 where new_table4.Quarter = new_table1.Quarter and new_table4.a = new_table1.AVERAGE_TIP_Percentage").show()