In [None]:
import numpy as np # linear algebra

import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('/content/bank.csv', header = True, inferSchema = True)
df.printSchema()

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=a1e7fb1b6c31e77c07527ab399d2cb88c07d9e3da568614452d691d7e11b319a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 

In [None]:
# Restrict the frame to the modelling columns (features plus the 'deposit' target),
# in a fixed order; `cols` keeps that list so it can be re-attached after feature engineering.
model_columns = [
    'age', 'job', 'marital', 'education', 'default', 'balance', 'housing',
    'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous',
    'poutcome', 'deposit',
]
df = df.select(*model_columns)
cols = df.columns
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [None]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# String-valued feature columns to be indexed and then one-hot encoded.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']

# For every categorical column append two pipeline stages, in this order:
#   <col>      -> StringIndexer -> <col>Index
#   <col>Index -> OneHotEncoder -> <col>classVec
stages = []
for col_name in categoricalColumns:
    index_col = f"{col_name}Index"
    vector_col = f"{col_name}classVec"
    stages.append(StringIndexer(inputCol=col_name, outputCol=index_col))
    stages.append(OneHotEncoder(inputCols=[index_col], outputCols=[vector_col]))

In [None]:
# Turn the string target 'deposit' into a numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

# Numeric columns are passed to the assembler unchanged.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

# Assembler input order: all one-hot vectors first, then the numeric columns.
assemblerInputs = [f"{c}classVec" for c in categoricalColumns]
assemblerInputs.extend(numericCols)

# Final stage: concatenate everything into a single 'features' vector.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
from pyspark.ml import Pipeline

# Always start from the raw modelling columns. This makes the cell safe to re-run:
# after a first run `df` already contains 'label' and 'features', and the
# StringIndexer / VectorAssembler stages refuse to write to an existing column.
base_df = df.select(cols)

pipeline = Pipeline(stages=stages)
# NOTE(review): the indexers are fit on the full dataset before the train/test
# split in a later cell, so test rows influence category indexing (minor leakage).
pipelineModel = pipeline.fit(base_df)
df = pipelineModel.transform(base_df)

# Keep the model inputs up front, followed by the original columns for inspection.
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [None]:
# Pull five rows to the driver and show them transposed (one row per column) so the wide frame is readable.
first_rows = df.take(5)
pd.DataFrame(first_rows, columns=df.columns).T

Unnamed: 0,0,1,2,3,4
label,1.0,1.0,1.0,1.0,1.0
features,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
age,59,56,41,55,54
job,admin.,admin.,technician,services,admin.
marital,married,married,married,married,married
education,secondary,secondary,secondary,secondary,tertiary
default,no,no,no,no,no
balance,2343,45,1270,2476,184
housing,yes,no,yes,yes,no
loan,no,no,no,no,no


In [None]:
# 70/30 split with a fixed seed so the partition is reproducible.
train, test = df.randomSplit([0.7, 0.3], seed=2018)

# Cache both splits: `train` is refit many times by the cross-validators below
# (hundreds of fits in total). Without caching, each fit re-reads the CSV and
# re-runs the feature pipeline. The count() calls below materialise the cache.
train = train.cache()
test = test.cache()

print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 7855
Test Dataset Count: 3307


## Logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression with a small iteration budget (10) and default regularisation.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)
lrModel = lr.fit(train)

In [None]:
# Score the held-out split and show the first ten predictions with their scores untruncated.
predictions = lrModel.transform(test)
preview_cols = ['age', 'job', 'label', 'rawPrediction', 'prediction', 'probability']
predictions.select(*preview_cols).show(10, truncate=False)

+---+----------+-----+----------------------------------------+----------+-----------------------------------------+
|age|job       |label|rawPrediction                           |prediction|probability                              |
+---+----------+-----+----------------------------------------+----------+-----------------------------------------+
|33 |management|0.0  |[1.930848545181285,-1.930848545181285]  |0.0       |[0.8733433112486003,0.12665668875139968] |
|49 |management|0.0  |[1.9278369547209746,-1.9278369547209746]|0.0       |[0.8730098101343369,0.12699018986566313] |
|52 |management|0.0  |[-0.7737627890181469,0.7737627890181469]|1.0       |[0.31566570209267647,0.6843342979073235] |
|53 |management|0.0  |[0.9470813734407431,-0.9470813734407431]|0.0       |[0.7205278400217976,0.2794721599782024]  |
|58 |management|0.0  |[2.4465776469245744,-2.4465776469245744]|0.0       |[0.9203108202637693,0.07968917973623069] |
|32 |management|0.0  |[1.2838607754603748,-1.2838607754603748]|0

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under ROC (BinaryClassificationEvaluator's default metric). `evaluator` is
# kept under this name because later cross-validation cells score with it.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions)

# BinaryClassificationEvaluator only offers areaUnderROC / areaUnderPR, so it cannot
# report accuracy. Compute accuracy from the hard 0/1 predictions instead.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print('Test Area Under ROC', auc)

Test Accuracy: 0.89
Test Area Under ROC 0.885643189559481


## Hyperparameter tuning for logistic regression

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: regularisation strength x L1/L2 mix x iteration budget.
# 3 x 3 x 3 = 27 candidates; with 5 folds that is 135 fits plus one final refit.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [100, 1000, 10000])
             .build())

# 5-fold cross-validation scored with `evaluator`, training 2 candidates in parallel.
# An explicit seed (same as the train/test split) makes the fold assignment reproducible.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          parallelism=2,
                          seed=2018)

In [None]:
# Cross-validate over the grid (the best candidate is refit on all of `train`),
# then report the tuned model's metric on the untouched test split.
cvModel = crossval.fit(train)
cvPreds = cvModel.transform(test)
evaluator.evaluate(dataset=cvPreds)

0.8850742738820102

## Decision tree classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Shallow (depth-3) decision tree baseline.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=3)
dtModel = dt.fit(train)

# Score the test split and show the first ten predictions untruncated.
predictions = dtModel.transform(test)
preview_cols = ['age', 'job', 'label', 'rawPrediction', 'prediction', 'probability']
predictions.select(*preview_cols).show(10, truncate=False)

+---+----------+-----+--------------+----------+----------------------------------------+
|age|job       |label|rawPrediction |prediction|probability                             |
+---+----------+-----+--------------+----------+----------------------------------------+
|33 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|49 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|52 |management|0.0  |[520.0,1931.0]|1.0       |[0.21215830273357814,0.7878416972664218]|
|53 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|58 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|32 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|57 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|52 |management|0.0  |[2498.0,481.0]|0.0       |[0.8385364216179926,0.1614635783820074] |
|46 |manag

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# For a decision tree, `rawPrediction` holds raw per-leaf class counts (e.g.
# [2498.0, 481.0] in the preview). Ranking by its second element orders rows by the
# *number* of positives in their leaf, not the leaf's positive *rate*. Score AUC
# on the normalised `probability` column instead.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability')
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

# BinaryClassificationEvaluator cannot report accuracy; use the hard predictions.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(auc))

Test Accuracy: 0.78
Test Area Under ROC: 0.7808118726917547


## Hyperparameter tuning for the decision tree

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Tree depth x minimum rows per node: 3 x 3 = 9 candidate trees.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.minInstancesPerNode, [1, 3, 5])
    .build()
)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Select trees by AUC on `probability`, not `rawPrediction`. A decision tree's raw
# prediction is per-leaf class counts, which are not comparable across leaves of
# different sizes. Because deeper trees have smaller leaves, a count-based score
# can distort the comparison between depths.
dt_evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability')

# 5-fold CV; the seed makes the fold assignment reproducible.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid,
                    evaluator=dt_evaluator, numFolds=5, seed=2018)
cvModel = cv.fit(train)
cvPreds = cvModel.transform(test)
dt_evaluator.evaluate(cvPreds)

0.7219640623947471

## Random forest classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with default size/depth. A random forest draws random bootstrap
# samples and feature subsets, so pin the seed to make the model and the metrics
# below reproducible across kernel restarts.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=2018)
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+----------+-----+--------------------+----------+--------------------+
|age|       job|label|       rawPrediction|prediction|         probability|
+---+----------+-----+--------------------+----------+--------------------+
| 33|management|  0.0|[14.7159953352039...|       0.0|[0.73579976676019...|
| 49|management|  0.0|[13.6950659961960...|       0.0|[0.68475329980980...|
| 52|management|  0.0|[6.83093121490110...|       1.0|[0.34154656074505...|
| 53|management|  0.0|[12.4260051883954...|       0.0|[0.62130025941977...|
| 58|management|  0.0|[15.3587664723867...|       0.0|[0.76793832361933...|
| 32|management|  0.0|[14.3775130610017...|       0.0|[0.71887565305008...|
| 57|management|  0.0|[13.6701951931613...|       0.0|[0.68350975965806...|
| 52|management|  0.0|[17.0622005527392...|       0.0|[0.85311002763696...|
| 46|management|  0.0|[17.0903164724591...|       0.0|[0.85451582362295...|
| 31|management|  0.0|[13.5895083075096...|       0.0|[0.67947541537548...|
+---+-------

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under ROC (BinaryClassificationEvaluator's default metric); the RF
# cross-validation below scores with this `evaluator`.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

# BinaryClassificationEvaluator cannot report accuracy; use the hard predictions.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(auc))

Test Accuracy: 0.88
Test Area Under ROC: 0.8792924944133507


## Hyperparameter tuning for the random forest

In [None]:
# Forest size x tree depth: 3 x 3 = 9 candidates; with 5 folds that is 45 fits plus a refit.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 50, 100])
             .addGrid(rf.maxDepth, [5, 10, 20])
             .build())

# 5-fold cross-validation scored with `evaluator`, 2 candidates trained in parallel.
# An explicit seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          parallelism=2,
                          seed=2018)

In [None]:
# Cross-validate the forest grid (the best candidate is refit on all of `train`),
# then report the tuned model's metric on the test split.
cvModel = crossval.fit(train)
cvPreds = cvModel.transform(test)
evaluator.evaluate(dataset=cvPreds)

0.8959136917575339

## Gradient-boosted tree classifier

In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees with a 10-iteration budget. 'features' and 'label' are the
# GBTClassifier defaults, spelled out to match the other models in this notebook.
gbt = GBTClassifier(featuresCol='features', labelCol='label', maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
preview_cols = ['age', 'job', 'label', 'rawPrediction', 'prediction', 'probability']
predictions.select(*preview_cols).show(10)

+---+----------+-----+--------------------+----------+--------------------+
|age|       job|label|       rawPrediction|prediction|         probability|
+---+----------+-----+--------------------+----------+--------------------+
| 33|management|  0.0|[0.94992370947246...|       0.0|[0.86987425549896...|
| 49|management|  0.0|[1.28281748604659...|       0.0|[0.92861688965145...|
| 52|management|  0.0|[-0.5021429054421...|       1.0|[0.26809961471379...|
| 53|management|  0.0|[0.09646513340517...|       0.0|[0.54808351175538...|
| 58|management|  0.0|[1.05831377292163...|       0.0|[0.89250881497055...|
| 32|management|  0.0|[0.57268875896254...|       0.0|[0.75866558918304...|
| 57|management|  0.0|[0.34932946844840...|       0.0|[0.66789037418062...|
| 52|management|  0.0|[1.33264456943866...|       0.0|[0.93494709840561...|
| 46|management|  0.0|[1.28262818068783...|       0.0|[0.92859178839697...|
| 31|management|  0.0|[1.16094954405484...|       0.0|[0.91067454521219...|
+---+-------

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under ROC (BinaryClassificationEvaluator's default metric); the GBT
# cross-validation below scores with this `evaluator`.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

# BinaryClassificationEvaluator cannot report accuracy; use the hard predictions.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(auc))

Test Accuracy: 0.89
Test Area Under ROC: 0.8935091626908479


## Hyperparameter tuning for gradient-boosted trees

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Tree depth x split bins x boosting iterations: 3 x 2 x 2 = 12 candidates.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

# 5-fold CV scored with `evaluator`; the seed makes the fold assignment reproducible.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=2018)

# Run cross-validation: 12 candidates x 5 folds = 60 GBT fits of up to 20 trees
# each, plus a refit of the best candidate on all of `train`. This can take several minutes.
cvModel = cv.fit(train)
cvPreds = cvModel.transform(test)
evaluator.evaluate(cvPreds)

0.8954659543871663