In [6]:
from pyspark.sql import SparkSession
import pandas as pd
import preprocessing as prep
import extraction as ext
import yaml
import importlib
from pyspark.ml.feature import Normalizer 
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler, VectorIndexer
import pyspark.sql.functions as func
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
# Pick up edits made to the local preprocessing module without restarting the kernel.
importlib.reload(prep)

# Load the project configuration that is handed to the extraction/preprocessing helpers.
with open("config.yml", "r") as ymlfile:
    cfg = yaml.safe_load(ymlfile)

# NOTE(review): the recorded traceback further down reports "executor driver",
# i.e. the job ran in local mode, where spark.executor.memory may not size the
# JVM that actually does the work -- confirm whether spark.driver.memory is the
# setting that is really wanted here.
spark = (
    SparkSession.builder.appName("NYC_Taxi")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Arrow speeds up pandas <-> Spark conversion. The un-namespaced key
# "spark.sql.execution.arrow.enabled" is deprecated since Spark 3.0; this is
# its replacement.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

print("Get Taxi Data")
psdf_taxi = ext.get_taxi_data(spark, cfg)

# Zone lookup table: loaded and cleaned by the project helpers, then converted
# to a Spark DataFrame.
print("Get zones")
df_zones = ext.get_zones(cfg)
print("Clean zones")
df_zones = prep.clean_zone_data(df_zones)
psdf_zones = spark.createDataFrame(df_zones)

# Zone shapefile: only OBJECTID and the lat/lon columns are carried into Spark.
# (presumably lat/lon are produced by add_features_shp_data -- verify in prep)
gdf_zones_shp = ext.get_zones_shapefile(cfg)
gdf_zones_shp = prep.clean_zone_shp_data(gdf_zones_shp)
gdf_zones_shp = prep.add_features_shp_data(gdf_zones_shp)
psdf_zones_shp = spark.createDataFrame(gdf_zones_shp[["OBJECTID", "lat", "lon"]])

# Features are added before cleaning, using both zone tables.
print("Add features taxi")
psdf_taxi = prep.add_features_taxi_data(psdf_taxi, psdf_zones, psdf_zones_shp)

print("Clean Taxi Data")
psdf_taxi = prep.clean_taxi_data(psdf_taxi, cfg)

# count() is an action: it forces the whole load/feature/clean plan to execute.
print(f"SPDF Taxi size: {psdf_taxi.count()}")



Get Taxi Data
Already downloaded file: yellow_tripdata_2017-01.parquet
Already downloaded file: yellow_tripdata_2017-03.parquet
Already downloaded file: yellow_tripdata_2017-06.parquet
Already downloaded file: yellow_tripdata_2017-11.parquet
Already downloaded file: yellow_tripdata_2017-12.parquet
Get zones
Already downloaded file: taxi+_zone_lookup.csv
Clean zones
Already downloaded file: taxi_zones.shp
Add features taxi
Clean Taxi Data
SPDF Taxi size: 3266249


In [3]:
# Work on roughly 10% of the trips. The seed is fixed so that a fresh
# "Restart & Run All" draws the same sample (the later randomSplit is seeded
# too, but that only helps if its input is stable).
psdf_taxi_sample = psdf_taxi.sample(fraction=0.1, seed=0)

# Rows with a null in ANY column are dropped: Spark produced errors further
# down the notebook when nulls were left in.
psdf_taxi_sample = psdf_taxi_sample.na.drop("any")

# Keep the sample cached, since several later cells reuse it; the bare
# expression also displays the schema as the cell output.
psdf_taxi_sample.cache()

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: bigint, trip_distance: double, RateCodeID: bigint, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, sum_amount: double, pu_month: int, pu_year: int, do_month: int, do_year: int, duration_in_min: double, tip_percentage: double, day_of_week: int, day_of_month: int, hour: int, PU_Borough: string, PU_Zone: string, PU_service_zone: string, DO_Borough: string, DO_Zone: string, DO_service_zone: string, PU_lat: double, PU_lon: double, DO_lat: double, DO_lon: double]

In [4]:
# Numeric model inputs, used without any encoding step of their own.
numeric_columns = [
    "trip_distance",
    "fare_amount",
    "passenger_count",
    "extra",
    "tolls_amount",
    "day_of_month",
    "PU_lat",
    "PU_lon",
    "DO_lat",
    "DO_lon",
    "duration_in_min",
]

# Categorical model inputs; these are the columns that get index-encoded later.
# PU_Zone and DO_Zone are intentionally left out.
category_columns = [
    "VendorID",
    "RateCodeID",
    "store_and_fwd_flag",
    "pu_month",
    "day_of_week",
    "hour",
    "PU_Borough",
    # "PU_Zone",
    "PU_service_zone",
    "DO_Borough",
    # "DO_Zone",
    "DO_service_zone",
]

# Regression target ("tip_amount" is the alternative that was considered).
target_column = "tip_percentage"

# Narrow the sample to target + inputs, in that order.
psdf_taxi_sample = psdf_taxi_sample.select(
    target_column, *numeric_columns, *category_columns
)

psdf_taxi_sample.printSchema()

root
 |-- tip_percentage: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- extra: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- PU_lat: double (nullable = true)
 |-- PU_lon: double (nullable = true)
 |-- DO_lat: double (nullable = true)
 |-- DO_lon: double (nullable = true)
 |-- duration_in_min: double (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- RateCodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pu_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- PU_Borough: string (nullable = true)
 |-- PU_service_zone: string (nullable = true)
 |-- DO_Borough: string (nullable = true)
 |-- DO_service_zone: string (nullable = true)



In [5]:
# Seeded 70/30 split of the prepared sample into training and test sets.
train, test = psdf_taxi_sample.randomSplit(weights=[0.7, 0.3], seed=0)

# Each count() is a Spark action; report both split sizes.
n_train = train.count()
n_test = test.count()
print("There are %d training examples and %d test examples." % (n_train, n_test))

There are 219575 training examples and 93600 test examples.


In [18]:
# Each categorical column gets an "<name>_index" counterpart from the StringIndexer.
indexed_columns = [f"{name}_index" for name in category_columns]

# Model inputs = every remaining (numeric) column, followed by the indexed
# categoricals. The target and the raw categorical columns are excluded.
featuresCols = [
    name
    for name in psdf_taxi_sample.columns
    if name != target_column and name not in category_columns
] + indexed_columns

# handleInvalid="keep": unseen labels get their own extra index instead of failing.
indexer = StringIndexer(
    inputCols=category_columns,
    outputCols=indexed_columns,
    handleInvalid="keep",
)

# Pack all input columns into a single vector column.
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")

# VectorIndexer decides which vector slots are categorical (at most
# maxCategories distinct values), indexes them, and writes the "features" column.
vectorIndexer = VectorIndexer(
    inputCol="rawFeatures",
    outputCol="features",
    maxCategories=8,
)

In [19]:
# Gradient-boosted trees regressor predicting target_column. featuresCol is
# spelled out (it is the default) to make the link to the VectorIndexer's
# "features" output explicit.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol=target_column,
)

In [20]:
# CrossValidator, ParamGridBuilder and RegressionEvaluator are already imported
# in the notebook's import cell, so they are not re-imported here.

# Hyperparameter grid for the GBT regressor:
#  - maxDepth: maximum depth of each decision tree
#  - maxIter: iterations, i.e. the total number of trees
# Only one combination is evaluated for now; a wider search would be
# maxDepth in [2, 5] and maxIter in [10, 100].
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2])
    .addGrid(gbt.maxIter, [10])
    .build()
)

# Evaluation metric: RMSE between the true label and the prediction. The
# CrossValidator computes it for every parameter combination to pick the best model.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=gbt.getLabelCol(),
    predictionCol=gbt.getPredictionCol(),
)

# The CrossValidator performs the model tuning. The seed pins the fold
# assignment so that repeated runs evaluate on the same folds.
cv = CrossValidator(
    estimator=gbt,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    seed=0,
)

In [21]:
from pyspark.ml import Pipeline

# Stage order matters: index the categoricals, assemble the vector, index the
# vector's categorical slots, then run the cross-validated GBT on the result.
pipeline_stages = [indexer, vectorAssembler, vectorIndexer, cv]
pipeline = Pipeline(stages=pipeline_stages)

In [22]:
# Fit the whole pipeline (feature stages + cross-validated GBT) on the training split.
# The saved output of this cell is a Py4JJavaError whose root cause is a
# java.io.FileNotFoundException for a shuffle file under a blockmgr-* folder in
# the local Temp directory, raised while the gradient-boosted trees were training.
# NOTE(review): this looks environmental (Spark's local scratch directory went
# missing) rather than a bug in the pipeline definition -- confirm; pointing
# spark.local.dir at a stable directory may help.
pipelineModel = pipeline.fit(train)

Py4JJavaError: An error occurred while calling o1148.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 440.0 failed 1 times, most recent failure: Lost task 15.0 in stage 440.0 (TID 11809) (host.docker.internal executor driver): java.io.FileNotFoundException: C:\Users\joan.borras\AppData\Local\Temp\blockmgr-a538f18b-26be-47a1-85de-d0f2f672fc9b\1f\shuffle_86_11809_0.data.e936d4c2-c556-4e42-ad1b-5815780d2a5f (El sistema no puede encontrar la ruta especificada)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.initStream(LocalDiskShuffleMapOutputWriter.java:144)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.access$200(LocalDiskShuffleMapOutputWriter.java:45)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter$LocalDiskShufflePartitionWriter.openStream(LocalDiskShuffleMapOutputWriter.java:177)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.open(ShufflePartitionPairsWriter.scala:68)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.write(ShufflePartitionPairsWriter.scala:59)
	at org.apache.spark.util.collection.WritablePartitionedIterator.writeNext(WritablePartitionedPairCollection.scala:83)
	at org.apache.spark.util.collection.ExternalSorter.$anonfun$writePartitionedMapOutput$1(ExternalSorter.scala:767)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedMapOutput(ExternalSorter.scala:770)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:663)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:208)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:367)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:56)
	at org.apache.spark.ml.regression.GBTRegressor.$anonfun$train$1(GBTRegressor.scala:190)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:166)
	at org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:56)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: C:\Users\joan.borras\AppData\Local\Temp\blockmgr-a538f18b-26be-47a1-85de-d0f2f672fc9b\1f\shuffle_86_11809_0.data.e936d4c2-c556-4e42-ad1b-5815780d2a5f (El sistema no puede encontrar la ruta especificada)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.initStream(LocalDiskShuffleMapOutputWriter.java:144)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.access$200(LocalDiskShuffleMapOutputWriter.java:45)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter$LocalDiskShufflePartitionWriter.openStream(LocalDiskShuffleMapOutputWriter.java:177)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.open(ShufflePartitionPairsWriter.scala:68)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.write(ShufflePartitionPairsWriter.scala:59)
	at org.apache.spark.util.collection.WritablePartitionedIterator.writeNext(WritablePartitionedPairCollection.scala:83)
	at org.apache.spark.util.collection.ExternalSorter.$anonfun$writePartitionedMapOutput$1(ExternalSorter.scala:767)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedMapOutput(ExternalSorter.scala:770)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
# Apply the fitted pipeline to the held-out test split. This needs
# pipelineModel from the fit cell, so it cannot run until that cell succeeds.
predictions = pipelineModel.transform(test)

In [None]:
# Show the true target, the model output and the model input columns together.
shown_columns = [target_column, "prediction"] + featuresCols
predictions.select(shown_columns)