# sparkで機械学習がしたい
http://mogile.web.fc2.com/spark/ml-classification-regression.html

In [58]:
import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.datasets import load_iris
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.metrics import confusion_matrix, classification_report

from IPython.display import display

In [59]:
# Load the iris dataset as pandas objects (as_frame=True).
data_iris_all = load_iris(as_frame=True)

# Pull out the feature table, the target series, the class names and the
# combined frame (features + target) in one go.
data_iris, target_iris, target_label_iris, frame_iris = (
    data_iris_all[key] for key in ("data", "target", "target_names", "frame")
)

# Quick look at what was loaded.
print(data_iris_all.keys())
display(data_iris.head())
print(data_iris.shape)
print(target_iris.value_counts())

dict_keys(['data', 'target', 'frame', 'target_names', 'DESCR', 'feature_names', 'filename'])


Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm)
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2


(150, 4)
0    50
1    50
2    50
Name: target, dtype: int64


In [60]:
# Preview the combined frame (features + target).
display(frame_iris.head())
# DataFrame.info() prints its summary itself and returns None, so it is called
# directly; wrapping it in display() only adds a stray "None" to the output.
frame_iris.info()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   sepal length (cm)  150 non-null    float64
 1   sepal width (cm)   150 non-null    float64
 2   petal length (cm)  150 non-null    float64
 3   petal width (cm)   150 non-null    float64
 4   target             150 non-null    int64  
dtypes: float64(4), int64(1)
memory usage: 6.0 KB


None

### Sparkのセッションを起動
javaが必要

In [61]:
from pyspark.sql import SparkSession
# Open a Spark session.
# If this cell is re-run, stop the session that already exists first; on a
# fresh kernel `spark` is not defined yet, so the NameError is ignored.
try:
    spark.stop()
except NameError:
    pass

spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("local[*]")
    .getOrCreate()
)
spark

In [62]:
# Check the Python and Spark versions in use.
print(f"Python version : {sys.version}")
print(f"Spark version : {spark.version}")



Python version : 3.8.5 (default, Jul 21 2020, 10:48:26) 
[Clang 11.0.3 (clang-1103.0.32.62)]
Spark version : 3.1.1


In [63]:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, VectorIndexer

In [64]:
# Load the data into Spark from the pandas DataFrame.
spark_data = spark.createDataFrame(frame_iris)

# Keep the column names; later cells slice this list to pick the features.
columns = spark_data.columns

In [65]:
# Show the column names, the row count and the first 10 rows.
print("columns : ", spark_data.columns)
print("rows : ", spark_data.count())
spark_data.show(10)

columns :  ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)', 'target']
rows :  150
+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+-----------------+----------------+-----------------+----------------+------+
|              5.1|             3.5|              1.4|             0.2|     0|
|              4.9|             3.0|              1.4|             0.2|     0|
|              4.7|             3.2|              1.3|             0.2|     0|
|              4.6|             3.1|              1.5|             0.2|     0|
|              5.0|             3.6|              1.4|             0.2|     0|
|              5.4|             3.9|              1.7|             0.4|     0|
|              4.6|             3.4|              1.4|             0.3|     0|
|              5.0|             3.4|              1.5|             0.2|     0|
|              

In [66]:
# Combine the feature columns into a single vector column named "features".
# Every column except the last one is treated as a feature.
feature_cols = columns[:-1]
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)
spark_data_2 = assembler.transform(spark_data)

# Confirm that the new "features" column was appended.
print("columns : ", spark_data_2.columns)
print("rows : ", spark_data_2.count())
spark_data_2.show(10)

columns :  ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)', 'target', 'features']
rows :  150
+-----------------+----------------+-----------------+----------------+------+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|         features|
+-----------------+----------------+-----------------+----------------+------+-----------------+
|              5.1|             3.5|              1.4|             0.2|     0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|     0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|     0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|     0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|     0|[5.0,3.6,1.4,0.2]|
|              5.4|             3.9|              1.7|             0.4|     0|[5.4,3.9,1.7,0.4]|

In [67]:
# Build the training data.
# "target" is renamed to "label", the name the estimators below are given as
# labelCol.
label_col = F.col("target").alias("label")
learning_data = spark_data_2.select("features", label_col)
learning_data.show(10)

# 80/20 train/test split with a fixed seed.
train, test = learning_data.randomSplit([0.8, 0.2], seed=10)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
|[4.6,3.1,1.5,0.2]|    0|
|[5.0,3.6,1.4,0.2]|    0|
|[5.4,3.9,1.7,0.4]|    0|
|[4.6,3.4,1.4,0.3]|    0|
|[5.0,3.4,1.5,0.2]|    0|
|[4.4,2.9,1.4,0.2]|    0|
|[4.9,3.1,1.5,0.1]|    0|
+-----------------+-----+
only showing top 10 rows



In [68]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### ランダムフォレスト

In [69]:
# Random forest classifier with 3 trees, fitted on the training split.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=3,
)

model = rf.fit(train)

In [70]:
# Apply the fitted model to the test split.
predictions = model.transform(test)
# Inspect the predictions: 10 randomly sampled rows as a pandas DataFrame.
predictions.toPandas().sample(10)

Unnamed: 0,features,label,rawPrediction,probability,prediction
5,"[5.0, 3.4, 1.5, 0.2]",0,"[3.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0
29,"[6.7, 3.1, 5.6, 2.4]",2,"[0.0, 0.0, 3.0]","[0.0, 0.0, 1.0]",2.0
19,"[5.0, 2.3, 3.3, 1.0]",1,"[0.0, 3.0, 0.0]","[0.0, 1.0, 0.0]",1.0
18,"[6.5, 2.8, 4.6, 1.5]",1,"[0.0, 2.9, 0.1]","[0.0, 0.9666666666666667, 0.03333333333333333]",1.0
20,"[5.5, 2.6, 4.4, 1.2]",1,"[0.0, 3.0, 0.0]","[0.0, 1.0, 0.0]",1.0
14,"[5.6, 2.5, 3.9, 1.1]",1,"[0.0, 3.0, 0.0]","[0.0, 1.0, 0.0]",1.0
7,"[5.7, 4.4, 1.5, 0.4]",0,"[0.0, 3.0, 0.0]","[0.0, 1.0, 0.0]",1.0
17,"[6.3, 2.5, 4.9, 1.5]",1,"[0.0, 0.9, 2.1]","[0.0, 0.3, 0.7000000000000001]",2.0
30,"[6.7, 3.3, 5.7, 2.1]",2,"[0.0, 0.0, 3.0]","[0.0, 0.0, 1.0]",2.0
3,"[4.8, 3.4, 1.9, 0.2]",0,"[3.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0


In [71]:
# Accuracy of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.9142857142857143


#### GBT回帰

In [74]:
# First attempt: train a GBT regressor, setting only maxIter explicitly.
gbtr = GBTRegressor(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
)

# Fit on the training split.
model = gbtr.fit(train)

In [75]:
# Apply the fitted regressor to the test split and look at 10 random rows.
predictions = model.transform(test)
predictions.toPandas().sample(10)

Unnamed: 0,features,label,prediction
14,"[5.6, 2.5, 3.9, 1.1]",1,1.0
32,"[6.8, 3.2, 5.9, 2.3]",2,2.0
9,"[4.9, 3.6, 1.4, 0.1]",0,0.0
21,"[6.0, 3.4, 4.5, 1.6]",1,1.0
1,"[4.6, 3.4, 1.4, 0.3]",0,0.0
17,"[6.3, 2.5, 4.9, 1.5]",1,1.0
34,"[7.9, 3.8, 6.4, 2.0]",2,2.0
24,"[6.5, 3.2, 5.1, 2.0]",2,2.0
16,"[6.1, 2.9, 4.7, 1.4]",1,1.0
29,"[6.7, 3.1, 5.6, 2.4]",2,2.0


In [76]:
# Score the GBT predictions with the same accuracy metric used for the
# random forest.
# NOTE(review): `predictions` comes from a regressor here, so accuracy
# presumably only counts rows whose predicted value equals the integer label
# exactly — confirm this is the intended metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.9428571428571428


In [77]:
# Stop the Spark session used by the iris examples.
spark.stop()

## CrossValidation
### データの準備

In [24]:
from sklearn.datasets import load_boston

In [105]:
from pyspark.sql import SparkSession
# Open a Spark session.
# If a session object already exists, stop it first; on a fresh kernel
# `spark` is not defined yet, so the NameError is ignored.
try:
    spark.stop()
except NameError:
    pass

spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("local[*]")
    .getOrCreate()
)
spark

In [106]:
# NOTE(review): load_boston was deprecated in scikit-learn 1.0 and removed in
# 1.2 — confirm the installed version still provides it.
boston_data = load_boston()

# Feature names double as the pandas column names and, later, as the
# assembler's input columns.
feature_cols = boston_data["feature_names"]
boston_df = pd.DataFrame(boston_data["data"], columns=feature_cols).assign(
    target=boston_data["target"]
)
# From here on boston_df is a Spark DataFrame, not a pandas one.
boston_df = spark.createDataFrame(boston_df)
boston_df.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|target|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98|  24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14|  21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03|  34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94|  33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33|  36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21|  28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43|  22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0|   15.2| 396.9|19.15|  27.1|
|0.21124|12.5| 7.87| 

In [107]:
# Combine the feature columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
boston_df = assembler.transform(boston_df)

boston_df.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+------+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|target|            features|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+------+--------------------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98|  24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14|  21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03|  34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94|  33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33|  36.2|[0.06905,0.0,2.18...|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21|  28.7|[0.02985,0.0,2.18...|
|0.08829|1

In [108]:
# Keep only the feature vector and the target, renaming "target" to "label".
boston_df = boston_df.select("features", F.col("target").alias("label"))
boston_df.show(10)

# 80/20 train/test split with a fixed seed.
train, test = boston_df.randomSplit([0.8, 0.2], seed=10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
|[0.03237,0.0,2.18...| 33.4|
|[0.06905,0.0,2.18...| 36.2|
|[0.02985,0.0,2.18...| 28.7|
|[0.08829,12.5,7.8...| 22.9|
|[0.14455,12.5,7.8...| 27.1|
|[0.21124,12.5,7.8...| 16.5|
|[0.17004,12.5,7.8...| 18.9|
+--------------------+-----+
only showing top 10 rows



* CrossValidator：https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html
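* ParamGridBuilder：https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html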

In [109]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Base estimator for the grid search.
gbtr = GBTRegressor(
    featuresCol="features",
    labelCol="label",
)

# RMSE between the "prediction" and "label" columns.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="rmse",
)



In [110]:
# Search grid: 3 values for each of 3 parameters (27 combinations).
param_grid = (
    ParamGridBuilder()
    .addGrid(gbtr.maxDepth, [3, 4, 5])
    .addGrid(gbtr.subsamplingRate, [0.6, 0.8, 1.0])
    .addGrid(gbtr.maxIter, [30, 50, 70])
    .build()
)



In [111]:
# 3-fold cross-validation over the parameter grid, scored with the evaluator
# defined earlier; seed fixed at 43, parallelism set to 2.
cross_val = CrossValidator(
    estimator=gbtr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=43,
    parallelism=2,
)

In [112]:
%%time
# Run the cross-validated grid search on the training split.
# (%%time must stay on the first line of the cell.)
cv_model = cross_val.fit(train)

CPU times: user 5.69 s, sys: 2.49 s, total: 8.18 s
Wall time: 5min 58s


In [113]:
def check_cv_result(cv_mode, asc=True):
    """
    Summarize the result of a fitted CrossValidator as a pandas DataFrame.

    Each row holds one parameter combination together with its averaged
    metric, sorted by that metric.

    Parameters
    -----
    cv_mode : fitted cross-validation model
        Object exposing ``avgMetrics`` and ``getEstimatorParamMaps()``.
        The parameter name is kept as-is so existing keyword callers keep
        working.
    asc : bool default True
        Sort by the metric in ascending order if True, descending if False.

    Returns
    -----
    pandas.DataFrame
        A ``metric`` column plus one column per tuned parameter (named after
        the parameter's ``name``). The index is the position of the
        combination in the parameter grid.
    """
    # Read from the argument; the original body read a global named
    # `cv_model` and silently ignored whatever was passed in.
    rows = [
        {"metric": avg_metric, **{param.name: val for param, val in param_map.items()}}
        for param_map, avg_metric in zip(
            cv_mode.getEstimatorParamMaps(), cv_mode.avgMetrics
        )
    ]
    if not rows:
        # Nothing was evaluated: return an empty frame instead of failing on
        # the missing "metric" column during the sort.
        return pd.DataFrame(columns=["metric"])

    return pd.DataFrame(rows).sort_values("metric", ascending=asc)

In [114]:
# Show the cross-validation results, sorted by the metric in ascending order.
check_cv_result(cv_model)

Unnamed: 0,metric,maxDepth,subsamplingRate,maxIter
5,3.712986,3,0.8,70
2,3.771435,3,0.6,70
1,3.799881,3,0.6,50
4,3.808517,3,0.8,50
8,3.809639,3,1.0,70
17,3.81099,4,1.0,70
16,3.829869,4,1.0,50
7,3.853383,3,1.0,50
0,3.875134,3,0.6,30
15,3.902165,4,1.0,30


In [115]:
# Check the parameters of the best model.
print("MaxDepth : ",  cv_model.bestModel.getMaxDepth())
print("MaxIter: ",  cv_model.bestModel.getMaxIter())
print("SubsamplingRate : ",  cv_model.bestModel.getSubsamplingRate())

MaxDepth :  3
MaxIter:  70
SubsamplingRate :  0.8


In [116]:
## Check which model cv_model.transform uses.
# First transform with bestModel directly ...
predict_train = cv_model.bestModel.transform(train)
predict_train.show()

# ... then with cv_model itself, so the two outputs can be compared.
# predict_train keeps this second result for the evaluation further down.
predict_train = cv_model.transform(train)
predict_train.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[0.00632,18.0,2.3...| 24.0|25.339419229140248|
|[0.0136,75.0,4.0,...| 18.9|18.439388760600504|
|[0.01951,17.5,1.3...| 33.0| 32.60494370617383|
|[0.02055,85.0,0.7...| 24.7|23.147168347766335|
|[0.02731,0.0,7.07...| 21.6| 21.02113992063794|
|[0.02763,75.0,2.9...| 30.8|29.703329089078736|
|[0.02875,28.0,15....| 25.0| 26.15991887484538|
|[0.03237,0.0,2.18...| 33.4|33.450959842061444|
|[0.03359,75.0,2.9...| 34.9| 33.55133744979404|
|[0.03584,80.0,3.3...| 23.5|24.061516679254037|
|[0.03659,25.0,4.8...| 24.8| 23.89543588674491|
|[0.04113,25.0,4.8...| 28.0| 26.96883672491584|
|[0.04203,28.0,15....| 22.9|24.812566255319606|
|[0.04294,28.0,15....| 20.6|20.401528187581203|
|[0.04337,21.0,5.6...| 20.5|21.631223424990786|
|[0.04379,80.0,3.3...| 19.4| 19.30606800056208|
|[0.04462,25.0,4.8...| 23.9| 24.47302958220844|
|[0.04684,0.0,3.41...| 22.6| 24.23347923

In [117]:
# Predict on the held-out test split.
predict_test = cv_model.transform(test)
predict_test.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[0.01311,90.0,1.2...| 35.4| 33.73492097814904|
|[0.01432,100.0,1....| 31.6|  31.0880588119184|
|[0.02729,0.0,7.07...| 34.7| 31.73506827962039|
|[0.02985,0.0,2.18...| 28.7| 25.78866531896098|
|[0.03551,25.0,4.8...| 22.9| 23.29028942986654|
|[0.03932,0.0,3.41...| 22.0| 24.23347923788998|
|[0.05789,12.5,6.0...| 22.0|20.436806239227504|
|[0.06899,0.0,25.6...| 22.0|21.743991165002814|
|[0.07165,0.0,25.6...| 20.3| 21.54104723388486|
|[0.07896,0.0,12.8...| 24.1|25.990756396926248|
|[0.09849,0.0,25.6...| 18.8| 20.18796682025256|
|[0.10084,0.0,10.0...| 22.8| 25.95742510153246|
|[0.10328,25.0,5.1...| 19.6| 21.37679744355977|
|[0.10793,0.0,8.56...| 21.7|21.147343865736307|
|[0.12269,0.0,6.91...| 21.2| 23.17638791806525|
|[0.12744,0.0,6.91...| 26.6|31.112976771478873|
|[0.12816,12.5,6.0...| 20.9|21.122435141822166|
|[0.13058,0.0,10.0...| 20.4|18.500978776

In [118]:
# Evaluate the train and test predictions with `evaluator`, so the two scores
# can be compared.
print("train : ", evaluator.evaluate(predict_train))
print("test : ", evaluator.evaluate(predict_test))

train :  1.2305235463119917
test :  3.3887344451402397


In [119]:
# Stop the Spark session used by the cross-validation examples.
spark.stop()