In [1]:
# 全データを使ったモデリング(流れの確認)
# - とりあえず特徴量には数値のカラムのみ使用する
# - 標準化は行わない
# - one-hot encoding は行わない
# - 重複業などのデータチェックは行わない
# 手順
# 0, 簡単なデータチェック
# 1, データ作成
# 2, モデリング
# 3, 係数や制度指標の確認
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data_wrangling").getOrCreate()

In [2]:
data = spark.read.csv("./data/bank/bank-full.csv", header=True, inferSchema=True, sep=";")

In [6]:
data.summary().show()

+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+------------------+-----------------+------------------+------------------+--------+-----+
|summary|               age|    job| marital|education|default|           balance|housing| loan| contact|              day|month|          duration|         campaign|             pdays|          previous|poutcome|    y|
+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+------------------+-----------------+------------------+------------------+--------+-----+
|  count|             45211|  45211|   45211|    45211|  45211|             45211|  45211|45211|   45211|            45211|45211|             45211|            45211|             45211|             45211|   45211|45211|
|   mean| 40.93621021432837|   null|    null|     null|   null|1362.2720576850766|   null| null|    null|15.806418791886

In [7]:
# データ作成

In [9]:
linear_df = data.select(["age", "balance", "campaign"])
target = "balance"
features = ["age", "campaign"]
train_df = data.select(features)

In [10]:
features

['age', 'campaign']

In [19]:
# データ作成ステージ
from pyspark.ml.feature import VectorAssembler
assemble = VectorAssembler(inputCols=features, outputCol="features")

In [20]:
# 線形重回帰モデリングステージ
from pyspark.ml.regression import LinearRegression
clf = LinearRegression(featuresCol="features", labelCol="balance")

In [21]:
# パイプラインの設定: ステージの登録
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=[assemble, clf])
model = pipeline.fit(linear_df)

Exception ignored in: <function JavaWrapper.__del__ at 0x7f01e88548b0>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LinearRegression' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7f01e88548b0>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LinearRegression' object has no attribute '_java_obj'


In [22]:
# パイプラインの実行
df = model.transform(linear_df)
df.show()

+---+-------+--------+----------+------------------+
|age|balance|campaign|  features|        prediction|
+---+-------+--------+----------+------------------+
| 58|   2143|       1|[58.0,1.0]|1867.1309208276969|
| 44|     29|       1|[44.0,1.0]| 1474.315799038966|
| 33|      2|       1|[33.0,1.0]| 1165.675346204963|
| 47|   1506|       1|[47.0,1.0]| 1558.490467993694|
| 33|      1|       1|[33.0,1.0]| 1165.675346204963|
| 35|    231|       1|[35.0,1.0]|1221.7917921747817|
| 28|    447|       1|[28.0,1.0]|1025.3842312804163|
| 42|      2|       1|[42.0,1.0]|1418.1993530691473|
| 58|    121|       1|[58.0,1.0]|1867.1309208276969|
| 43|    593|       1|[43.0,1.0]|1446.2575760540565|
| 41|    270|       1|[41.0,1.0]|1390.1411300842378|
| 29|    390|       1|[29.0,1.0]|1053.4424542653255|
| 53|      6|       1|[53.0,1.0]|1726.8398059031501|
| 58|     71|       1|[58.0,1.0]|1867.1309208276969|
| 57|    162|       1|[57.0,1.0]|1839.0726978427874|
| 51|    229|       1|[51.0,1.0]|1670.72335993

In [23]:
# 係数の確認
model.stages[1].coefficients

DenseVector([28.0582, -14.7855])

In [24]:
# 切片
model.stages[1].intercept

254.53947540939342

In [26]:
# 線形重回帰のモデリングと予測
# 学習データとテストデータを分ける。

In [27]:
train_df, test_df = data.select(["age","balance","campaign"]) \
                            .randomSplit([0.7, 0.3], seed = 1)

In [28]:
train_df.show()

+---+-------+--------+
|age|balance|campaign|
+---+-------+--------+
| 18|      3|       2|
| 18|      5|       2|
| 18|     35|       2|
| 18|    108|       1|
| 18|    156|       2|
| 18|    608|       1|
| 18|    608|       1|
| 18|   1944|       3|
| 19|      0|       3|
| 19|      0|       4|
| 19|      4|       1|
| 19|     96|       3|
| 19|    103|       2|
| 19|    103|       2|
| 19|    103|       2|
| 19|    134|       2|
| 19|    179|       3|
| 19|    291|       5|
| 19|    329|       2|
| 19|    526|       3|
+---+-------+--------+
only showing top 20 rows



In [30]:
train_df.count()

31677

In [31]:
# 学習データでモデリング
# データ作成ステージ

In [32]:
from pyspark.ml.feature import VectorAssembler
target = "balance"
features = ["age", "campaign"]
assemble = VectorAssembler(inputCols=features, outputCol="features")

In [34]:
# 線形重回帰ステージ
from pyspark.ml.regression import LinearRegression
clf = LinearRegression(featuresCol="features", labelCol="balance")

In [36]:
# パイプライン登録
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=[assemble, clf])
model = pipeline.fit(train_df)

In [39]:
# 実行
pred_train = model.transform(train_df)
pred_train.show()

+---+-------+--------+----------+-----------------+
|age|balance|campaign|  features|       prediction|
+---+-------+--------+----------+-----------------+
| 18|      3|       2|[18.0,2.0]|733.4646173474521|
| 18|      5|       2|[18.0,2.0]|733.4646173474521|
| 18|     35|       2|[18.0,2.0]|733.4646173474521|
| 18|    108|       1|[18.0,1.0]|749.1539074836246|
| 18|    156|       2|[18.0,2.0]|733.4646173474521|
| 18|    608|       1|[18.0,1.0]|749.1539074836246|
| 18|    608|       1|[18.0,1.0]|749.1539074836246|
| 18|   1944|       3|[18.0,3.0]|717.7753272112798|
| 19|      0|       3|[19.0,3.0]|745.0493288502088|
| 19|      0|       4|[19.0,4.0]|729.3600387140364|
| 19|      4|       1|[19.0,1.0]|776.4279091225537|
| 19|     96|       3|[19.0,3.0]|745.0493288502088|
| 19|    103|       2|[19.0,2.0]|760.7386189863812|
| 19|    103|       2|[19.0,2.0]|760.7386189863812|
| 19|    103|       2|[19.0,2.0]|760.7386189863812|
| 19|    134|       2|[19.0,2.0]|760.7386189863812|
| 19|    179

In [48]:
# RMSE (sklearn)
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd
pred_train_pandas = pred_train.toPandas()
np.sqrt(mean_squared_error(pred_train_pandas["balance"], pred_train_pandas["prediction"]))

2994.8092260371577

In [49]:
# 係数
train_cols = train_df.columns
train_cols.remove(target)
pd.DataFrame(index=train_cols, data=model.stages[1].coefficients, columns=["coefficients"])


Unnamed: 0,coefficients
age,27.274002
campaign,-15.68929


In [50]:
# テストデータによる予測
test_df.show()

+---+-------+--------+
|age|balance|campaign|
+---+-------+--------+
| 18|    108|       1|
| 18|    108|       1|
| 19|     27|      12|
| 19|     55|       2|
| 19|     56|       1|
| 19|     60|       1|
| 19|     88|       1|
| 19|    779|       4|
| 19|   1803|       1|
| 19|   1803|       1|
| 20|   -322|       4|
| 20|      0|       5|
| 20|     66|       2|
| 20|     67|       1|
| 20|     76|       2|
| 20|    210|       1|
| 20|    215|       4|
| 20|    336|       1|
| 20|    755|       1|
| 20|   1191|       2|
+---+-------+--------+
only showing top 20 rows



In [51]:
test_df.count()

13534

In [53]:
pred_test = model.transform(test_df)
pred_test.show()

+---+-------+--------+-----------+-----------------+
|age|balance|campaign|   features|       prediction|
+---+-------+--------+-----------+-----------------+
| 18|    108|       1| [18.0,1.0]|749.1539074836246|
| 18|    108|       1| [18.0,1.0]|749.1539074836246|
| 19|     27|      12|[19.0,12.0]|603.8457176246568|
| 19|     55|       2| [19.0,2.0]|760.7386189863812|
| 19|     56|       1| [19.0,1.0]|776.4279091225537|
| 19|     60|       1| [19.0,1.0]|776.4279091225537|
| 19|     88|       1| [19.0,1.0]|776.4279091225537|
| 19|    779|       4| [19.0,4.0]|729.3600387140364|
| 19|   1803|       1| [19.0,1.0]|776.4279091225537|
| 19|   1803|       1| [19.0,1.0]|776.4279091225537|
| 20|   -322|       4| [20.0,4.0]|756.6340403529655|
| 20|      0|       5| [20.0,5.0]| 740.944750216793|
| 20|     66|       2| [20.0,2.0]|788.0126206253103|
| 20|     67|       1| [20.0,1.0]|803.7019107614828|
| 20|     76|       2| [20.0,2.0]|788.0126206253103|
| 20|    210|       1| [20.0,1.0]|803.70191076

In [54]:
# RMSE (sklearn)
pred_test_pandas = pred_test.toPandas()
np.sqrt(mean_squared_error(pred_train_pandas["balance"], pred_train_pandas["prediction"]))

2994.8092260371577