In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,split,dayofweek,round,sqrt,radians,cos,sin,asin,abs,max
#from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
# Start (or reuse) the Spark session used by every cell below.
spark=(
    SparkSession.builder
    .appName('Regression')
    .getOrCreate()
)
# Read the training CSV (first line is the header, column types are inferred),
# then discard every row that contains a null.
csv_reader=spark.read.option("inferSchema",True).option("header",True)
frame=csv_reader.csv('./train.csv').dropna()

Preprocessing

In [None]:
# Invalid-data filter: keep only trips whose pickup and dropoff points differ
# in longitude or in latitude (identical pickup & dropoff location is filtered out).
lon_differs=frame.pickup_longitude!=frame.dropoff_longitude
lat_differs=frame.pickup_latitude!=frame.dropoff_latitude
df=frame.filter(lon_differs|lat_differs)

# 'pickup_datetime' is split on the space: element 0 is the date part, element 1 the clock part.
dtime=split(col('pickup_datetime'),' ')
date_part=dtime[0]
clock_fields=split(dtime[1],':')
df_1=(
    df
    .withColumn('day',dayofweek(date_part))                   # day of week of the pickup date
    .withColumn('month',split(date_part,'-')[1].cast('int'))  # second '-'-separated field of the date
    .withColumn('time',clock_fields[0]*60+clock_fields[1])    # first clock field * 60 + second clock field
)

# Haversine distance between pickup and dropoff; all four coordinates are converted to radians first.
# 6371 is the sphere radius used for scaling (Earth's mean radius in km, so the result is presumably in km).
pickup_lat=radians(col('pickup_latitude'))
dropoff_lat=radians(col('dropoff_latitude'))
pickup_lon=radians(col('pickup_longitude'))
dropoff_lon=radians(col('dropoff_longitude'))
haversine_term=(
    sin((dropoff_lat-pickup_lat)/2)**2
    +cos(pickup_lat)*cos(dropoff_lat)*sin((dropoff_lon-pickup_lon)/2)**2
)
df_2=df_1.withColumn('distance',abs(2*asin(sqrt(haversine_term))*6371))

# Drop the columns that are not used as features, then remove exact duplicate rows.
unused_cols=['id','vendor_id','pickup_datetime','dropoff_datetime','passenger_count','store_and_fwd_flag']
df_clean=df_2.drop(*unused_cols).dropDuplicates()

# Every remaining column except the label 'trip_duration' is assembled into the "features" vector.
feature_cols=df_clean.drop('trip_duration').columns
assembler=VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
df_3=assembler.transform(df_clean).select("trip_duration","features")

In [None]:
# 80/20 train/test split. The sampling seed is fixed so that re-running the notebook
# does not draw a different partition (and therefore different metrics) every time.
train,test=df_3.randomSplit([0.8,0.2],seed=42)
# Min-max scaling: the scaler is fit on the training features only and then applied to
# both splits, so no statistics of the test split are used when fitting it.
scaler=MinMaxScaler(inputCol="features",outputCol="sc_features").fit(train.select("features"))
train=scaler.transform(train)
test=scaler.transform(test)
# Shared evaluator; the metric name is supplied per call through a param map.
evaluator=RegressionEvaluator().setLabelCol('trip_duration')
# Base model: a depth-5 regression tree on the scaled features.
# 'leaf_id' receives the index of the leaf each row lands in; the enhanced model below relies on it.
regressor=DecisionTreeRegressor(maxDepth=5,maxBins=32,impurity='variance',featuresCol='sc_features',labelCol='trip_duration',leafCol='leaf_id')

In [None]:
k=10 #K-fold cross validation
# Split the training data into k roughly equal folds; the seed is fixed so the folds are repeatable.
slots=train.randomSplit([1/k for _ in range(k)],seed=42)
rmse=0
mae=0
for fold_idx,val_test in enumerate(slots):
  # Training data for this round = union of every fold except the validation fold.
  # (DataFrame.subtract would also work, but it is an EXCEPT DISTINCT set difference:
  #  it compares whole rows against the validation fold and de-duplicates the result.)
  cv_train=None
  for other_idx,other_fold in enumerate(slots):
    if other_idx!=fold_idx:
      cv_train=other_fold if cv_train is None else cv_train.union(other_fold)
  cv_pred=regressor.fit(cv_train).transform(val_test)
  rmse+=evaluator.evaluate(cv_pred,{evaluator.metricName:"rmse"})
  mae+=evaluator.evaluate(cv_pred,{evaluator.metricName:"mae"})
# Report the metrics averaged over the k folds.
print(rmse/k)
print(mae/k)

3864.1160742724232
408.9029403627838


In [None]:
# Fit the tree on the whole training split and score it on the held-out test split.
model=regressor.fit(train)
predictions=model.transform(test)
# Print RMSE first, then MAE.
for metric in ("rmse","mae"):
  print(evaluator.evaluate(predictions,{evaluator.metricName:metric}))

3175.77774831378
417.00255642688586


Enhanced model

In [None]:
# Enhanced model: one linear regression per leaf of the fitted tree.
# Each row is routed to a leaf by the tree ('leaf_id'); the tree's own prediction is dropped
# and replaced by the prediction of the linear model trained on that leaf's training rows.
en_train=model.transform(train).drop('prediction')
en_test=predictions.drop('prediction')
# Empty frame with the output schema; the per-leaf predictions are unioned onto it.
en_predictions=spark.createDataFrame(data=[],schema=predictions.select('trip_duration','features','sc_features','leaf_id','prediction').schema)
# Iterate over the leaf ids that actually occur in the training data instead of assuming
# they are exactly 0..n-1; every fit is then guaranteed to receive at least one row.
leaf_ids=sorted(row['leaf_id'] for row in en_train.select("leaf_id").distinct().collect())
en_model=[]  # per-leaf models, in ascending leaf-id order
for leaf in leaf_ids:
  model_enh=LinearRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8,featuresCol='sc_features',labelCol='trip_duration').fit(en_train.filter(en_train.leaf_id==leaf))
  en_model.append(model_enh)
  # Test rows that fall into this leaf are scored by this leaf's model.
  en_predictions=en_predictions.union(model_enh.transform(en_test.filter(en_test.leaf_id==leaf)))

In [None]:
# DISABLED CELL: the code below is wrapped in a triple-quoted string, so it is never executed.
# It sketches a K-fold cross validation of the enhanced (tree + per-leaf linear regression) model:
# for each fold, a linear regression is fit per leaf on the remaining training rows and used to
# predict the fold's rows in that leaf; RMSE and MAE are then averaged over the k folds.
# NOTE(review): as written it loops over range(<number of distinct leaf ids>), i.e. it assumes the
# leaf ids in each training fold are exactly 0..n-1 -- confirm before re-enabling.
'''
k=10 #K-fold cross validation for enhanced tree
slots=en_train.randomSplit([1/k for _ in range(k)])
rmse=0
mae=0
for val_test in slots:
  cv_pred=spark.createDataFrame(data=[],schema=predictions.select('trip_duration','features','sc_features','leaf_id','prediction').schema)
  cv_train=en_train.subtract(val_test)
  for i in range(cv_train.select("leaf_id").distinct().count()):
    cv_pred=cv_pred.union(LinearRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8,featuresCol='sc_features',labelCol='trip_duration').fit(cv_train.filter(cv_train.leaf_id==i)).transform(val_test.filter(val_test.leaf_id==i)))
  rmse+=evaluator.evaluate(cv_pred,{evaluator.metricName:"rmse"})
  mae+=evaluator.evaluate(cv_pred,{evaluator.metricName:"mae"})
print(rmse/k)
print(mae/k)
'''

In [None]:
# Test-set metrics of the enhanced (per-leaf linear regression) model: RMSE first, then MAE.
for metric in ("rmse","mae"):
  print(evaluator.evaluate(en_predictions,{evaluator.metricName:metric}))

3093.6043321332113
400.22093862789063
