### Spark ML 線形重回帰による数値予測

* bank-fullのbalance列について数値予測する
* 特徴量は簡易化のため、一部の数値列（age, duration, campaign, previous）と文字列の'default'列のみを使用する
* 特徴量ベクトル（数値列とインデックス化後の'default'列）は StandardScaler で標準偏差によるスケーリングを行う（既定設定のため平均の中心化は行わない）
* 文字列はインデックス化（ラベルエンコーディング）を行う
* 評価はRMSEにて行う

In [1]:
import numpy as np
import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, col
from sklearn.metrics import mean_squared_error

#### SparkSessionの作成

In [2]:
# Reuse the SparkSession that is already active in this kernel, or start a
# new one with the default configuration when none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

#### データの読み込み

In [3]:
# Read the semicolon-delimited bank marketing CSV. The first row is used as
# the header and column types are inferred from the file contents.
data = (
    spark.read
    .format('csv')
    .options(sep=';', header=True, inferSchema=True)
    .load('data/bank-full.csv')
)

In [4]:
# Preview the loaded rows (show()'s defaults written out: 20 rows, long
# cell values truncated).
data.show(n=20, truncate=True)

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

In [5]:
# List the (column name, inferred type) pairs to see which columns are
# numeric and which are strings before choosing features.
data.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string')]

#### ①default列のインデックス化ステージ

In [6]:
# Stage 1: encode the string column 'default' as a numeric index column
# named 'default_index' so it can be placed in the feature vector.
default_index = StringIndexer(
    inputCol='default',
    outputCol='default_index',
)

#### ②特徴量のアッセンブル化ステージ

In [7]:
# Stage 2: gather the model inputs into a single vector column.
# 'default_index' is produced by the StringIndexer stage, so that stage must
# run before this one in the pipeline.
features = [
    'age',
    'duration',
    'campaign',
    'previous',
    'default_index',
]
assemble = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

#### ③特徴量の標準化ステージ

In [8]:
# Stage 3: rescale the assembled feature vector.
# withMean=False / withStd=True are StandardScaler's defaults, written out
# here on purpose: each feature is divided by its standard deviation but is
# NOT mean-centred.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaled_features',
    withMean=False,
    withStd=True,
)

#### ④線形回帰のインスタンス化ステージ

In [9]:
# Stage 4: linear regression estimator that predicts 'balance' from the
# scaled feature vector.
lr = LinearRegression(
    featuresCol='scaled_features',
    labelCol='balance',
)

#### パイプラインの登録（①～④）

In [10]:
# Chain the four stages in execution order:
# index 'default' -> assemble vector -> scale -> linear regression.
pipeline_stages = [default_index, assemble, scaler, lr]
pipeline = Pipeline(stages=pipeline_stages)

#### 訓練データ、テストデータの作成

In [11]:
# Keep only the label ('balance') and the raw columns the pipeline consumes,
# then split 70% / 30% into train and test with a fixed seed.
df = data.select('age', 'balance', 'duration', 'campaign', 'previous', 'default')
train_df, test_df = df.randomSplit(weights=[0.7, 0.3], seed=1)

#### 学習・モデルの確認

In [12]:
# Fit every pipeline stage (indexer, scaler, regression) on the training
# split only; the result is a fitted PipelineModel.
model = pipeline.fit(dataset=train_df)

In [13]:
# Coefficients of the fitted regression, which is the last pipeline stage
# (one coefficient per entry of the scaled feature vector).
model.stages[-1].coefficients

DenseVector([305.1062, 53.9093, -38.3768, 30.5633, -200.5981])

In [14]:
# Intercept of the fitted regression, which is the last pipeline stage.
model.stages[-1].intercept

191.97247324503448

#### 推論

In [15]:
# Run the fitted pipeline over the training split; this adds the intermediate
# columns and a 'prediction' column. Then preview the result (show()'s
# defaults written out).
pred_train = model.transform(train_df)
pred_train.show(n=20, truncate=True)

+---+-------+--------+--------+--------+-------+-------------+--------------------+--------------------+-----------------+
|age|balance|duration|campaign|previous|default|default_index|            features|     scaled_features|       prediction|
+---+-------+--------+--------+--------+-------+-------------+--------------------+--------------------+-----------------+
| 18|    108|     167|       1|       0|     no|          0.0|[18.0,167.0,1.0,0...|[1.69530409029658...|731.5546261499119|
| 18|    608|     267|       1|       0|     no|          0.0|[18.0,267.0,1.0,0...|[1.69530409029658...|752.4158980570883|
| 18|   1944|     122|       3|       0|     no|          0.0|[18.0,122.0,3.0,0...|[1.69530409029658...|697.1592887093236|
| 19|      0|      72|       4|       0|     no|          0.0|[19.0,72.0,4.0,0....|[1.78948765086861...|702.9607541790599|
| 19|     56|     246|       1|       0|     no|          0.0|[19.0,246.0,1.0,0...|[1.78948765086861...|776.7710149210851|
| 19|     60|   

In [16]:
# Run the fitted pipeline over the held-out test split; this adds the
# intermediate columns and a 'prediction' column. Then preview the result
# (show()'s defaults written out).
pred_test = model.transform(test_df)
pred_test.show(n=20, truncate=True)

+---+-------+--------+--------+--------+-------+-------------+--------------------+--------------------+-----------------+
|age|balance|duration|campaign|previous|default|default_index|            features|     scaled_features|       prediction|
+---+-------+--------+--------+--------+-------+-------------+--------------------+--------------------+-----------------+
| 19|      0|     123|       3|       0|     no|          0.0|[19.0,123.0,3.0,0...|[1.78948765086861...|726.1038853928993|
| 19|     27|      86|      12|       0|     no|          0.0|[19.0,86.0,12.0,0...|[1.78948765086861...|605.8502719166297|
| 19|    779|     184|       4|       0|     no|          0.0|[19.0,184.0,4.0,0...|[1.78948765086861...|726.3253787150976|
| 19|   1169|     463|      18|       0|     no|          0.0|[19.0,463.0,18.0,...|[1.78948765086861...|609.4739717596086|
| 19|   1247|      94|       1|       0|     no|          0.0|[19.0,94.0,1.0,0....|[1.78948765086861...| 745.061881622177|
| 19|   1803|   

#### 精度評価

In [17]:
# RMSE on the training split.
# Computed inside Spark with RegressionEvaluator rather than collecting every
# prediction row to the driver with toPandas() just to obtain one scalar.
evaluator = RegressionEvaluator(
    labelCol='balance',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(pred_train)
rmse

3083.9550864640278

In [18]:
# RMSE on the held-out test split.
# Computed inside Spark with RegressionEvaluator rather than collecting every
# prediction row to the driver with toPandas() just to obtain one scalar.
evaluator = RegressionEvaluator(
    labelCol='balance',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(pred_test)
rmse

2873.365122582927