In [1]:
#ch22_SparkML_02.ipynb
#Spark ML Pipeline 回歸分析

In [2]:
sc.master

u'local[*]'

In [3]:
#1.準備資料

In [4]:
#step1.1 
import pandas as pd
pandas_df=pd.read_csv("hour.csv")             #以pandas.read_csv() 將"hour.csv"讀入成 pandas Dataframe 'pandas_df'
hour_df=sqlContext.createDataFrame(pandas_df)    #以sqlContext.createDataframe() 將pandas_df 轉成spark Dataframe hour_df 

In [5]:
hour_df.count()

17379

In [6]:
print(hour_df.columns) #查看欄位

['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']


In [7]:
#step1.1a , 或者這樣作:
hour_df= spark.read.format('csv').option("header", 'true').load("hour.csv")

In [8]:
#step1.2 以Dataframe.drop() 捨棄不需要的欄位, instant,dateday,yr,casual,registered
hour_df=hour_df.drop("instant").drop("dteday") \
                            .drop('yr').drop("casual").drop("registered")

In [9]:
#step1.3 檢視schema, 所有欄位datatype都是string, 我們稍候要將datatype 轉為double
print hour_df.printSchema()

root
 |-- season: string (nullable = true)
 |-- mnth: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- holiday: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- workingday: string (nullable = true)
 |-- weathersit: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- atemp: string (nullable = true)
 |-- hum: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- cnt: string (nullable = true)

None


In [10]:
#step1.4 匯入 col module, 以col模組讀取欄位資料
from pyspark.sql.functions import col  

In [11]:
#step1.5 將datatype 轉為double
hour_df= hour_df.select([col(column).cast("double").alias(column)
                        for column in hour_df.columns])

In [12]:
hour_df.printSchema() #欄位datatype 已轉為 double

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)



In [13]:
hour_df.show(5) #查看結果

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|   1.0| 1.0|0.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.81|      0.0|16.0|
|   1.0| 1.0|1.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|40.0|
|   1.0| 1.0|2.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|32.0|
|   1.0| 1.0|3.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.75|      0.0|13.0|
|   1.0| 1.0|4.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.75|      0.0| 1.0|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
only showing top 5 rows



In [14]:
#step1.6  將資料依7:3分成訓練資料集(train_df)及測試資料集(test_df)
#              並以 DataFrame.cache() 將資料暫存在 MM
train_df, test_df = hour_df.randomSplit([0.7, 0.3])
train_df.cache()
test_df.cache()

DataFrame[season: double, mnth: double, hr: double, holiday: double, weekday: double, workingday: double, weathersit: double, temp: double, atemp: double, hum: double, windspeed: double, cnt: double]

In [15]:
#2. 建立Spark Machine Learning pipeline 

In [16]:
#step2.1 匯入所需模組,Pipeline,StringIndexer,VectorIndexer,VectorAssembler, DesisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer,  VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

In [17]:
#step2.2 建立feature欄位 list, hour_df 只有最後欄位'cnt'是label(count per hour)
#              所以我們的featuer欄位從前面到倒數第二個 [:-1]
featuresCols = hour_df.columns[:-1]
print(featuresCols) 

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed']


In [18]:
#step2.3 建立 pipeline
#建立 VectorAssembler 物件 'vectorAssembler'
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="aFeatures")
#建立 VectorIndexer 物件 'vectorIndexer', mnth,hr,weekday轉成分類feature欄位
vectorIndexer = VectorIndexer(inputCol="aFeatures", outputCol="features", maxCategories=24) 
#建立 DecisionTreeRegressor 物件 'dt'
dt = DecisionTreeRegressor(labelCol="cnt",featuresCol= 'features')
#建立 Pipeline 物件 dt_pipeline, stages=vectorAssembler,vectorIndexer ,dt
dt_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,dt])

In [19]:
#step2.4 以getStages() 檢視 Pipeline stages
dt_pipeline .getStages()

[VectorAssembler_4169864eb47385bca5bf,
 VectorIndexer_45d4aef2fc51134af20c,
 DecisionTreeRegressor_40adba5c5a1ddeb66291]

In [20]:
#3. 使用 Pipeline 進行資料處理與訓練

In [21]:
#step3.1 以 Pipeline.fit(), 參數為 DataFrame train_df 進行訓練
#             訓練結果為 pyspark.ml.pipeline.PipelineModel  物件 'df_pipelineModel'
dt_pipelineModel = dt_pipeline.fit(train_df)

In [22]:
#step3.2 檢視訓練後DecisionTree Model, 位於pipeline 最後一個 stage
dt_pipelineModel.stages[-1]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_40adba5c5a1ddeb66291) of depth 5 with 63 nodes

In [23]:
#step3.3 檢視練後DecisionTree Model規則 (只顯示前500筆) .toDebugString 屬性
print dt_pipelineModel.stages[-1].toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_40adba5c5a1ddeb66291) of depth 5 with 63 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 2 in {3.0,4.0})
      If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
       Predict: 5.345375722543353
      Else (feature 4 not in {1.0,2.0,3.0,4.0,5.0})
       Predict: 17.24671052631579
     Else (feature 2 not in {3.0,4.0})
      If (fea


In [24]:
#4. 使用pipelineModel 進行預測

In [25]:
#step4.1 以PipelineModel.transform(), 參數為 DataFrame test_df 進行預測
#              預測結果 為 DataFrame 'predicted_df'
predicted_df=dt_pipelineModel.transform(test_df)

In [26]:
#step4.2 檢視預測結果 'preficted_df' 的欄位, 注意:新增 'prediction' 欄位
print predicted_df.columns

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'aFeatures', 'features', 'prediction']


In [27]:
#step4.3 檢視預測結果, not good!
predicted_df.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', \
                     'weathersit', 'temp', 'atemp', 'hum', 'windspeed','cnt','prediction').show(20)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1818| 0.8|   0.1045|33.0|  59.6554054054054|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.06|0.0606|0.41|    0.194| 7.0|36.475935828877006|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.22| 0.197|0.44|   0.3582| 5.0|36.475935828877006|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.24|0.2273| 0.6|   0.2239|15.0|36.475935828877006|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       2.0|0.32|0.2879|0.26|   0.4179|10.0|36.475935828877006|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       2.0|0.22|0.2424|0.87|   0.1045|14.0|36.475935828877006|
|   1.0| 1.0|0.0|    0.0|    3.0|    

In [28]:
#5. 評估模型的準確率

In [29]:
#step5.1 匯入模組 RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

In [30]:
#step5.2 建立RegressionEvaluator 物件 evaluator, 以RMSE 評估
evaluator = RegressionEvaluator(labelCol='cnt',
                                                        predictionCol='prediction',
                                                        metricName="rmse")

In [34]:
#step5.3 計算 RMSE
predicted_df=dt_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

92.51461487831746

In [31]:
#6. 使用TrainValidation進行訓練評估找出最佳模型
#    ML Tuning: model selection and hyperparameter tuning
#    Train-Validation Split
#    https://spark.apache.org/docs/2.1.0/ml-tuning.html#train-validation-split

In [34]:
#step6.1 匯入模組 ParamGridBuilder,
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

In [41]:
#step6.2 設定訓練驗證(Train Validation) 的參數
#使用 ParamGridBuilder() 
#設定 maxDepth 4種
#設定 maxBins 4種
#所以執行訓練驗證時會執行 4*4=16次
paramGrid = ParamGridBuilder()\
  .addGrid(dt.maxDepth, [ 5,10,15,25])\
  .addGrid(dt.maxBins, [25,35,45,50])\
  .build()

In [42]:
#step6.3 建立TrainValidationSplit 物件 tvs
tvs = TrainValidationSplit(estimator=dt,evaluator=evaluator,    #以之前建立的 DesisionTreeRegressor 'dt' 作為 estimator
                  estimatorParamMaps=paramGrid,trainRatio=0.8)    #訓練:8 與測試:2

In [43]:
#step6.4 建立 Pipeline 'tvs_pipeline'
tvs_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,tvs])

In [44]:
#step6.5 以Pipeline 'tvs_pipeline 對DataFrame 'train_df'進行驗證訓練,產生 Model 'tvs_pipelineModel'
tvs_pipelineModel =tvs_pipeline.fit(train_df)

In [45]:
#step6.6 檢視訓練後最佳模型(bestModel) 
bestModel=tvs_pipelineModel.stages[2].bestModel
print bestModel.toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_40adba5c5a1ddeb66291) of depth 10 with 1753 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 2 in {3.0,4.0})
      If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
       If (feature 1 in {0.0,1.0,2.0,3.0,11.0})
        If (feature 7 <= 0.4)
         If (feature 0 in {0.0,1.0})
          If (feature 7 <= 0.2)
           If (feature


In [48]:
#step6.7 以tvs_pipelineModel.transform() 對測試資料 'test_df' 作預測, 並檢視部份預測結果
predictions=tvs_pipelineModel.transform(test_df)
predictions.select('cnt','prediction').show(20)

+----+------------------+
| cnt|        prediction|
+----+------------------+
|33.0| 44.42857142857143|
| 7.0|14.411764705882353|
| 5.0|14.411764705882353|
|15.0|              26.5|
|10.0|              26.5|
|14.0| 8.545454545454545|
| 9.0| 8.545454545454545|
|31.0|              17.5|
|17.0| 8.545454545454545|
|25.0|              17.5|
| 3.0|               3.0|
|14.0|14.411764705882353|
|27.0|              26.5|
|42.0|49.029411764705884|
|13.0|22.666666666666668|
|24.0|              51.0|
|25.0|14.411764705882353|
|23.0| 51.44444444444444|
|42.0|              51.0|
|17.0|              17.0|
+----+------------------+
only showing top 20 rows



In [49]:
#step6.8 評估最佳模型 RMSE, 有好一點
rmse= evaluator.evaluate(predictions)
rmse

81.88845514741912

In [None]:
#7. 使用crossValidation進行交叉訓練找出最佳模型 

In [50]:
#step7.1 匯入模組 CrossValidator
from pyspark.ml.tuning import CrossValidator

In [51]:
#step7.2 建立 CrossValidator 物件 'cv', k=3; 'cv2', k=10
#              k=3時會執行 4*4*3=48次, k=10時會執行4*4*10=160次
cv = CrossValidator(estimator=dt, evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, numFolds=3)

cv2 = CrossValidator(estimator=dt, evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, numFolds=10)

In [52]:
#step7.3 建立 CrossValidator Pipeline 物件 'cv_pipeline' 及  'cv2_pipeline'
cv_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,cv])
cv2_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,cv2])

In [53]:
#step7.4a 使用 cv_pipeline.fit() 以 DataFrame train_df 做交叉訓練驗證,產生模型 'cv_pipelineModel
cv_pipelineModel = cv_pipeline.fit(train_df)

In [55]:
#step7.4b 使用 cv2_pipeline.fit() 以 DataFrame train_df 做交叉訓練驗證,產生模型 'cv2_pipelineModel
cv2_pipelineModel = cv2_pipeline.fit(train_df)   #需要一段長時間,請勿在課程進行中執行

In [54]:
#step7.5a 評估交叉驗證 (k=3) 最佳模型 RMSE
predictions = cv_pipelineModel.transform(test_df)
rmse= evaluator.evaluate(predictions)
rmse

82.72608308415863

In [56]:
#step7.5b 評估交叉驗證 (k=10) 最佳模型 RMSE, 進步一點點
predictions = cv2_pipelineModel.transform(test_df)
rmse= evaluator.evaluate(predictions)
rmse

81.98824648924268

In [None]:
#練習:使用隨機森林RandomForestClassifier分類器

In [38]:
#from pyspark.ml.regression import RandomForestRegressor
#vectorAssembler = 
#vectorIndexer = 
#rf= RandomForestRegressor(labelCol="cnt",featuresCol= 'features', numTrees=20)
#......

98.46412057609375

In [39]:
#from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
#paramGrid = 
#......

71.12886396206777

In [40]:
#8. 使用GBT(Gradient-Boosted Tree) 梯度提昇決策樹
#   GTB與Random Forests 都是集合很多決策樹,不同的是訓練方式

In [57]:
#step8.1 建立GBT Regression
#匯入 GBT Regression 模組
from pyspark.ml.regression import GBTRegressor
#建立 GBTRegressor 物件 'gbt' , label='cnt', feature vector='features'
gbt = GBTRegressor(labelCol="cnt",featuresCol= 'features')
#建立 Pipeline 物件 'gbt_pipeline'
gbt_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])

In [58]:
#step8.2 訓練與評估
#以gbt_pipeline.fit() , train_df 訓練, 結果是 Model 'gbt_pipelineModel'
gbt_pipelineModel = gbt_pipeline.fit(train_df)
#以gbt_pipelineModel.transform(), test_df, 預測結果為 predicted_df
predicted_df=gbt_pipelineModel.transform(test_df)
#評估 rmse
rmse = evaluator.evaluate(predicted_df)
rmse

76.12550201832394

In [59]:
#step8.3 使用交叉驗證(CrossValidation) GBT Regression 找出最佳模型 

In [60]:
#匯入所需模組,CrossValidator, ParamGridBuilder, RegressionEvaluator, Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

#以 ParamGridBuilder()  建立參數Grid 'paramGrid'
paramGrid = ParamGridBuilder() \
  .addGrid(gbt.maxDepth, [ 5,10])\
  .addGrid(gbt.maxBins, [25,40])\
  .addGrid(gbt.maxIter, [10, 50])\
  .build()

#建立 CrossValidator 物件 'cv' k=3
cv = CrossValidator(estimator=gbt, evaluator=evaluator, 
                                  estimatorParamMaps=paramGrid, numFolds=3)

#建立 Pipeline 物件 cv_pipeline
cv_pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [61]:
#step8.4 以cv_pipeline.fit() , train_df 訓練資料,執行交叉驗證,結果為Model 'cv_pipelineModel'
cv_pipelineModel = cv_pipeline.fit(train_df)

In [62]:
#step8.5 檢視最佳模型的規則
cvm=cv_pipelineModel.stages[2] 
gbestModel=cvm.bestModel
print bestModel.toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_40adba5c5a1ddeb66291) of depth 10 with 1753 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 2 in {3.0,4.0})
      If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
       If (feature 1 in {0.0,1.0,2.0,3.0,11.0})
        If (feature 7 <= 0.4)
         If (feature 0 in {0.0,1.0})
          If (feature 7 <= 0.2)
           If (feature


In [63]:
#step8.6 預測
#以 cv_piplelineModel.transform() , DataFrame test_df 作預測, 預測結果為 'predicted'
predicted_df=cv_pipelineModel.transform(test_df)
#檢視預測結果
predicted_df.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', \
                     'weathersit', 'temp', 'atemp', 'hum', 'windspeed','cnt','prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1818| 0.8|   0.1045|33.0| 44.50534466103074|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.06|0.0606|0.41|    0.194| 7.0| 4.012207749638969|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.22| 0.197|0.44|   0.3582| 5.0|18.002747653346976|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.24|0.2273| 0.6|   0.2239|15.0| 16.06004975059944|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       2.0|0.32|0.2879|0.26|   0.4179|10.0| 8.355974651245623|
|   1.0| 1.0|0.0|    0.0|    2.0|       1.0|       2.0|0.22|0.2424|0.87|   0.1045|14.0|11.050031812678764|
|   1.0| 1.0|0.0|    0.0|    3.0|    

In [64]:
#step8.7 評估, 計算 RMSE
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol='cnt', predictionCol='prediction')
rmse = evaluator.evaluate(predicted_df)
rmse

70.94139281419096