In [1]:
global Path
if sc.master[0:5]=="local":
    Path="file:///home/vmauser/pythonwork/PythonProject/"
else:
    Path="file:/home/vmauser/"

### Data preprocessing

#### Importing data

In [3]:
hour_df=sqlContext.read.format("csv").option("header", "true").load(Path+"data/hour.csv")

print(hour_df.count())
print(len(hour_df.columns))
print(hour_df.columns)

17379
17
['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']


#### Dropping redundant columns

In [5]:
hour_df=hour_df.drop("instant").drop("dteday").drop("yr").drop("casual").drop("registered")
hour_df.printSchema()

root
 |-- season: string (nullable = true)
 |-- mnth: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- holiday: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- workingday: string (nullable = true)
 |-- weathersit: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- atemp: string (nullable = true)
 |-- hum: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- cnt: string (nullable = true)



#### Type transformation

In [7]:
from pyspark.sql.functions import col

hour_df=hour_df.select([col(column).cast("double").alias(column) for column in hour_df.columns])

print hour_df.printSchema()
print(hour_df.show(5))

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)

None
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|   1.0| 1.0|0.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.81|      0.0|16.0|
|   1.0| 1.0|1.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|40.0|
|   1.0| 1.0|2.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|32.0|
|   1.0

#### Train/test splitting

In [8]:
train_df, test_df=hour_df.randomSplit([0.7, 0.3])
train_df.cache()
test_df.cache()

DataFrame[season: double, mnth: double, hr: double, holiday: double, weekday: double, workingday: double, weathersit: double, temp: double, atemp: double, hum: double, windspeed: double, cnt: double]

### Decision tree regression

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

# Collect the feature columns
featuresCols=hour_df.columns[:-1]
print(featuresCols)

vectorAssembler=VectorAssembler(inputCols=featuresCols, outputCol="Afeatures")
vectorIndexer=VectorIndexer(inputCol="Afeatures", outputCol="features", maxCategories=24)
dt=DecisionTreeRegressor(labelCol="cnt", featuresCol="features")

dt_pipeline=Pipeline(stages=[vectorAssembler, vectorIndexer, dt])
dt_pipeline.getStages()

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed']


[VectorAssembler_c190bbc9563c,
 VectorIndexer_fcad48added4,
 DecisionTreeRegressor_276d5815a61e]

In [13]:
pipelineModel=dt_pipeline.fit(train_df)
pipelineModel.stages[2].toDebugString[:500]

u'DecisionTreeRegressionModel (uid=DecisionTreeRegressor_276d5815a61e) of depth 5 with 63 nodes\n  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})\n   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})\n    If (feature 2 in {2.0,3.0,4.0,5.0})\n     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})\n      If (feature 2 in {2.0,3.0,4.0})\n       Predict: 6.587392550143266\n      Else (feature 2 not in {2.0,3.0,4.0})\n       Predict: 24.722666666666665\n     Else (feature 4 not in {1.0,2.0,3.0,4.0,5.0})\n      If (fe'

In [14]:
predicted_df=pipelineModel.transform(test_df)
print(predicted_df.columns)

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'Afeatures', 'features', 'prediction']


In [15]:
predicted_df.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday',\
'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+-----------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|       prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+-----------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1364| 0.8|   0.2985|52.0|54.21582733812949|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.36|0.3788|0.66|      0.0|48.0|54.21582733812949|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       1.0|0.14|0.1667|0.59|   0.1045|12.0|37.56857142857143|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       2.0|0.26|0.2273| 0.7|   0.3284|12.0|37.56857142857143|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0| 0.2|0.2576|0.64|      0.0| 6.0|37.56857142857143|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0|0.26| 0.303|0.81|      0.0|28.0|37.56857142857143|
|   1.0| 1.0|0.0|    0.0|    4.0|       1.0|  

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="rmse")

rmse=evaluator.evaluate(predicted_df)
rmse

97.34398927140036

#### Decision tree regression with TrainValidation

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

featuresCols=hour_df.columns[:-1]

vectorAssembler=VectorAssembler(inputCols=featuresCols, outputCol="Afeatures")

vectorIndexer=VectorIndexer(inputCol="Afeatures", outputCol="features", maxCategories=24)

dt=DecisionTreeRegressor(labelCol="cnt", featuresCol="features")

paramGrid=ParamGridBuilder().addGrid(dt.maxDepth, [5, 10, 15, 25])\
.addGrid(dt.maxBins, [25, 35, 45, 50])\
.build()

evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="rmse")

tvs=TrainValidationSplit(estimator=dt, evaluator=evaluator,\
                         estimatorParamMaps=paramGrid, trainRatio=0.8)

tvs_pipeline=Pipeline(stages=[vectorAssembler, vectorIndexer, tvs])
tvs_pipelineModel=tvs_pipeline.fit(train_df)

bestModel=tvs_pipelineModel.stages[2].bestModel
bestModel.toDebugString[:500]

u'DecisionTreeRegressionModel (uid=DecisionTreeRegressor_276d5815a61e) of depth 10 with 1833 nodes\n  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})\n   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})\n    If (feature 2 in {2.0,3.0,4.0,5.0})\n     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})\n      If (feature 2 in {2.0,3.0,4.0})\n       If (feature 2 in {3.0,4.0})\n        If (feature 1 in {0.0,1.0,2.0,3.0,11.0})\n         If (feature 0 in {0.0})\n          If (feature 7 <= 0.33)\n           If (feature '

In [19]:
predictions=tvs_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predictions)
rmse

97.34398927140036

#### Decision tree regression with CrossValidation

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

featuresCols=hour_df.columns[:-1]

vectorAssembler=VectorAssembler(inputCols=featuresCols, outputCol="Afeatures")

vectorIndexer=VectorIndexer(inputCol="Afeatures", outputCol="features", maxCategories=24)

dt=DecisionTreeRegressor(labelCol="cnt", featuresCol="features")

paramGrid=ParamGridBuilder().addGrid(dt.maxDepth, [5, 10, 15, 25])\
.addGrid(dt.maxBins, [25, 35, 45, 50])\
.build()

evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="rmse")
#-----------------------------------------------------------------------------------------------------
cv=CrossValidator(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

cv_pipeline=Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
cv_pipelineModel=cv_pipeline.fit(train_df)

predictions=tvs_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predictions)
rmse

81.3626354870387

In [23]:
predictions.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday',\
'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1364| 0.8|   0.2985|52.0|              43.0|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.36|0.3788|0.66|      0.0|48.0|              91.0|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       1.0|0.14|0.1667|0.59|   0.1045|12.0|13.058823529411764|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       2.0|0.26|0.2273| 0.7|   0.3284|12.0|20.558139534883722|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0| 0.2|0.2576|0.64|      0.0| 6.0|13.058823529411764|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0|0.26| 0.303|0.81|      0.0|28.0|20.558139534883722|
|   1.0| 1.0|0.0|    0.0|    4.0|    

### Gradient boost tree regression

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

featuresCols=hour_df.columns[:-1]

vectorAssembler=VectorAssembler(inputCols=featuresCols, outputCol="Afeatures")
vectorIndexer=VectorIndexer(inputCol="Afeatures", outputCol="features", maxCategories=24)
gbt=GBTRegressor(labelCol="cnt", featuresCol="features")

gbt_pipeline=Pipeline(stages=[vectorAssembler, vectorIndexer, gbt])

gbt_pipelineModel=gbt_pipeline.fit(train_df)
predictions=gbt_pipelineModel.transform(test_df)

evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="rmse")

rmse=evaluator.evaluate(predictions)
rmse

77.72245187508419

#### Gradient boost tree regression with CrossValidation

In [28]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

featuresCols=hour_df.columns[:-1]

vectorAssembler=VectorAssembler(inputCols=featuresCols, outputCol="Afeatures")
vectorIndexer=VectorIndexer(inputCol="Afeatures", outputCol="features", maxCategories=24)
gbt=GBTRegressor(labelCol="cnt", featuresCol="features")

paramGrid=ParamGridBuilder().addGrid(gbt.maxDepth, [5, 10])\
.addGrid(gbt.maxBins, [25, 40])\
.addGrid(gbt.maxIter, [10, 50])\
.build()

evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="rmse")

cv=CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

cv_pipeline=Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

cv_pipelineModel=cv_pipeline.fit(train_df)

predictions=cv_pipelineModel.transform(test_df)

predictions.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday',\
'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1364| 0.8|   0.2985|52.0|30.020968047810054|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.36|0.3788|0.66|      0.0|48.0|107.53821688440438|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       1.0|0.14|0.1667|0.59|   0.1045|12.0|20.243304002033565|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       2.0|0.26|0.2273| 0.7|   0.3284|12.0|25.402519433813612|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0| 0.2|0.2576|0.64|      0.0| 6.0| 22.17624724769169|
|   1.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0|0.26| 0.303|0.81|      0.0|28.0|30.432900917798495|
|   1.0| 1.0|0.0|    0.0|    4.0|    

In [29]:
rmse=evaluator.evaluate(predictions)
rmse

71.9596662703907