In [1]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import DecisionTreeRegressor

In [2]:
# Confirm the uploaded NSE CSV is present in DBFS (Python form of `%fs ls`).
display(dbutils.fs.ls("/FileStore/tables/nseComp_1-4319d.csv"))

In [3]:
# Load the registered NSE table, keeping the code, date/time, OHLC and Volume columns.
nse = spark.table("nseComp_1_4319d_csv").select(
    "Code", "Date", "Time", "Open", "High", "Low", "Close", "Volume"
)

In [4]:
# Quick look at the first rows of the raw data.
nse.show(n=10)

In [5]:
# Drop every row that has a null in any column (DataFrame.na.drop defaults to how="any").
nse1 = nse.na.drop()

In [6]:
# Expose Volume as the regression target under Spark ML's conventional name "label".
# NOTE(review): 'Date' is later fed to VectorAssembler as a feature; confirm it is a
# numeric column, since VectorAssembler rejects string/date input columns.
data = nse1.select(
    "Code", "Date", "Time", "Open", "High", "Low", "Close",
    nse1["Volume"].alias("label"),
)

## Split the Data

In [8]:
# Split roughly 70/30 into training and test sets. Without a seed, Spark picks a random
# one each run, so the split and every metric reported below would change between
# Restart & Run All executions. The fixed seed makes the split reproducible.
SPLIT_SEED = 42
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
# The evaluation cells compare predictions against "trueLabel", so rename the target here.
test = splits[1].withColumnRenamed("label", "trueLabel")

## Gradient-Boosted Tree Regression

In [10]:
# Pack the five predictor columns into the single "features" vector Spark ML expects.
assembler = VectorAssembler(
    inputCols=["Date", "Open", "High", "Low", "Close"],
    outputCol="features",
)
# Gradient-boosted tree regressor trained on the "label" (Volume) column;
# its hyperparameters keep their defaults at this point.
gbt = GBTRegressor(labelCol="label")

## Tune Hyperparameters

In [12]:
# 2 x 2 grid over tree depth and number of boosting iterations (4 candidate models).
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)

# Candidates are ranked by RMSE, computed on the same label/prediction columns the GBT uses.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# Fold assignment in CrossValidator is random. Pinning the seed makes model selection
# reproducible. numFolds=3 is Spark's default, written out so the cost
# (folds x grid size fits) is visible.
CV_SEED = 42
cv = CrossValidator(
    estimator=gbt,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    numFolds=3,
    seed=CV_SEED,
)

## Define and Fit the Pipeline

In [14]:
# Chain feature assembly with the cross-validated GBT, then fit on the training split.
# Fitting runs the whole hyperparameter search.
pipeline = Pipeline().setStages([assembler, cv])
pipelineModel = pipeline.fit(dataset=train)

## Test the Model

In [16]:
# Score the held-out test set with the fitted GBT pipeline.
predictions = pipelineModel.transform(dataset=test)

In [17]:
# Keep the input vector, model output and ground truth; print 100 rows for inspection.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=100)

## Examine the Predictions

In [19]:
# Register the GBT predictions so they can be queried by name.
predicted.createOrReplaceTempView(name="regressionPredictions")


In [20]:
# Actual vs. predicted volume, rendered with the Databricks display() widget.
dataPred = spark.table("regressionPredictions").select("trueLabel", "prediction")

display(dataPred)

## RMSE Analysis

In [22]:
# Root-mean-square error of the GBT predictions on the held-out test set.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# print() with one pre-formatted string gives identical output on Python 2 and Python 3.
# The bare print statement is a SyntaxError on Python 3 kernels.
print("Root Mean Square Error (RMSE) for GBT Regression : {}".format(rmse))

## Linear Regression

In [24]:
# Linear-regression baseline on the same five predictors. The maxIter/regParam values
# here are starting values; the param grid over lr.regParam and lr.maxIter replaces
# them during tuning.
assembler = VectorAssembler(
    inputCols=["Date", "Open", "High", "Low", "Close"],
    outputCol="features",
)
lr = LinearRegression(featuresCol="features", labelCol="label", regParam=0.3, maxIter=10)
pipeline1 = Pipeline(stages=[assembler, lr])

## Tune Hyperparameters

In [26]:
# 2 x 2 grid over regularisation strength and iteration cap for the LR stage.
paramGrid1 = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.01])
    .addGrid(lr.maxIter, [10, 5])
    .build()
)
# Hold out 20% of the training data to choose among the candidates by RMSE.
# The evaluator settings below equal RegressionEvaluator's defaults and are written out
# for clarity. The train/validation split is random, so a fixed seed keeps model
# selection reproducible.
TVS_SEED = 42
trainval = TrainValidationSplit(
    estimator=pipeline1,
    evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse"),
    estimatorParamMaps=paramGrid1,
    trainRatio=0.8,
    seed=TVS_SEED,
)

## Fit the Model

In [28]:
# Run the train/validation search. The result wraps the best-scoring LR pipeline.
pipelineModel = trainval.fit(dataset=train)

## Test the Model

In [30]:
# Score the held-out test set with the selected linear-regression pipeline.
predictions = pipelineModel.transform(dataset=test)

In [31]:
# Keep the input vector, model output and ground truth; print 100 rows for inspection.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=100)

## Examine the Predictions

In [33]:
# Re-register the view name, now pointing at the linear-regression predictions.
predicted.createOrReplaceTempView(name="regressionPredictions")

In [34]:
# Actual vs. predicted volume, rendered with the Databricks display() widget.
dataPred = spark.table("regressionPredictions").select("trueLabel", "prediction")

display(dataPred)

## RMSE Analysis

In [36]:
# Root-mean-square error of the linear-regression predictions on the held-out test set.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# print() with one pre-formatted string gives identical output on Python 2 and Python 3.
# The bare print statement is a SyntaxError on Python 3 kernels.
print("Root Mean Square Error (RMSE) for Linear Regression : {}".format(rmse))