In [1]:
import numpy as np
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("logistic_regression").getOrCreate()

In [3]:
filename = "./bank-full.csv"
data = spark.read.csv(filename, header=True, inferSchema=True, sep=";")
data.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

In [4]:
data.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string')]

In [5]:
from pyspark.sql.functions import when, col, lit
data1 = data.withColumn("y1", when(data["y"] == "yes", lit(1)).otherwise(lit(0)))

In [6]:
data1.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y| y1|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|  0|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|  0|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|  0|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|  0|
| 33|     unknown|  single|  unknown|     no|      1|  

In [7]:
#stringの処理
from pyspark.ml.feature import StringIndexer
default_index = StringIndexer(inputCol="default", outputCol="default_index")

In [8]:
#assemble
from pyspark.ml.feature import VectorAssembler
assemble = VectorAssembler(inputCols=["age", "balance", "duration", "campaign", "previous", "default_index"],\
                          outputCol="features")

In [9]:
#normalize
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [10]:
#logistic regression
from pyspark.ml.classification import LogisticRegression
logistic_regression = LogisticRegression(featuresCol="scaled_features", labelCol="y1")

In [11]:
#pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[default_index, assemble, scaler, logistic_regression])

In [12]:
df = data1.select("age", "balance", "duration", "campaign", "previous", "default", "y", "y1")

In [13]:
#split train data and test data
train_df, test_df = df.randomSplit([0.7, 0.3], seed=123456)

In [14]:
#modeling by using train data
fit_model = pipeline.fit(train_df)

In [15]:
#confirm the result of modeling
fit_model.stages[3].coefficients

DenseVector([0.0548, 0.1078, 0.9143, -0.4013, 0.3048, -0.0537])

In [16]:
#predict the targets of train data
pred_train = fit_model.transform(train_df)

In [17]:
pred_train.show()

+---+-------+--------+--------+--------+-------+---+---+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|age|balance|duration|campaign|previous|default|  y| y1|default_index|            features|     scaled_features|       rawPrediction|         probability|prediction|
+---+-------+--------+--------+--------+-------+---+---+-------------+--------------------+--------------------+--------------------+--------------------+----------+
| 18|      5|     143|       2|       0|     no| no|  0|          0.0|[18.0,5.0,143.0,2...|[1.69618976559234...|[2.91255708620734...|[0.94846369913306...|       0.0|
| 18|     35|     104|       2|       0|     no| no|  0|          0.0|[18.0,35.0,104.0,...|[1.69618976559234...|[3.05047473111548...|[0.95480301758925...|       0.0|
| 18|    108|      92|       1|       1|     no|yes|  1|          0.0|[18.0,108.0,92.0,...|[1.69618976559234...|[2.83710186438638...|[0.94464811949149...|       0.0|
| 18

In [18]:
pred_train.select("rawPrediction", "Probability").show(truncate=False)

+----------------------------------------+-----------------------------------------+
|rawPrediction                           |Probability                              |
+----------------------------------------+-----------------------------------------+
|[2.9125570862073435,-2.9125570862073435]|[0.9484636991330693,0.05153630086693073] |
|[3.0504747311154827,-3.0504747311154827]|[0.9548030175892533,0.045196982410746656]|
|[2.83710186438638,-2.83710186438638]    |[0.9446481194914953,0.0553518805085047]  |
|[2.693356847714414,-2.693356847714414]  |[0.9366335067994203,0.06336649320057974] |
|[2.686229161120793,-2.686229161120793]  |[0.9362091513071605,0.06379084869283946] |
|[1.8605914930536533,-1.8605914930536533]|[0.8653658764825606,0.13463412351743942] |
|[2.091118036788695,-2.091118036788695]  |[0.8900368969872618,0.10996310301273815] |
|[2.3986976275717513,-2.3986976275717513]|[0.9167279371693962,0.08327206283060384] |
|[2.31910179024237,-2.31910179024237]    |[0.910446733494069,0.08

In [19]:
data1.withColumn("duration_label", when(df["duration"] >= 365, "long")\
                                    .when(df["duration"] <))

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y| y1|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|  0|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|  0|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|  0|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|  0|
| 33|     unknown|  single|  unknown|     no|      1|  