In [1]:
import numpy as np
import pandas as pd

<h1>Data Exploration and Visualization</h1>
<h2>Load Data</h2>

In [2]:
yellow_19 = spark.read.load('source/2019_Yellow_Taxi_Trip_Data.csv',format='csv', sep=',', inferSchema='true', header='true')
yellow_20 = spark.read.load('source/2020_Yellow_Taxi_Trip_Data.csv',format='csv', sep=',', inferSchema='true', header='true')
yellow_21 = spark.read.load('source/2021_Yellow_Taxi_Trip_Data.csv',format='csv', sep=',', inferSchema='true', header='true')

In [3]:
yellow_19.head()

Row(VendorID='2', tpep_pickup_datetime='04/21/2019 11:22:07 PM', tpep_dropoff_datetime='04/21/2019 11:28:15 PM', passenger_count=1, trip_distance=0.92, RatecodeID=1, store_and_fwd_flag='N', PULocationID=142, DOLocationID=161, payment_type=2, fare_amount=6.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.8, congestion_surcharge=2.5)

In [4]:
yellow_20.head()

Row(VendorID=1, tpep_pickup_datetime='01/01/2020 12:28:15 AM', tpep_dropoff_datetime='01/01/2020 12:33:03 AM', passenger_count=1, trip_distance=1.2, RatecodeID=1, store_and_fwd_flag='N', PULocationID=238, DOLocationID=239, payment_type=1, fare_amount=6.0, extra=3.0, mta_tax=0.5, tip_amount=1.47, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=11.27, congestion_surcharge=2.5)

In [5]:
yellow_21.head()

Row(VendorID=1, tpep_pickup_datetime='01/01/2021 12:30:10 AM', tpep_dropoff_datetime='01/01/2021 12:36:12 AM', passenger_count=1, trip_distance=2.1, RatecodeID=1, store_and_fwd_flag='N', PULocationID=142, DOLocationID=43, payment_type=2, fare_amount=8.0, extra=3.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=11.8, congestion_surcharge=2.5)

In [6]:
import requests
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, to_timestamp, year, month, dayofmonth, to_date


def define_schema():
    labels = [('VendorID', StringType()),
              ('tpep_pickup_datetime', StringType()),
              ('tpep_dropoff_datetime', StringType()),
              ('passenger_count', IntegerType()),
              ('trip_distance', DoubleType()),
              ('RatecodeID', IntegerType()),
              ('store_and_fwd_flag', StringType()),
              ('PULocationID', IntegerType()),
              ('DOLocationID', IntegerType()),
              ('payment_type', IntegerType()),
              ('fare_amount', DoubleType()),
              ('extra', DoubleType()),
              ('mta_tax', DoubleType()),
              ('tip_amount', DoubleType()),
              ('tolls_amount', DoubleType()),
              ('improvement_surcharge', DoubleType()),
              ('total_amount', DoubleType()),
              ('congestion_surcharge', DoubleType())]
    return StructType([StructField(x[0], x[1], True) for x in labels])

In [7]:
def foreach_batch_function(df, epoch_id):
    request_data = {}
    if 'total_tips' in df.columns:
        labels = [str(t.year) for t in df.select("year").collect()]
        data = [p.total_tips for p in df.select("total_tips").collect()]
        request_data = {'labels0': str(labels), 'values0': str(data)}
    elif 'total_taxi_trip' in df.columns:
        labels = [str(t.date) for t in df.select("date").collect()]
        data = [p.total_taxi_trip for p in df.select("total_taxi_trip").collect()]
        request_data = {'labels1': str(labels), 'values1': str(data)}
    elif 'total_trip_miles' in df.columns:
        labels = [str(t.year) for t in df.select("year").collect()]
        data = [p.total_trip_miles for p in df.select("total_trip_miles").collect()]
        request_data = {'labels3': str(labels), 'values3': str(data)}
    elif 'total_trip_hours' in df.columns:
        labels = [str(t.year) for t in df.select("year").collect()]
        data = [p.total_trip_hours for p in df.select("total_trip_hours").collect()]
        request_data = {'labels4': str(labels), 'values4': str(data)}
    elif 'total_rides' in df.columns:
        df_temp1 = df.groupBy('year').max("total_rides").select("max(total_rides)")\
            .withColumnRenamed("max(total_rides)", "total_rides_count")
        df_temp2 = df.join(df_temp1, df.total_rides == df_temp1.total_rides_count, "inner")\
            .select(concat(col("year"), lit(": "), col("company")).alias("name"), "total_rides")
        labels = [str(t.name) for t in df_temp2.select("name").collect()]
        data = [p.total_rides for p in df_temp2.select("total_rides").collect()]
        request_data = {'labels5': str(labels), 'values5': str(data)}
    url = 'http://ilab1.cs.rutgers.edu:5001/updateData'
    print(request_data)
    requests.post(url, data=request_data)

In [8]:
schema = define_schema()
dataframe = spark.readStream.format("csv").schema(schema)\
        .option("header", True).option("cleanSource", "off")\
        .option("ignoreLeadingWhiteSpace", True).option("mode", "dropMalformed")\
        .option("maxFilesPerTrigger", 1).load("source/")  

In [9]:
dataframe = dataframe.toDF(*[c.lower().replace(" ", "_") for c in dataframe.columns]) \
        .withColumn('tpep_pickup_datetime', to_timestamp(col('tpep_pickup_datetime'), 'MM/dd/yyyy hh:mm:ss a')) \
        .withColumn('tpep_dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime'), 'MM/dd/yyyy hh:mm:ss a')) \
        .withColumn('date', to_date(col('tpep_pickup_datetime')))\
        .withColumn('year', year(col('tpep_pickup_datetime')))\
        .withColumn('month', month(col('tpep_pickup_datetime')))\
        .withColumn('day', dayofmonth(col('tpep_pickup_datetime')))

In [10]:
# Create a view
dataframe.createOrReplaceTempView("taxi_trips")

In [11]:
# 1. Rate of tipping over years?
spark.sql("""SELECT year, SUM(tip_amount) AS total_tips FROM taxi_trips GROUP BY year""")\
        .writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()


<pyspark.sql.streaming.StreamingQuery at 0x7f0abc520a60>

In [12]:
# 2. Total trips taken on each day in all the years
spark.sql("""SELECT date, COUNT(VendorID) AS total_taxi_trip FROM taxi_trips GROUP BY date""")\
    .writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()


<pyspark.sql.streaming.StreamingQuery at 0x7f0abc15b6a0>

In [13]:
# 3. Total miles taxis travelled across years?
spark.sql("""SELECT year, SUM(trip_distance) AS total_trip_miles FROM taxi_trips GROUP BY year""")\
    .writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()


<pyspark.sql.streaming.StreamingQuery at 0x7f0abc2049a0>

In [14]:
# 4. Total amount spent on tips across years?
spark.sql("""SELECT year, SUM(tip_amount) AS total_trip_hours FROM taxi_trips GROUP BY year""")\
    .writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()


<pyspark.sql.streaming.StreamingQuery at 0x7f0abc204940>

In [None]:
spark.streams.awaitAnyTermination()

{'labels4': "['None', '2019', '2020', '2033', '2009', '2001', '2008', '2002']", 'values4': '[None, 21503423.369999297, 1.95, 0.0, 35.269999999999996, 0.0, 31.090000000000003, 0.0]'}
{'labels3': "['None', '2019', '2020', '2033', '2009', '2001', '2008', '2002']", 'values3': '[None, 28946933.219999973, 1.03, 2.57, 90.53999999999999, 3.22, 61.57, 0.0]'}
{'labels0': "['None', '2019', '2020', '2033', '2009', '2001', '2008', '2002']", 'values0': '[None, 21503423.369999297, 1.95, 0.0, 35.269999999999996, 0.0, 31.090000000000003, 0.0]'}
{'labels1': "['2019-06-04', '2019-05-08', '2019-11-21', '2019-05-27', '2009-01-01', '2019-04-28', '2019-05-14', '2019-05-10', '2019-07-11', '2019-06-03', '2019-05-13', '2019-04-26', '2019-05-20', '2019-05-12', '2019-05-04', '2019-05-25', '2019-04-29', '2019-05-22', 'None', '2019-05-02', '2019-05-21', '2019-05-28', '2019-05-11', '2019-04-21', '2019-05-19', '2019-04-22', '2019-05-24', '2019-07-16', '2008-12-31', '2008-08-08', '2020-01-03', '2019-06-20', '2019-04-2