In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

import pandas as pd
import numpy as np

## 全データを使ったモデリング（流れの確認）
- 特徴量には数値のカラムのみ使用する
- 標準化は行わない
- one-hot encodingは行わない
- 重複行などのデータチェックは行わない

In [2]:
# Start (or attach to) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('Data_wrangling')
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble


23/03/11 20:00:19 WARN Utils: Your hostname, NONAME resolves to a loopback address: 127.0.1.1; using 172.18.233.170 instead (on interface eth0)
23/03/11 20:00:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/11 20:00:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the semicolon-separated CSV; the first row holds the column names and
# Spark infers the column types.
data = (
    spark.read.format("csv")
    .options(inferSchema="True", header="True", sep=";")
    .load("./data/bank-full.csv")
)

In [4]:
# Print the per-column summary statistics table (count, mean, ...).
data.summary().show(n=20, truncate=True)



+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+-----------------+------------------+------------------+------------------+--------+-----+
|summary|               age|    job| marital|education|default|           balance|housing| loan| contact|              day|month|         duration|          campaign|             pdays|          previous|poutcome|    y|
+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+-----------------+------------------+------------------+------------------+--------+-----+
|  count|             45211|  45211|   45211|    45211|  45211|             45211|  45211|45211|   45211|            45211|45211|            45211|             45211|             45211|             45211|   45211|45211|
|   mean| 40.93621021432837|   null|    null|     null|   null|1362.2720576850766|   null| null|    null|15.806418791886

                                                                                

In [5]:
# Build the modelling dataset: `balance` is predicted from `age` and `campaign`.
target = "balance"
features = ["age", "campaign"]
# The label column is kept next to the feature columns so the frame can be
# handed to Pipeline.fit / PipelineModel.transform as-is.
linear_df = data.select(["age", "balance", "campaign"])

In [6]:
# Data-preparation stage: pack the feature columns into a single vector column.
from pyspark.ml.feature import VectorAssembler
assemble = VectorAssembler().setInputCols(features).setOutputCol("features")

In [7]:
# Multiple linear regression stage.
from pyspark.ml.regression import LinearRegression
# Take the label from `target` and the feature column from the assembler's
# output column so this stage cannot drift from those definitions.
clf = LinearRegression(featuresCol=assemble.getOutputCol(), labelCol=target)

In [8]:
# Pipeline setup: register the stages, then fit them on the full dataset.
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline().setStages([assemble, clf])
model = pipeline.fit(linear_df)

23/03/11 20:00:38 WARN Instrumentation: [726abfcc] regParam is zero, which might cause numerical instability and overfitting.
23/03/11 20:00:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/11 20:00:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/03/11 20:00:39 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [9]:
# Run the fitted pipeline and preview the rows with their predictions.
df = model.transform(linear_df)
df.show(n=20, truncate=True)

+---+-------+--------+----------+------------------+
|age|balance|campaign|  features|        prediction|
+---+-------+--------+----------+------------------+
| 58|   2143|       1|[58.0,1.0]|1867.1309208276969|
| 44|     29|       1|[44.0,1.0]| 1474.315799038966|
| 33|      2|       1|[33.0,1.0]| 1165.675346204963|
| 47|   1506|       1|[47.0,1.0]| 1558.490467993694|
| 33|      1|       1|[33.0,1.0]| 1165.675346204963|
| 35|    231|       1|[35.0,1.0]|1221.7917921747817|
| 28|    447|       1|[28.0,1.0]|1025.3842312804163|
| 42|      2|       1|[42.0,1.0]|1418.1993530691473|
| 58|    121|       1|[58.0,1.0]|1867.1309208276969|
| 43|    593|       1|[43.0,1.0]|1446.2575760540565|
| 41|    270|       1|[41.0,1.0]|1390.1411300842378|
| 29|    390|       1|[29.0,1.0]|1053.4424542653255|
| 53|      6|       1|[53.0,1.0]|1726.8398059031501|
| 58|     71|       1|[58.0,1.0]|1867.1309208276969|
| 57|    162|       1|[57.0,1.0]|1839.0726978427874|
| 51|    229|       1|[51.0,1.0]|1670.72335993

In [10]:
# The fitted regression is the last of the two pipeline stages.
fitted_lr = model.stages[-1]
# Coefficients
print(fitted_lr.coefficients)
# Intercept
print(fitted_lr.intercept)

[28.058222984909357,-14.785487706439227]
254.53947540939342


### 線形重回帰のモデリングと予測

In [11]:
# 70/30 train/test split with a fixed seed.
train_df, test_df = (
    data.select("age", "balance", "campaign")
    .randomSplit([0.7, 0.3], seed=1)
)

In [12]:
# Preview the training split.
train_df.show(n=20, truncate=True)

+---+-------+--------+
|age|balance|campaign|
+---+-------+--------+
| 18|    108|       1|
| 18|    608|       1|
| 18|   1944|       3|
| 19|      0|       3|
| 19|     56|       1|
| 19|     60|       1|
| 19|     96|       3|
| 19|    103|       2|
| 19|    134|       2|
| 19|    291|       5|
| 19|    626|       1|
| 19|   5368|       6|
| 20|   -322|       4|
| 20|   -172|       3|
| 20|   -103|       1|
| 20|     53|       1|
| 20|     66|       2|
| 20|     67|       1|
| 20|     76|       2|
| 20|     79|       3|
+---+-------+--------+
only showing top 20 rows



In [13]:
# Number of rows in the training split.
train_df.count()

31684

In [14]:
# Data-preparation stage for the train/test run: label name, feature names,
# and the assembler that packs the features into one vector column.
from pyspark.ml.feature import VectorAssembler
target, features = "balance", ["age", "campaign"]
assemble = VectorAssembler().setInputCols(features).setOutputCol("features")

In [15]:
# Multiple linear regression stage.
from pyspark.ml.regression import LinearRegression
# Take the label from `target` and the feature column from the assembler's
# output column so this stage cannot drift from those definitions.
clf = LinearRegression(featuresCol=assemble.getOutputCol(), labelCol=target)

In [16]:
# Register the pipeline stages and fit them on the training split only.
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline().setStages([assemble, clf])
model = pipeline.fit(train_df)

23/03/11 20:00:40 WARN Instrumentation: [04c3020a] regParam is zero, which might cause numerical instability and overfitting.


In [17]:
# Score the training split and preview the predictions.
pred_train = model.transform(train_df)
pred_train.show(n=20, truncate=True)

+---+-------+--------+----------+-----------------+
|age|balance|campaign|  features|       prediction|
+---+-------+--------+----------+-----------------+
| 18|    108|       1|[18.0,1.0]| 731.419060209178|
| 18|    608|       1|[18.0,1.0]| 731.419060209178|
| 18|   1944|       3|[18.0,3.0]|699.1132728490966|
| 19|      0|       3|[19.0,3.0]|728.1159917037521|
| 19|     56|       1|[19.0,1.0]|760.4217790638336|
| 19|     60|       1|[19.0,1.0]|760.4217790638336|
| 19|     96|       3|[19.0,3.0]|728.1159917037521|
| 19|    103|       2|[19.0,2.0]|744.2688853837928|
| 19|    134|       2|[19.0,2.0]|744.2688853837928|
| 19|    291|       5|[19.0,5.0]|695.8102043436706|
| 19|    626|       1|[19.0,1.0]|760.4217790638336|
| 19|   5368|       6|[19.0,6.0]|679.6573106636299|
| 20|   -322|       4|[20.0,4.0]|740.9658168783671|
| 20|   -172|       3|[20.0,3.0]|757.1187105584079|
| 20|   -103|       1|[20.0,1.0]|789.4244979184894|
| 20|     53|       1|[20.0,1.0]|789.4244979184894|
| 20|     66

In [22]:
# RMSE on the training split (computed with scikit-learn)
from sklearn.metrics import mean_squared_error
pred_train_pandas = pred_train.toPandas()
np.sqrt(
    mean_squared_error(
        y_true=pred_train_pandas["balance"],
        y_pred=pred_train_pandas["prediction"],
    )
)

3091.121524988566

In [25]:
# Coefficient table. The row labels come from the fitted assembler (stage 0),
# i.e. the exact order in which the features were packed into the vector the
# regression (stage 1) was trained on, instead of relying on the column order
# of train_df happening to match.
train_cols = list(model.stages[0].getInputCols())
pd.DataFrame(index=train_cols, data=model.stages[1].coefficients, columns=["coefficients"])

Unnamed: 0,coefficients
age,29.002719
campaign,-16.152894


In [26]:
# Preview the test split.
test_df.show(n=20, truncate=True)

+---+-------+--------+
|age|balance|campaign|
+---+-------+--------+
| 19|      0|       4|
| 19|     27|      12|
| 19|    779|       4|
| 19|   1169|      18|
| 19|   1247|       1|
| 19|   1803|       1|
| 19|   1803|       1|
| 20|    130|       3|
| 20|    292|       1|
| 20|    292|       2|
| 20|    336|       1|
| 20|    556|      10|
| 20|   1191|       1|
| 20|   1681|       1|
| 20|   1819|       3|
| 21|   -172|       1|
| 21|      0|       5|
| 21|     64|       1|
| 21|    164|       5|
| 21|    232|       2|
+---+-------+--------+
only showing top 20 rows



In [27]:
# Number of rows in the test split.
test_df.count()

13527

In [28]:
# Score the held-out test split with the model fitted on the training split.
pred_test = model.transform(test_df)
pred_test.show(n=20, truncate=True)

+---+-------+--------+-----------+-----------------+
|age|balance|campaign|   features|       prediction|
+---+-------+--------+-----------+-----------------+
| 19|      0|       4| [19.0,4.0]|711.9630980237114|
| 19|     27|      12|[19.0,12.0]|582.7399485833855|
| 19|    779|       4| [19.0,4.0]|711.9630980237114|
| 19|   1169|      18|[19.0,18.0]|485.8225865031411|
| 19|   1247|       1| [19.0,1.0]|760.4217790638336|
| 19|   1803|       1| [19.0,1.0]|760.4217790638336|
| 19|   1803|       1| [19.0,1.0]|760.4217790638336|
| 20|    130|       3| [20.0,3.0]|757.1187105584079|
| 20|    292|       1| [20.0,1.0]|789.4244979184894|
| 20|    292|       2| [20.0,2.0]|773.2716042384486|
| 20|    336|       1| [20.0,1.0]|789.4244979184894|
| 20|    556|      10|[20.0,10.0]|644.0484547981227|
| 20|   1191|       1| [20.0,1.0]|789.4244979184894|
| 20|   1681|       1| [20.0,1.0]|789.4244979184894|
| 20|   1819|       3| [20.0,3.0]|757.1187105584079|
| 21|   -172|       1| [21.0,1.0]|818.42721677

In [30]:
# RMSE on the test split (computed with scikit-learn)
from sklearn.metrics import mean_squared_error
pred_test_pandas = pred_test.toPandas()
np.sqrt(
    mean_squared_error(
        y_true=pred_test_pandas["balance"],
        y_pred=pred_test_pandas["prediction"],
    )
)

2881.1342329320028