In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import DecisionTreeRegressor

In [0]:
# True when running outside a managed notebook (e.g. via spark-submit); in that
# case we build the session ourselves instead of relying on a pre-made `spark`.
IS_SPARK_SUBMIT_CLI = True

if IS_SPARK_SUBMIT_CLI:
    # Attach to the active SparkContext (creating one if necessary) and wrap it.
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sparkContext=sc)

# Load Data -- KTSample.csv

In [0]:
# Location and format of the raw taxi-trip sample.
file_location = "/FileStore/tables/KTSample.csv"
file_type = "csv"

# CSV reader options, passed to Spark as strings.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# These options only matter for CSV sources; other formats would ignore them.
df1 = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

In [0]:
# Register the raw frame under a fixed name so it can also be queried via SQL.
temp_table_name1 = "KTSample_csv"
df1.createOrReplaceTempView(name=temp_table_name1)

In [0]:
# Both branches yield the CSV that was already loaded into df1 above.
#  - CLI: reuse df1 directly. Re-reading the file with inferSchema would force
#    another pass over the data just to infer the same schema.
#  - Notebook: read it back through the registered temp view. spark.sql replaces
#    the legacy sqlContext entry point (assumes the notebook provides `spark`).
if IS_SPARK_SUBMIT_CLI:
    KTSample = df1
else:
    KTSample = spark.sql(f"select * from {temp_table_name1}")

KTSample.show(5)

# Select Data and Calculate the Trip Time in Seconds

In [0]:
# Keep both timestamps and the numeric predictors. fare_amount is the target,
# renamed to "label" (the default label column name for Spark ML estimators).
timediff = KTSample.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    col("fare_amount").alias("label"),
)

timediff.show(5)

In [0]:
# Parse both trip endpoints as timestamps, then take their difference in whole
# seconds via the epoch-seconds cast.
pickup_ts = to_timestamp(col("tpep_pickup_datetime"))
dropoff_ts = to_timestamp(col("tpep_dropoff_datetime"))

df2 = (
    timediff
    .withColumn("tpep_pickup_datetime", pickup_ts)
    .withColumn("tpep_dropoff_datetime", dropoff_ts)
    .withColumn(
        "trip_time_in_secs",
        col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long"),
    )
)
df2.show(5)

In [0]:
# Final modelling frame: three numeric predictors plus the label.
# VectorAssembler's default handleInvalid="error" aborts the fit on any null
# feature, so drop incomplete rows (e.g. timestamps that to_timestamp could not
# parse, which come back as null).
data = df2.select('passenger_count', 'trip_distance', 'trip_time_in_secs', 'label').na.drop()

data.show(5)

# Set Up Train and Test Datasets

In [0]:
# 70/30 train/test split. A fixed seed makes the split (and so every RMSE
# reported below) repeatable for the same input data.
splits = data.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
# Rename the target on the test side so evaluation compares "prediction"
# against "trueLabel" explicitly.
test = splits[1].withColumnRenamed("label", "trueLabel")

# Set Up GBT Regression

In [0]:
# Pack the three predictors into one vector column named "features".
feature_cols = ['passenger_count', 'trip_time_in_secs', 'trip_distance']
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Gradient-boosted trees; reads the "features" column by default.
gbt = GBTRegressor(labelCol="label")

In [0]:
# Search tree depth and number of boosting rounds (2 x 2 = 4 candidates).
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 3])\
  .addGrid(gbt.maxIter, [10, 20])\
  .build()

# Model selection scores each candidate by RMSE against the training "label".
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# An explicit seed makes the k-fold assignment repeatable; without it the
# folds (and hence the selected parameters) may differ between runs.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

In [0]:
# Chain feature assembly with the GBT cross-validator and fit on the training split.
pipeline = Pipeline(stages=[assembler, cv])
pipelineModel = pipeline.fit(dataset=train)

In [0]:
# Score the held-out rows with the fitted pipeline (assemble, then predict).
predictions = pipelineModel.transform(dataset=test)

In [0]:
# Show the assembled features next to the prediction and the ground truth.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=10)

In [0]:
# Expose the scored rows to SQL.
predicted.createOrReplaceTempView(name="regressionPredictions")

In [0]:
# Pull just the truth/prediction pair back out through SQL.
prediction_query = "SELECT trueLabel, prediction FROM regressionPredictions"
dataPred = spark.sql(prediction_query)

dataPred.show(10)

# RMSE for GBT Regression

In [0]:
# Test-set RMSE: predictions versus the renamed ground-truth column.
evaluator = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="trueLabel")
rmse = evaluator.evaluate(dataset=predictions)
print("Root Mean Square Error (RMSE) for GBT Regression :", rmse)

# Set Up Linear Regression

In [0]:
# Same three-column feature vector, now feeding a linear model. The maxIter and
# regParam given here are only initial values; the parameter grid overrides both.
assembler = VectorAssembler(
    inputCols=['passenger_count', 'trip_time_in_secs', 'trip_distance'],
    outputCol="features",
)
lr = LinearRegression(featuresCol="features", labelCol="label", regParam=0.3, maxIter=10)
pipeline1 = Pipeline(stages=[assembler, lr])

In [0]:
# 2 x 2 grid over regularisation strength and iteration count.
paramGrid1 = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.01]).addGrid(lr.maxIter, [10, 5]).build()

# Hold out 20% of `train` for validation. The evaluator is spelled out (RMSE on
# "label"/"prediction", which are also RegressionEvaluator's defaults), and the
# seed is fixed so the internal train/validation split is repeatable.
trainval = TrainValidationSplit(
    estimator=pipeline1,
    evaluator=RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction"),
    estimatorParamMaps=paramGrid1,
    trainRatio=0.8,
    seed=42,
)

In [0]:
# Score each grid candidate on an 80/20 split of train, then refit the best on all of train.
pipelineModel = trainval.fit(dataset=train)

In [0]:
# Apply the selected linear pipeline to the held-out rows.
predictions = pipelineModel.transform(dataset=test)

In [0]:
# Show the assembled features next to the prediction and the ground truth.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=10)

In [0]:
# Re-point the SQL view at the linear-regression predictions.
predicted.createOrReplaceTempView(name="regressionPredictions")

In [0]:
# Pull just the truth/prediction pair back out through SQL.
prediction_query = "SELECT trueLabel, prediction FROM regressionPredictions"
dataPred = spark.sql(prediction_query)

dataPred.show(10)

# RMSE for Linear Regression

In [0]:
# Test-set RMSE: predictions versus the renamed ground-truth column.
evaluator = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="trueLabel")
rmse = evaluator.evaluate(dataset=predictions)
print("Root Mean Square Error (RMSE) for Linear Regression :", rmse)

# Decision Tree Regression

In [0]:
# A single regression tree on the same three predictors.
assembler = VectorAssembler(outputCol="features", inputCols=['passenger_count', 'trip_time_in_secs', 'trip_distance'])
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

In [0]:
# Candidate tree depths and bin counts for discretising continuous features (4 combos).
paramGrid2 = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [2, 3])
    .addGrid(dt.maxBins, [10, 20])
    .build()
)

In [0]:
# Model selection scores each candidate tree by RMSE against the training "label".
evaluator = RegressionEvaluator(metricName="rmse", labelCol=dt.getLabelCol(), predictionCol=dt.getPredictionCol())

# 2-fold CV over the grid. An explicit seed makes the fold assignment
# repeatable; without it the folds may differ from run to run.
dtcv = CrossValidator(estimator = dt, estimatorParamMaps = paramGrid2, evaluator = evaluator, numFolds=2, seed=42)

In [0]:
# Assemble features, cross-validate the tree, and fit on the training split.
pipeline2 = Pipeline(stages=[assembler, dtcv])
pipelineModel = pipeline2.fit(dataset=train)

In [0]:
# Score the held-out rows with the fitted decision-tree pipeline.
predictions = pipelineModel.transform(dataset=test)

In [0]:
# Select by the exact column name "trueLabel". The lowercase "truelabel" only
# resolved because Spark matches column names case-insensitively by default
# (spark.sql.caseSensitive=false); it breaks when case sensitivity is enabled.
predicted = predictions.select("features", "prediction", "trueLabel")
predicted.show(10)

In [0]:
# Re-point the SQL view at the decision-tree predictions.
predicted.createOrReplaceTempView(name="regressionPredictions")

In [0]:
# Pull just the truth/prediction pair back out through SQL.
prediction_query = "SELECT trueLabel, prediction FROM regressionPredictions"
dataPred = spark.sql(prediction_query)

dataPred.show(5)

# RMSE for Decision Tree Regression

In [0]:
# Test-set RMSE of the cross-validated tree. The model is a single
# DecisionTreeRegressor (not a random forest), so the output is labelled
# "Decision Tree" rather than "Decision Forest".
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error (RMSE) for Decision Tree Regression :", rmse)