In [1]:
from pyspark.sql import SparkSession
import pyspark.sql
from pyspark.sql.functions import col
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql.types import *
import datetime

# Create a spark session (which will run spark jobs)
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
spark.sparkContext.setLogLevel("OFF")

### Reading in the Data
taxi_type = "yellow"
data_dir = 'data/' + taxi_type + '*'
sdf = spark.read.csv(data_dir, header=True)
f"{sdf.count():,} rows!"

def create_schema(data_dir):
    ints = ['VendorID', 'passenger_count', 'RateCodeID', 'RatecodeID','payment_type', 'PULocationID', 'DOLocationID']
    doubles = ['trip_distance','fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']
    strings = ['store_and_fwd_flag']
    dtimes = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    
    if taxi_type == "green":
        doubles.append("ehail_fee")
        ints.append("trip_type")
        dtimes = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    dtypes = {column: IntegerType() for column in ints}
    dtypes.update({column: DoubleType() for column in doubles})
    dtypes.update({column: StringType() for column in strings})
    dtypes.update({column: TimestampType() for column in dtimes})

    schema = StructType()

    for column in sdf.columns:
        schema.add(column, # column name
                   dtypes[column], # data type
                   True # is nullable?
                  )
    return schema

schema = create_schema(data_dir)
sdf = spark.read.csv(data_dir, header=True, schema=schema) \
    .withColumnRenamed("RatecodeID","RateCodeID") # rename the wrong column
# Renaming lpep columns to tpep, for consistency so that it worths with later cuntions
sdf = sdf.withColumnRenamed("lpep_pickup_datetime","tpep_pickup_datetime")
sdf = sdf.withColumnRenamed("lpep_dropoff_datetime","tpep_dropoff_datetime")

21/08/08 21:45:23 WARN Utils: Your hostname, ra141 resolves to a loopback address: 127.0.1.1; using 192.168.1.115 instead (on interface enp0s31f6)
21/08/08 21:45:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/08 21:45:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [2]:
### Data cleaning
# Removing entries not actually in 2020
sdf = sdf.filter((sdf.tpep_pickup_datetime >= datetime.datetime(2020,1,1,0,0,0)) & \
    (sdf.tpep_pickup_datetime < datetime.datetime(2021,1,1,0,0,0)))  
# Finding the min time, to verify it is infact the first day of 2020
time = sdf.select(pyspark.sql.functions.min("tpep_pickup_datetime")).collect()[0][0]
print(f"The first entry for a cab is at {time}")



The first entry for a cab is at 2020-01-01 00:00:00


                                                                                

In [None]:
### Getting the proportions of cars that are hailed, vs ordered, vs NA
# Only applicable to green cab
nums = list(sdf.groupby("trip_type").count().collect()) # Getting counts of trip types
nums = list(map(lambda x: list(x)[1], nums))    # Flattening
nums = list(map(lambda x: x/sum(nums), nums))   # Converting to percentage of total
print(f"NULL: {nums[0]*100:.2f}%, Street Hail: {nums[1]*100:.2f}%, Dispath: {nums[2]*100:.2f}%")

In [None]:
### Get the average number of pickups in a 10 minute interval in a given PU location ID
print(f"There are {sdf.count()} trips in total and {sdf.select('PULocationID').distinct().count()} unique PULocationIDs.")
num = sdf.count()/sdf.select("PULocationID").distinct().count()/365/24/6
print(f"In a 10 minute interval in a given location ID there are on average {num:.2f} trips.")

In [None]:
# Getting the number of location IDs
num_locations = max([sdf.select(pyspark.sql.functions.max("PULocationID")).collect()[0][0], sdf.select(pyspark.sql.functions.max("DOLocationID")).collect()[0][0]])
temp_sdf_date = datetime.datetime(2020,1,1)
curr_pickup_time = datetime.datetime(2020,1,1,0,0,0)

def get_pickups(time_interval_in_seconds):
    global curr_pickup_time
    num_pickups = list([0]*num_locations)
    
    sdf_within_interval = temp_sdf.filter((temp_sdf.tpep_pickup_datetime >= curr_pickup_time) & \
        (temp_sdf.tpep_pickup_datetime < \
        curr_pickup_time + datetime.timedelta(seconds = time_interval_in_seconds)))
    counts = sdf_within_interval.groupby("PULocationID").count().collect()
    for count in counts:
        num_pickups[count[0]-1] = count[1]
    
    curr_pickup_time += datetime.timedelta(seconds = time_interval_in_seconds)
    if curr_pickup_time > temp_sdf_date:
        populate_temp_sdf()
    return num_pickups

# Creates a df with the next days events, returns a bool denoting success
def populate_temp_sdf():
    global temp_sdf_date
    
    if temp_sdf_date == datetime.datetime(2021, 1, 1):
        return None
    
    temp_sdf = sdf.filter((sdf.tpep_pickup_datetime >= temp_sdf_date) & \
        (sdf.tpep_pickup_datetime < temp_sdf_date + datetime.timedelta(days=1)))
    temp_sdf.cache()
    
    temp_sdf_date += datetime.timedelta(days=1)
    
    return temp_sdf

temp_sdf = populate_temp_sdf()

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

import matplotlib.animation as animation
fig = plt.figure()
#creating a subplot 
ax1 = fig.add_subplot(1,1,1)
def animate(i):
    ax1.clear()
    
    plt.title(f'Pick Up Density over Time - {curr_pickup_time}')
    plt.xlabel('Location IDs')
    plt.ylabel('Number of Pickups')
    plt.ylim(top=100)
    
    locations = list(range(1, num_locations+1))
    num_pickups = get_pickups(60*10)
    ax1.bar(locations, num_pickups)
    
ani = animation.FuncAnimation(fig, animate, interval=250, frames=6*23) 
ani.save('sine_wave.gif', writer='imagemagick')

In [None]:
### A small test of regression in spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
tiny_sdf = spark.read.csv("tmp.csv", header = True)
schema = StructType()
ints = ('MATH', 'CS', 'Score')
dtypes = {column: IntegerType() for column in ints}
schema = StructType()

for column in tiny_sdf.columns:
    schema.add(column, # column name
               dtypes[column], # data type
               True # is nullable?
              )

tiny_sdf = spark.read.csv("tmp.csv", header = True, schema=schema)

vectorAssembler = VectorAssembler(inputCols = ['MATH', 'CS'], outputCol = 'features')
tiny_sdf = vectorAssembler.transform(tiny_sdf)
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol='Score', featuresCol = 'features')
model = lr.fit(tiny_sdf)
tiny_sdf.show()
for i in range(4):
    model.predict(tiny_sdf.head(4)[i].features)

In [4]:
### Reducing sdf into a structure containing average pickups for each location for each time for each time interval
def transform(x):
    time = x.tpep_pickup_datetime
    time_interval = 20
    time_interval_index = ((time.hour*3600) + (time.minute*60) + (time.second))//time_interval
    return time.weekday(), time_interval_index, x.PULocationID

from os import path
if path.isdir(f"Intervaled_{taxi_type}"):
    ### Reading an existing sdf2
    sdf2 = spark.read.csv(f"Intervaled_{taxi_type}", header=True)
    ints = ["day", "time_interval", "PULocationID"]
    dtypes = {column: IntegerType() for column in ints}
    schema = StructType()
    [schema.add(column, dtypes[column], True) for column in sdf2.columns]
    sdf2 = spark.read.csv(f"Intervaled_{taxi_type}", header=True, schema = schema)
else:
    rdd2 = sdf.rdd.map(transform)
    sdf2 = rdd2.toDF(["day","time_interval","PULocationID"])
    sdf2.write.csv(f"Intervaled_{taxi_type}", header = True)
    sdf2.persist(pyspark.StorageLevel.DISK_ONLY)

sdf2.show(5)

+---+-------------+------------+
|day|time_interval|PULocationID|
+---+-------------+------------+
|  1|           21|         138|
|  1|          123|         140|
|  1|          101|         132|
|  1|            6|         238|
|  1|          113|         238|
+---+-------------+------------+
only showing top 5 rows



In [13]:
### Create a smaller version of sdf2 and try the aggregations
sdf3 = sdf2.groupby(["day", "time_interval", "PULocationID"]).count().sort(col("count").desc())
sdf3.show(4)
sdf3.persist(pyspark.StorageLevel.DISK_ONLY)

+---+-------------+------------+-----+
|day|time_interval|PULocationID|count|
+---+-------------+------------+-----+
|  3|         2728|         236|  129|
|  2|         2674|         237|  127|
|  3|         2670|         237|  123|
|  1|         2687|         237|  123|
+---+-------------+------------+-----+
only showing top 4 rows



DataFrame[day: int, time_interval: int, PULocationID: int, count: bigint]

In [32]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression

sdf4 = OneHotEncoder(inputCols = ["day","time_interval","PULocationID"], outputCols = ["day_vec","time_interval_vec","PULocationID_vec"]).fit(sdf3).transform(sdf3)
sdf4.show(5)
vectorAssembler = VectorAssembler(inputCols = ["day_vec","time_interval_vec","PULocationID_vec"], outputCol = 'features')
sdf4 = vectorAssembler.transform(sdf4)
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol='count', featuresCol = 'features')

splits = sdf4.randomSplit([0.7, 0.3])
train_sdf4 = splits[0]
test_sdf4 = splits[1]
                 
lr_model = lr.fit(train_sdf4)

+---+-------------+------------+-----+-------------+-------------------+-----------------+
|day|time_interval|PULocationID|count|      day_vec|  time_interval_vec| PULocationID_vec|
+---+-------------+------------+-----+-------------+-------------------+-----------------+
|  3|         2728|         236|  129|(6,[3],[1.0])|(4319,[2728],[1.0])|(265,[236],[1.0])|
|  2|         2674|         237|  127|(6,[2],[1.0])|(4319,[2674],[1.0])|(265,[237],[1.0])|
|  3|         2670|         237|  123|(6,[3],[1.0])|(4319,[2670],[1.0])|(265,[237],[1.0])|
|  1|         2687|         237|  123|(6,[1],[1.0])|(4319,[2687],[1.0])|(265,[237],[1.0])|
|  2|         2756|         237|  122|(6,[2],[1.0])|(4319,[2756],[1.0])|(265,[237],[1.0])|
+---+-------------+------------+-----+-------------+-------------------+-----------------+
only showing top 5 rows



                                                                                

In [33]:
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
lr_model.evaluate(train_sdf4)

from pyspark.ml.evaluation import RegressionEvaluator
lr_predictions = lr_model.transform(test_sdf4)
lr_predictions.select("prediction","count","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="count",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

RMSE: 9.895082
r2: 0.483622


                                                                                

+-----------------+-----+--------------------+
|       prediction|count|            features|
+-----------------+-----+--------------------+
|26.23592760189126|   79|(4590,[0,1189,451...|
|26.23592760189126|   74|(4590,[0,1211,451...|
|26.23592760189126|   74|(4590,[0,1218,451...|
|26.23592760189126|   77|(4590,[0,1222,451...|
|26.23592760189126|   77|(4590,[0,1224,451...|
+-----------------+-----+--------------------+
only showing top 5 rows





R Squared (R2) on test data = 0.483943


                                                                                