In [1]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import DecisionTreeRegressor

In [2]:
%fs ls /FileStore/tables/Book3a.csv

In [3]:
# Load the modelling columns from `nycjobs`, casting each one to a numeric type.
# NOTE(review): `nycjobs` is not created in any visible cell; it is assumed to be
# an existing table/view (presumably built from Book3a.csv) -- confirm.
jobs = spark.sql(
    "SELECT cast(Job_ID as double),"
    "cast(Posting_Type as int),"
    "cast(Title_Code_No as double),"
    "cast(Level as int),"
    "cast(FullTime_PartTime as int),"
    "cast(Salary_Range_To as double),"
    "cast(Salary_Frequency as int),"
    "cast(Hours_Shift as int) "
    "FROM nycjobs"
)

In [4]:
# Preview the first five rows of the casted frame.
jobs.show(n=5)

In [5]:
# Discard every row that has a null in any of the selected columns.
jobs1 = jobs.dropna(how='any')

In [6]:
# Keep the same eight columns, exposing Salary_Range_To under the name `label`
# (the label column name both regressors below are configured with).
data = jobs1.select(
    'Job_ID',
    'Posting_Type',
    'Title_Code_No',
    'Level',
    'FullTime_PartTime',
    col('Salary_Range_To').alias('label'),
    'Salary_Frequency',
    'Hours_Shift',
)

In [7]:
# Preview the first five rows of the modelling frame.
data.show(n=5)

#### Split the Data

In [9]:
# 70/30 train/test split. A fixed seed is supplied so that re-running the
# notebook yields the same partition and the RMSE figures reported for the two
# models below are comparable between runs.
# NOTE(review): a seed only makes the split repeatable when `data` itself is
# produced deterministically (same rows, same partitioning) -- confirm.
splits = data.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
# The held-out label is renamed to `trueLabel`; the evaluation cells read it
# under that name.
test = splits[1].withColumnRenamed("label", "trueLabel")

### Gradient-Boosted Tree Regression

In [11]:
# Input columns packed into the single `features` vector column.
gbt_feature_columns = [
    'Job_ID',
    'Posting_Type',
    'Title_Code_No',
    'Level',
    'FullTime_PartTime',
    'Salary_Frequency',
    'Hours_Shift',
]
assembler = VectorAssembler(inputCols=gbt_feature_columns, outputCol="features")

# Gradient-boosted tree regressor trained against the `label` column.
gbt = GBTRegressor(labelCol="label")

#### Tune Parameters

In [13]:
# Search grid for the GBT regressor: two tree depths x two iteration counts,
# i.e. four candidate parameter maps.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)

# Candidates are ranked by RMSE, computed on the same label/prediction columns
# the regressor is configured with.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# Cross-validated search over the grid (number of folds left at the library
# default). The seed fixes how rows are assigned to folds so the selected
# model does not change from one run to the next.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

#### Define and Fit the Pipeline

In [15]:
# Two stages: assemble the feature vector, then run the cross-validated GBT search.
gbt_stages = [assembler, cv]
pipeline = Pipeline(stages=gbt_stages)

# Fit both stages on the training split.
pipelineModel = pipeline.fit(train)

#### Test the Model

In [17]:
# Apply the fitted GBT pipeline to the held-out split.
predictions = pipelineModel.transform(dataset=test)

In [18]:
# Keep only the columns needed to compare predictions with the actual values.
comparison_columns = ["features", "prediction", "trueLabel"]
predicted = predictions.select(comparison_columns)
predicted.show(n=100)

#### Examine the Predicted and Actual Values

In [20]:
# Register the comparison frame as a temp view so it can be queried with SQL.
predicted.createOrReplaceTempView(name="regressionPredictions")


In [21]:
# Pull actual vs. predicted values back out of the temp view and render them.
dataPred = spark.sql(
    "SELECT trueLabel, prediction "
    "FROM regressionPredictions"
)
display(dataPred)

#### RMSE Analysis

In [23]:
# Score the GBT predictions on the held-out split: RMSE between the
# `trueLabel` and `prediction` columns.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# Call form of print with one pre-formatted string: emits the same text as the
# former Python 2 print statement and also parses on Python 3.
print("Root Mean Square Error (RMSE) for GBT Regression : %s" % rmse)

### Linear Regression

In [25]:
# Input columns packed into the single `features` vector column.
lr_feature_columns = [
    'Job_ID',
    'Posting_Type',
    'Title_Code_No',
    'Level',
    'FullTime_PartTime',
    'Salary_Frequency',
    'Hours_Shift',
]
assembler = VectorAssembler(inputCols=lr_feature_columns, outputCol="features")

# Linear regression on `features` -> `label`; maxIter and regParam given here
# are starting values that the tuning grid in the next cell also varies.
lr = LinearRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)

# Two stages: assemble the feature vector, then fit the linear model.
lr_stages = [assembler, lr]
pipeline1 = Pipeline(stages=lr_stages)


#### Tune Parameters

In [27]:
# Search grid for the linear model: two regularisation strengths x two
# iteration limits, i.e. four candidate parameter maps.
paramGrid1 = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.01])
    .addGrid(lr.maxIter, [10, 5])
    .build()
)

# Single train/validation split (80% used for fitting) over the whole
# pipeline. The evaluator is constructed with its defaults.
# NOTE(review): presumably that means RMSE on `label`/`prediction` -- confirm.
# The seed fixes the train/validation partition so the chosen parameters do
# not change from one run to the next.
trainval = TrainValidationSplit(
    estimator=pipeline1,
    evaluator=RegressionEvaluator(),
    estimatorParamMaps=paramGrid1,
    trainRatio=0.8,
    seed=42,
)

#### Fit the Tuned Pipeline

In [29]:
# Run the train/validation search on the training split.
# Note: this rebinds `pipelineModel`, replacing the fitted GBT pipeline.
pipelineModel = trainval.fit(dataset=train)

#### Test the Model

In [31]:
# Apply the fitted linear-regression model to the held-out split.
predictions = pipelineModel.transform(dataset=test)

In [32]:
# Keep only the columns needed to compare predictions with the actual values.
comparison_columns = ["features", "prediction", "trueLabel"]
predicted = predictions.select(comparison_columns)
predicted.show(n=100)

#### Examine the Predicted and Actual Values

In [34]:
# Re-register the temp view so it now holds the linear-regression comparison frame.
predicted.createOrReplaceTempView(name="regressionPredictions")


In [35]:
# Pull actual vs. predicted values back out of the temp view and render them.
dataPred = spark.sql(
    "SELECT trueLabel, prediction "
    "FROM regressionPredictions"
)
display(dataPred)

#### RMSE Analysis

In [37]:
# Score the linear-regression predictions on the held-out split: RMSE between
# the `trueLabel` and `prediction` columns.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# Call form of print with one pre-formatted string: emits the same text as the
# former Python 2 print statement and also parses on Python 3.
print("Root Mean Square Error (RMSE) for Linear Regression : %s" % rmse)