In [1]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql.functions import monotonically_increasing_id



In [2]:
# Preview the source table.
csv = sqlContext.sql("select * from nysalary")
csv.show(5)

# 90% and 10% of the non-null Salaries count (SQL count(col) skips NULLs);
# these are the rank positions of the 90th / 10th percentiles of a sorted column.
maxval_nine, maxval_twen = (
    sqlContext.sql(query)
    for query in ("select count(Salaries)*.90 from nysalary",
                  "select count(Salaries)*.10 from nysalary")
)
maxval_nine.show()
maxval_twen.show()

In [3]:
# 10th / 90th percentile of Salaries, located by rank among the column's non-null values.
# - Rank positions are derived from this column's own non-null count rather than
#   hard-coded, so the cell stays correct if the table changes.
# - NULLs are excluded: Spark sorts them first in ascending order, which would shift
#   every ROW_NUMBER relative to count(Salaries).
# - The cast to double orders the values numerically, matching the cast used in the
#   modelling cell (a no-op if the column is already numeric).
rownumber = sqlContext.sql(
    "select ROW_NUMBER() over (ORDER BY Salaries) AS Row, Salaries "
    "from (select cast(Salaries as double) AS Salaries from nysalary) t "
    "where Salaries is not null")
rownumber.createOrReplaceTempView("res1")
n_salaries = rownumber.count()
# Integer arithmetic gives the exact floor of count*0.90 / count*0.10 (no float rounding);
# max(1, ...) keeps the lookup on a real row for tiny tables.
val_nineper = sqlContext.sql("select Salaries,Row from res1 where Row = %d" % max(1, n_salaries * 90 // 100))
val_twenper = sqlContext.sql("select Salaries,Row from res1 where Row = %d" % max(1, n_salaries * 10 // 100))
val_twenper.show()
val_nineper.show()

In [4]:
# 10th / 90th percentile of Retirement, located by rank among the column's non-null values.
# - Rank positions are derived from THIS column's own non-null count, so a column whose
#   NULL count differs from other columns still gets its own correct positions.
# - NULLs are excluded: Spark sorts them first in ascending order, which would shift
#   every ROW_NUMBER relative to the non-null count.
# - The cast to double orders the values numerically, matching the cast used in the
#   modelling cell (a no-op if the column is already numeric).
rownumber2 = sqlContext.sql(
    "select ROW_NUMBER() over (ORDER BY Retirement) AS Row, Retirement "
    "from (select cast(Retirement as double) AS Retirement from nysalary) t "
    "where Retirement is not null")
rownumber2.createOrReplaceTempView("res2")
n_retirement = rownumber2.count()
# Integer arithmetic gives the exact floor of count*0.90 / count*0.10;
# max(1, ...) keeps the lookup on a real row for tiny tables.
ret_nineper = sqlContext.sql("select Retirement,Row from res2 where Row = %d" % max(1, n_retirement * 90 // 100))
ret_twenper = sqlContext.sql("select Retirement,Row from res2 where Row = %d" % max(1, n_retirement * 10 // 100))
ret_twenper.show()
ret_nineper.show()

In [5]:
# 10th / 90th percentile of HealthDental, located by rank among the column's non-null values.
# - Rank positions are derived from THIS column's own non-null count, so a column whose
#   NULL count differs from other columns still gets its own correct positions.
# - NULLs are excluded: Spark sorts them first in ascending order, which would shift
#   every ROW_NUMBER relative to the non-null count.
# - The cast to double orders the values numerically, matching the cast used in the
#   modelling cell (a no-op if the column is already numeric).
rownumber3 = sqlContext.sql(
    "select ROW_NUMBER() over (ORDER BY HealthDental) AS Row, HealthDental "
    "from (select cast(HealthDental as double) AS HealthDental from nysalary) t "
    "where HealthDental is not null")
rownumber3.createOrReplaceTempView("res3")
n_healthdental = rownumber3.count()
# Integer arithmetic gives the exact floor of count*0.90 / count*0.10;
# max(1, ...) keeps the lookup on a real row for tiny tables.
hd_nineper = sqlContext.sql("select HealthDental,Row from res3 where Row = %d" % max(1, n_healthdental * 90 // 100))
hd_twenper = sqlContext.sql("select HealthDental,Row from res3 where Row = %d" % max(1, n_healthdental * 10 // 100))
hd_twenper.show()
hd_nineper.show()

In [6]:
# 10th / 90th percentile of TotalCompensation, located by rank among the column's
# non-null values.
# - Rank positions are derived from THIS column's own non-null count, so a column whose
#   NULL count differs from other columns still gets its own correct positions.
# - NULLs are excluded: Spark sorts them first in ascending order, which would shift
#   every ROW_NUMBER relative to the non-null count.
# - The cast to double orders the values numerically, matching the cast used in the
#   modelling cell (a no-op if the column is already numeric).
# Named rownumber4 (matching the "res4" view) so it no longer overwrites the
# HealthDental ranking held in rownumber3.
rownumber4 = sqlContext.sql(
    "select ROW_NUMBER() over (ORDER BY TotalCompensation) AS Row, TotalCompensation "
    "from (select cast(TotalCompensation as double) AS TotalCompensation from nysalary) t "
    "where TotalCompensation is not null")
rownumber4.createOrReplaceTempView("res4")
n_totalcomp = rownumber4.count()
# Integer arithmetic gives the exact floor of count*0.90 / count*0.10;
# max(1, ...) keeps the lookup on a real row for tiny tables.
tc_nineper = sqlContext.sql("select TotalCompensation,Row from res4 where Row = %d" % max(1, n_totalcomp * 90 // 100))
tc_twenper = sqlContext.sql("select TotalCompensation,Row from res4 where Row = %d" % max(1, n_totalcomp * 10 // 100))
tc_twenper.show()
tc_nineper.show()

In [7]:

# Build the modelling dataset: cast the four numeric columns to double, drop rows with any
# NULL, keep only rows inside each column's inclusive [low, high] band, and expose
# TotalCompensation as the regression target "label".
# NOTE(review): these bounds look like the 10th/90th percentile values reported by the
# percentile cells above (i.e. outlier trimming) - TODO re-check them if the data changes.
SALARIES_RANGE = (4994.4, 121743.54)
RETIREMENT_RANGE = (0.0, 25288.1)
HEALTHDENTAL_RANGE = (764.58, 13489.25)
TOTALCOMP_RANGE = (7934.39, 189487.16)
# Fixed seed so the train/test split - and every RMSE reported below - is reproducible.
SPLIT_SEED = 42

csv1 = sqlContext.sql(
    "select cast(Salaries as double) AS Salaries, cast(Retirement as double) AS Retirement, "
    "cast(HealthDental as double) AS HealthDental, "
    "cast(TotalCompensation as double) AS TotalCompensation from nysalary"
).dropna()

data = (csv1
        # Column.between is inclusive on both ends, same as the >= / <= pairs.
        .where(col("Salaries").between(*SALARIES_RANGE))
        .where(col("Retirement").between(*RETIREMENT_RANGE))
        .where(col("HealthDental").between(*HEALTHDENTAL_RANGE))
        .where(col("TotalCompensation").between(*TOTALCOMP_RANGE))
        # Rename the target only after filtering, so every predicate refers to a column
        # that still exists in its input.
        .select("Salaries", "Retirement", "HealthDental",
                col("TotalCompensation").alias("label")))

# Split the data 70/30; the test split's target is renamed so it cannot be used as a label.
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

In [8]:
# Declare the decision-tree regressor that reads the assembled "features" vector
# (its label column is left at the estimator's default; the modelling cell names the
# target "label").
dt = DecisionTreeRegressor(featuresCol="features")
# Pack the three numeric predictors into the single vector column Spark ML expects.
assembler = VectorAssembler(
    inputCols=["Salaries", "Retirement", "HealthDental"],
    outputCol="features",
)

In [9]:
# Hyper-parameter grid for the decision tree: two candidate tree depths.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10])
             .build())
# Metric CrossValidator uses to score each candidate: RMSE between the tree's label
# column and its prediction column (lower is better).
evaluator = RegressionEvaluator(metricName="rmse", labelCol=dt.getLabelCol(), predictionCol=dt.getPredictionCol())
# CrossValidator runs the model tuning for us. A fixed seed makes the random fold
# assignment - and therefore the selected maxDepth - reproducible across runs.
cv = CrossValidator(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

In [10]:
# Chain feature assembly with the cross-validated decision tree, then fit on the training split.
pipeline = Pipeline().setStages([assembler, cv])
pipelineModel = pipeline.fit(train)

In [11]:
# Score the held-out split with the fitted pipeline (assembler + best tree).
predictions = pipelineModel.transform(dataset=test)

In [12]:
# Side-by-side view of inputs, predicted value and true value for the first 100 test rows.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=100)

In [13]:
# Test-set RMSE of the tuned decision tree: compares the held-out true values
# ("trueLabel") with the pipeline's "prediction" column.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# A single-argument print(...) works on both Python 2 and Python 3 kernels.
print("Root Mean Square Error (RMSE) for Decision Tree Model: %s" % rmse)

In [14]:
# Gradient-boosted-tree regressor trained against the "label" column.
gbt = GBTRegressor(labelCol="label")
# Same feature assembly as for the decision tree, re-declared so this section stands alone.
assembler = VectorAssembler(outputCol="features",
                            inputCols=["Salaries", "Retirement", "HealthDental"])

In [15]:
# Hyper-parameter grid for the gradient-boosted trees: 2 depths x 2 iteration counts.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 5])
             .addGrid(gbt.maxIter, [10, 100])
             .build())
# Metric CrossValidator uses to score each candidate: RMSE between the GBT's label
# column and its prediction column (lower is better).
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
# CrossValidator runs the model tuning for us. A fixed seed makes the random fold
# assignment - and therefore the selected parameters - reproducible across runs.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

In [16]:
# Chain feature assembly with the cross-validated GBT, then fit on the training split.
pipeline = Pipeline().setStages([assembler, cv])
pipelineModel = pipeline.fit(train)

In [17]:
# Score the held-out split with the fitted pipeline (assembler + best GBT).
predictions = pipelineModel.transform(dataset=test)

In [18]:
# Side-by-side view of inputs, predicted value and true value for the first 100 test rows.
predicted = predictions.select(["features", "prediction", "trueLabel"])
predicted.show(n=100)

In [19]:
# Test-set RMSE of the tuned gradient-boosted trees: compares the held-out true values
# ("trueLabel") with the pipeline's "prediction" column.
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# A single-argument print(...) works on both Python 2 and Python 3 kernels.
print("Root Mean Square Error (RMSE) for GBT Regression : %s" % rmse)