In [7]:
!pip install --no-cache-dir geopandas
import geopandas



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, DecimalType, IntegerType, StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, month, year, isnan, desc, unix_timestamp, to_timestamp, dayofmonth, hour, minute, second, mean
from functools import reduce
import seaborn as sns
import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt

In [9]:
# Configure seaborn once for the whole notebook: white-grid style in the
# "notebook" context, with fonts scaled up by 20% for readability.
sns.set_theme(context="notebook", style="whitegrid", font_scale=1.2)

In [10]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Yellow Taxi")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "1g")
    .config('spark.executor.instances', 6)
    .getOrCreate()
)

In [11]:
# Monthly Yellow Taxi trip files to load: January through June 2023.
base_path = '../yellow_taxi_data/yellow_tripdata_2023-{:02d}.parquet'

# range(1, 7) yields months 1..6; the {:02d} placeholder zero-pads the month.
paths = [base_path.format(month_number) for month_number in range(1, 7)]

In [12]:
def load_and_cast(filepath):
    """Read one Parquet file and return it with ``VendorID`` cast to integer.

    Args:
        filepath: Path of the Parquet file to read with the global ``spark``
            session.

    Returns:
        The loaded Spark DataFrame, with ``VendorID`` as ``IntegerType`` so
        every monthly frame carries the same type for that column.
    """
    trips = spark.read.parquet(filepath)
    return trips.withColumn("VendorID", col("VendorID").cast(IntegerType()))

# Read every monthly file, then stack the frames into one DataFrame.
# unionByName matches columns by name rather than by position.
dataframes = list(map(load_and_cast, paths))
df = reduce(lambda combined, monthly: combined.unionByName(monthly), dataframes)

# Print a single row as a quick check of the combined schema.
df.show(1)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

In [13]:
from shapely.geometry import Point

In [15]:
# Build a per-zone lookup table with one longitude/latitude pair per taxi zone.
# The shapefile is tagged as EPSG:2263 without mutating the frame in place.
zone_shapes = gpd.read_file("yellow_taxi_data/yellow_taxi_zones/taxi_zones.shp").set_crs("EPSG:2263")

# One representative point per zone polygon, reprojected to EPSG:4326 so that
# x/y can be read off as longitude/latitude.
zone_centers = zone_shapes.representative_point().to_crs("EPSG:4326")

# Attach the coordinates and drop the columns that are not needed downstream,
# leaving a plain attribute table that can be handed to Spark.
zone = (
    zone_shapes
    .assign(long=zone_centers.x, lat=zone_centers.y)
    .drop(columns=['OBJECTID', 'geometry'])
)

In [16]:
# Name and Spark type of each zone-table column, in the order used to build the
# schema. Every field is nullable.
# NOTE(review): the names differ in case from the pandas columns ("zone",
# "borough", "long", "lat"); presumably the schema is applied by position -
# confirm.
# NOTE(review): LocationID is declared as a string here, while the trip
# location IDs it is later joined against are bigint.
zone_fields = [
    ("Shape_Leng", DoubleType()),
    ("Shape_Area", DoubleType()),
    ("Zone", StringType()),
    ("LocationID", StringType()),
    ("Borough", StringType()),
    ("Long", DoubleType()),
    ("Lat", DoubleType()),
]
schemazone = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in zone_fields]
)

zonedf = spark.createDataFrame(zone, schemazone)

In [17]:
# Left join on the pickup location so that trips whose PULocationID has no
# match in the zone table are kept (their zone columns are null).
merged_zone = df.join(zonedf, df.PULocationID == zonedf.LocationID, how = 'left')

In [18]:
# Trip columns that are not used further on, plus the LocationID key that the
# zone join brought in.
unused_columns = [
    'VendorID', 'passenger_count', 'RatecodeID', 'store_and_fwd_flag',
    'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
    'tolls_amount', 'improvement_surcharge', 'total_amount',
    'congestion_surcharge', 'airport_fee', 'LocationID',
]
merged_zone = merged_zone.drop(*unused_columns)

In [19]:
# Prefix the zone attributes from the pickup join with "PU_" so they stay
# distinguishable from the drop-off attributes joined in next.
pickup_renames = {
    "Shape_Leng": "PU_Shape_Leng",
    "Shape_Area": "PU_Shape_Area",
    "Zone": "PU_Zone",
    "Borough": "PU_Borough",
    "long": "PU_Long",
    "lat": "PU_Lat",
}
for old_name, new_name in pickup_renames.items():
    merged_zone = merged_zone.withColumnRenamed(old_name, new_name)

In [20]:
# Second left join against the same zone table, this time keyed on the
# drop-off location; unmatched DOLocationID values keep their trip row.
merged_zone = merged_zone.join(zonedf, merged_zone.DOLocationID == zonedf.LocationID, how = 'left')

In [21]:
# Prefix the zone attributes from the drop-off join with "DO_".
dropoff_renames = {
    "Shape_Leng": "DO_Shape_Leng",
    "Shape_Area": "DO_Shape_Area",
    "Zone": "DO_Zone",
    "Borough": "DO_Borough",
    "long": "DO_Long",
    "lat": "DO_Lat",
}
for old_name, new_name in dropoff_renames.items():
    merged_zone = merged_zone.withColumnRenamed(old_name, new_name)

In [22]:
from pyspark.sql.functions import col, unix_timestamp, expr

# Make sure both trip endpoints are timestamps before doing arithmetic on them.
for ts_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    merged_zone = merged_zone.withColumn(ts_column, col(ts_column).cast('timestamp'))

# Trip duration in whole minutes: the difference in epoch seconds divided by
# 60, with the fractional part discarded by the int cast.
elapsed_seconds = unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime')
merged_zone = merged_zone.withColumn('duration_mins', (elapsed_seconds / 60).cast('int'))

In [23]:
# Remove the LocationID key column brought in by the drop-off zone join.
merged_zone = merged_zone.drop('LocationID')

In [24]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, dayofweek

# Calendar and clock components of the pickup time, added as one column each.
pickup_parts = {
    'month': month,
    'day': dayofmonth,
    'hour': hour,
    'minute': minute,
    'dayOfWeek': dayofweek,
}
for part_name, extract_part in pickup_parts.items():
    merged_zone = merged_zone.withColumn(part_name, extract_part('tpep_pickup_datetime'))

In [25]:
# Flag pickups that fall on a weekend. Spark's dayofweek numbers the days
# 1 (Sunday) through 7 (Saturday).
weekend_days = [1, 7]
weekend_flag = merged_zone.dayOfWeek.isin(weekend_days)
merged_zone = merged_zone.withColumn('is_weekend', weekend_flag.cast('boolean'))


In [26]:
# NOTE: this import rebinds `sum` to Spark's aggregate function for the rest of
# the notebook, shadowing Python's builtin `sum`.
from pyspark.sql.functions import col, sum

# One aggregate per column: cast "is null" to 0/1 and add it up, giving the
# number of nulls in that column.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in merged_zone.columns
]
na_counts = merged_zone.select(null_count_exprs)

In [27]:
# Run the aggregation and print the per-column null counts.
na_counts.show()



CodeCache: size=131072Kb used=30983Kb max_used=30998Kb free=100088Kb
 bounds [0x00000001069e0000, 0x0000000108870000, 0x000000010e9e0000]
 total_blobs=11020 nmethods=10020 adapters=909
 compilation: disabled (not enough contiguous free space left)




+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|DO_Shape_Leng|DO_Shape_Area|DO_Zone|DO_Borough|DO_Long|DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|                   0|                    0|            0|           0|           0|       223909|       223909| 223909|    223909| 223909|223909|       278399|       278399| 278399|    278399| 278399|2

                                                                                

In [28]:
# Total number of trip rows after both zone joins.
merged_zone.count()

                                                                                

19500314

In [29]:
# Inspect a few trips whose pickup zone attributes are null, i.e. whose
# PULocationID found no match in the zone table.
merged_zone.filter(col("PU_Shape_Leng").isNull()).show(5)



+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+---------------+----------------+--------------------+----------+------------------+------------------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|  DO_Shape_Leng|   DO_Shape_Area|             DO_Zone|DO_Borough|           DO_Long|            DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+---------------+----------------+--------------------+----------+------------------+------------------+-------------+-----+---+----+------+---------+----------+
| 2023-01-01 00:10:50|  2023-01-01 00:20:19|         1.41|         264|        

                                                                                

In [30]:
# Trips whose location ID has no entry in the zone table get an explicit
# 'OutsideNYC' label instead of a null zone/borough name.
outside_label = 'OutsideNYC'
merged_zone = merged_zone.na.fill({
    name_column: outside_label
    for name_column in ('PU_Zone', 'DO_Zone', 'PU_Borough', 'DO_Borough')
})

In [31]:
# Fill the remaining nulls in the zone-derived numeric columns with each
# column's mean. All means are computed in a single aggregation so the joined
# trip data is scanned once rather than once per column.
# NOTE(review): the means are taken over the whole dataset, before the
# train/test split further down, so test rows influence the imputed values.
numeric_cols = ['PU_Long', 'PU_Lat',  'PU_Shape_Leng', 'PU_Shape_Area', 'DO_Long', 'DO_Lat',  'DO_Shape_Leng', 'DO_Shape_Area']

mean_row = merged_zone.select(
    [mean(col(col_name)).alias(col_name) for col_name in numeric_cols]
).first()

# A column that is entirely null has no mean; leave it untouched rather than
# handing None to fillna.
mean_by_col = {
    col_name: mean_row[col_name]
    for col_name in numeric_cols
    if mean_row[col_name] is not None
}
if mean_by_col:
    merged_zone = merged_zone.fillna(mean_by_col)

                                                                                

In [33]:
# Re-read the raw shapefile only to display its original columns and geometry;
# nothing is assigned, so `zone` and `zonedf` are unaffected.
gpd.read_file("yellow_taxi_data/yellow_taxi_zones/taxi_zones.shp")

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((933100.918 192536.086, 933091.011 19..."
1,2,0.433470,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((1033269.244 172126.008, 103343..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((1026308.770 256767.698, 1026495.593 ..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((992073.467 203714.076, 992068.667 20..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((935843.310 144283.336, 936046.565 14..."
...,...,...,...,...,...,...,...
258,259,0.126750,0.000395,Woodlawn/Wakefield,259,Bronx,"POLYGON ((1025414.782 270986.139, 1025138.624 ..."
259,260,0.133514,0.000422,Woodside,260,Queens,"POLYGON ((1011466.966 216463.005, 1011545.889 ..."
260,261,0.027120,0.000034,World Trade Center,261,Manhattan,"POLYGON ((980555.204 196138.486, 980570.792 19..."
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,"MULTIPOLYGON (((999804.795 224498.527, 999824...."


In [34]:
# Recount nulls per column after the fills. `sum` here is Spark's aggregate,
# imported in the earlier null-count cell.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in merged_zone.columns
]
na_counts = merged_zone.select(null_count_exprs)

In [35]:
# Print the recomputed null counts; after the fills above the visible columns
# report zero.
na_counts.show()



+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|DO_Shape_Leng|DO_Shape_Area|DO_Zone|DO_Borough|DO_Long|DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|                   0|                    0|            0|           0|           0|            0|            0|      0|         0|      0|     0|            0|            0|      0|         0|      0| 

                                                                                

In [36]:
# Random 80/20 split into training and test sets, with a fixed seed.
train_df, test_df = merged_zone.randomSplit(weights=[0.8,0.2], seed=11)

In [37]:
# List the (column, type) pairs of the training split before the non-feature
# columns are dropped.
train_df.dtypes

[('tpep_pickup_datetime', 'timestamp'),
 ('tpep_dropoff_datetime', 'timestamp'),
 ('trip_distance', 'double'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('PU_Shape_Leng', 'double'),
 ('PU_Shape_Area', 'double'),
 ('PU_Zone', 'string'),
 ('PU_Borough', 'string'),
 ('PU_Long', 'double'),
 ('PU_Lat', 'double'),
 ('DO_Shape_Leng', 'double'),
 ('DO_Shape_Area', 'double'),
 ('DO_Zone', 'string'),
 ('DO_Borough', 'string'),
 ('DO_Long', 'double'),
 ('DO_Lat', 'double'),
 ('duration_mins', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('hour', 'int'),
 ('minute', 'int'),
 ('dayOfWeek', 'int'),
 ('is_weekend', 'boolean')]

In [38]:
# Keep only the label and the numeric model inputs in the training frame:
# raw timestamps, zone/borough names, location IDs, shape metrics and the
# weekend flag are removed.
non_feature_columns = [
    'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'PU_Borough', 'PU_Zone', 'DO_Borough', 'DO_Zone',
    'is_weekend', 'PULocationID', 'DOLocationID',
    'DO_Shape_Leng', 'DO_Shape_Area', 'PU_Shape_Leng', 'PU_Shape_Area',
]
train_df = train_df.drop(*non_feature_columns)

In [39]:
# Column the models are trained to predict.
label_name = "duration_mins"

# Every remaining training column except the label is a model input.
feature_names = [column_name for column_name in train_df.columns if column_name != label_name]

In [40]:
# Display the model input columns.
feature_names

['trip_distance',
 'PU_Long',
 'PU_Lat',
 'DO_Long',
 'DO_Lat',
 'month',
 'day',
 'hour',
 'minute',
 'dayOfWeek']

In [41]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Pack the individual feature columns into the single vector column that the
# Spark ML estimators consume.
assembler = VectorAssembler(
    inputCols=feature_names,
    outputCol='features',
)
df_transformed = assembler.transform(train_df)


In [42]:
# Standardise each feature to zero mean and unit standard deviation.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)

# The statistics are learned from the training data only; the fitted model is
# reused later to scale the test data.
scalerModel = scaler.fit(df_transformed)
df_normalized = scalerModel.transform(df_transformed)


                                                                                

In [43]:
# Linear regression on the standardised features, with a regularisation
# parameter of 0.3.
lr = LinearRegression(
    featuresCol='scaledFeatures',
    labelCol=label_name,
    regParam=0.3,
)

In [44]:
# Fit the linear model on the standardised training features.
model = lr.fit(df_normalized)


24/05/19 11:41:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/19 11:42:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [45]:
trainingSummary = model.summary

# (label, summary attribute) pairs, printed in this order. Each attribute is
# read only when its line is printed.
summary_lines = [
    ("Coefficient Standard Errors: ", "coefficientStandardErrors"),
    ("T Values: ", "tValues"),
    ("P Values: ", "pValues"),
    ("Loss: ", "objectiveHistory"),  # Objective history during iterations
]
for line_label, attribute_name in summary_lines:
    print(line_label, getattr(trainingSummary, attribute_name))

Coefficient Standard Errors:  [0.01058059611416176, 0.012622571779277234, 0.012890578601249847, 0.011064940165697817, 0.01134266726224986, 0.010583025194088767, 0.010582839734927671, 0.010600233987063671, 0.01058069369620194, 0.010605127526641238, 0.010617062496592432]
T Values:  [13.280151654832144, 357.34990838541074, -149.6023420073847, 89.32320249800797, -169.8949504582612, 69.0505771744367, 7.309362445934758, 3.221921807392167, -12.803202881732362, 31.070986841140467, 1552.9703110149499]
P Values:  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.6845192735436285e-13, 0.0012733414101981921, 0.0, 0.0, 0.0]
Loss:  [0.0]


In [46]:
from pyspark.ml.evaluation import RegressionEvaluator

# Push the test split through the same assembler and the scaler fitted on the
# training data, then score it with the trained linear model.
test_df_transformed = assembler.transform(test_df)
test_df_normalized = scalerModel.transform(test_df_transformed)
predictions = model.transform(test_df_normalized)

# RMSE between the true label and the prediction column.
# metricName can be changed to "mse", "mae", or "r2".
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol=label_name,
)

rmse = evaluator.evaluate(predictions)
rmse_report = "Root Mean Squared Error (RMSE) on test data = %g" % rmse
print(rmse_report)



Root Mean Squared Error (RMSE) on test data = 42.4341


                                                                                

In [47]:
# Preview the first 10 test rows together with their predicted duration.
predictions.show(10)

24/05/19 11:49:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 176:>                                                        (0 + 1) / 1]

+--------------------+---------------------+-------------+------------+------------+---------------+----------------+--------------------+----------+------------------+------------------+------------------+--------------------+--------------------+----------+------------------+------------------+-------------+-----+---+----+------+---------+----------+--------------------+--------------------+------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|  PU_Shape_Leng|   PU_Shape_Area|             PU_Zone|PU_Borough|           PU_Long|            PU_Lat|     DO_Shape_Leng|       DO_Shape_Area|             DO_Zone|DO_Borough|           DO_Long|            DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|            features|      scaledFeatures|        prediction|
+--------------------+---------------------+-------------+------------+------------+---------------+----------------+--------------------+----------+------------------+

                                                                                

In [48]:
# Display the feature list again, for reference next to the predictions.
feature_names

['trip_distance',
 'PU_Long',
 'PU_Lat',
 'DO_Long',
 'DO_Lat',
 'month',
 'day',
 'hour',
 'minute',
 'dayOfWeek']

In [49]:
# Show predictions next to the model inputs only, hiding the columns that were
# not used as features.
hidden_columns = (
    'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'PU_Borough', 'PU_Zone', 'DO_Borough', 'DO_Zone',
    'is_weekend', 'PULocationID', 'DOLocationID',
    'DO_Shape_Leng', 'DO_Shape_Area', 'PU_Shape_Leng', 'PU_Shape_Area',
)
predictions.drop(*hidden_columns).show(20)

[Stage 182:>                                                        (0 + 1) / 1]

+-------------+------------------+------------------+------------------+------------------+-------------+-----+---+----+------+---------+--------------------+--------------------+------------------+
|trip_distance|           PU_Long|            PU_Lat|           DO_Long|            DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|            features|      scaledFeatures|        prediction|
+-------------+------------------+------------------+------------------+------------------+-------------+-----+---+----+------+---------+--------------------+--------------------+------------------+
|          0.0|-74.00137472995894| 40.72389907146951|-74.00137472995894| 40.72389907146951|            0|   10| 24|  23|    15|        2|[0.0,-74.00137472...|[-0.0165635958487...| 18.48098479601192|
|         0.97|-73.77826365889905|40.642604521716564|-73.77826365889905|40.642604521716564|            2|   10| 25|   0|    42|        3|[0.97,-73.7782636...|[-0.0126218055638...|53.125366589100764|
|    

                                                                                

In [50]:
from xgboost.spark import SparkXGBRegressor

In [51]:
# XGBoost regressor configured with the same feature vector column and label
# as the linear regression above.
xgboost_regressor = SparkXGBRegressor(features_col="scaledFeatures", label_col=label_name)


In [52]:
# Train the XGBoost model on the standardised training data.
xgb_regression_model = xgboost_regressor.fit(df_normalized)

INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 1 workers with====> (47 + 1) / 48]
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[11:54:36] task 0 got new rank 0
INFO:XGBoost-PySpark:Finished xgboost training!                                 


In [53]:
# Score the test rows, reusing the scaled test features built for the linear
# model.
xgbr_predictions = xgb_regression_model.transform(test_df_normalized)

In [54]:
# Evaluate the XGBoost predictions with the same RMSE evaluator that scored
# the linear model, so the two numbers are directly comparable.
xgb_rmse = evaluator.evaluate(xgbr_predictions)
xgb_report = "Root Mean Squared Error (RMSE) on test data = %g" % xgb_rmse
print(xgb_report)

2024-05-19 11:55:58,849 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,891 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,893 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,899 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,903 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,918 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,931 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-05-19 11:55:58,940 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs

Root Mean Squared Error (RMSE) on test data = 41.2483


                                                                                