In [38]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import plotly.express as px 
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [2]:
spark = SparkSession.builder\
                .appName('BankProject')\
                .getOrCreate()

data = spark.read.csv('bank.csv', header=True, inferSchema=True)

data.show(truncate=False)


24/11/13 10:17:17 WARN Utils: Your hostname, MavisdeMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.174 instead (on interface en0)
24/11/13 10:17:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 10:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job        |marital |education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|59 |admin.     |married |secondary|no     |2343   |yes    |no  |unknown|5  |may  |1042    |1       |-1   |0       |unknown |yes    |
|56 |admin.     |married |secondary|no     |45     |no     |no  |unknown|5  |may  |1467    |1       |-1   |0       |unknown |yes    |
|41 |technician |married |secondary|no     |1270   |yes    |no  |unknown|5  |may  |1389    |1       |-1   |0       |unknown |yes    |
|55 |services   |married |secondary|no     |2476   |yes    |no  |unknown|5  |may  |579     |1       |-1   |0       |unknown |yes    |
|54 |admin.     |married |tertiary |no     |184    |no     |no

In [3]:
data.printSchema()
data.count()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



11162

In [32]:
data_pandas = data.toPandas()

In [72]:
class EnhacedEDA:
    def __init__(self, data):
        self.data = data

    def overview(self):
        print(f'Rows: {self.data.shape[0]}, Columns: {self.data.shape[1]}')

    def plot_distribute(self, column):
        fig = px.histogram(self.data, x=column, title=column, color='deposit')
        fig.show()

    def detail_analysis(self):
        for col in self.data.columns:
            if self.data[col].dtype == 'object':
                col_type = 'categorical'
                unique_vals = self.data[col].nunique()
                print(f'\nAnalyzing Column: {col.upper()} ({col_type})')
                print(f'{col} has {unique_vals} unique values.')
            else:
                col_type = 'numerical'
                min_val = self.data[col].min()
                max_val = self.data[col].max()
                mean_val = self.data[col].mean()
                std_val = self.data[col].std()
                print(f'Min: {min_val}, Max: {max_val}, Mean: {mean_val}, Std Dev: {std_val}')

            self.plot_distribute(col)

    def run(self):
        self.overview()
        self.detail_analysis()

eda = EnhacedEDA(data_pandas)
eda.run()

Rows: 11162, Columns: 17
Min: 18, Max: 95, Mean: 41.231947679627304, Std Dev: 11.913369192215526



Analyzing Column: JOB (categorical)
job has 12 unique values.



Analyzing Column: MARITAL (categorical)
marital has 3 unique values.



Analyzing Column: EDUCATION (categorical)
education has 4 unique values.



Analyzing Column: DEFAULT (categorical)
default has 2 unique values.


Min: -6847, Max: 81204, Mean: 1528.5385235620856, Std Dev: 3225.413325946151



Analyzing Column: HOUSING (categorical)
housing has 2 unique values.



Analyzing Column: LOAN (categorical)
loan has 2 unique values.



Analyzing Column: CONTACT (categorical)
contact has 3 unique values.


Min: 1, Max: 31, Mean: 15.658036194230425, Std Dev: 8.420739541006451



Analyzing Column: MONTH (categorical)
month has 12 unique values.


Min: 2, Max: 3881, Mean: 371.99381831213043, Std Dev: 347.12838571630584


Min: 1, Max: 63, Mean: 2.508421429851281, Std Dev: 2.722077181661486


Min: -1, Max: 854, Mean: 51.33040673714388, Std Dev: 108.75828197197696


Min: 0, Max: 58, Mean: 0.8325568894463358, Std Dev: 2.2920072186705047



Analyzing Column: POUTCOME (categorical)
poutcome has 4 unique values.



Analyzing Column: DEPOSIT (categorical)
deposit has 2 unique values.


In [77]:
px.histogram(data_pandas, x='age', color='marital', opacity=0.75, barmode='overlay')

In [84]:
px.box(data_pandas, 
       y='balance', 
       x='job', 
       category_orders={'job':['admin.', 'services', 'blue-collar', 'technician', 'management', 'entrepreneur', 'self-employed', 'housemaid', 'unemployed','retired','student', 'unknown']})

In [82]:
px.box(data_pandas, x='education', y='balance', category_orders={'education':['primary', 'secondary', 'tertiary', 'unknown']})

In [4]:
data.rdd.isEmpty()

                                                                                

False

In [6]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=23)

indexer = StringIndexer(
                inputCols=['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome', 'deposit'],
                outputCols=['job_idx', 'marital_idx', 'education_idx', 'default_idx', 'housing_idx', 'loan_idx', 'contact_idx', 'month_idx', 'poutcome_idx', 'label'])

onehot = OneHotEncoder(
    inputCols=['job_idx', 'marital_idx', 'education_idx', 'default_idx', 'housing_idx', 'loan_idx', 'contact_idx', 'month_idx', 'poutcome_idx'],
    outputCols=['job_dummy', 'marital_dummy', 'education_dummy', 'default_dummy', 'housing_dummy', 'loan_dummy', 'contact_dummy', 'month_dummy', 'poutcome_dummy'])

assembler = VectorAssembler(
    inputCols=['age', 'job_dummy', 'marital_dummy', 'education_dummy', 'default_dummy', 'balance', 'housing_dummy', 'loan_dummy', 'contact_dummy', 'day', 'month_dummy', 'duration', 'campaign', 'pdays', 'previous', 'poutcome_dummy'],
    outputCol='features')

rf = RandomForestClassifier(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[indexer, onehot, assembler, rf])


In [16]:
params = ParamGridBuilder()\
        .addGrid(rf.numTrees, [10, 20, 30])\
        .addGrid(rf.maxDepth, [5, 10 ,15])\
        .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=MulticlassClassificationEvaluator(labelCol='label', metricName='f1'),
                    numFolds=5, seed=42)

cv_model = cv.fit(train_data)

24/11/13 10:48:32 WARN DAGScheduler: Broadcasting large task binary with size 1201.8 KiB
24/11/13 10:48:32 WARN DAGScheduler: Broadcasting large task binary with size 1444.0 KiB
24/11/13 10:48:32 WARN DAGScheduler: Broadcasting large task binary with size 1680.5 KiB
24/11/13 10:48:33 WARN DAGScheduler: Broadcasting large task binary with size 1912.4 KiB
24/11/13 10:48:33 WARN DAGScheduler: Broadcasting large task binary with size 1259.4 KiB
24/11/13 10:48:36 WARN DAGScheduler: Broadcasting large task binary with size 1227.4 KiB
24/11/13 10:48:39 WARN DAGScheduler: Broadcasting large task binary with size 1227.4 KiB
24/11/13 10:48:39 WARN DAGScheduler: Broadcasting large task binary with size 1612.6 KiB
24/11/13 10:48:39 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
24/11/13 10:48:40 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/11/13 10:48:40 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/11/13 10:48:41 WARN DAGSche

In [17]:
best_rf_model = cv_model.bestModel.stages[-1]
importances = best_rf_model.featureImportances
feature_list = ['age', 'job_dummy', 'marital_dummy', 'education_dummy', 'default_dummy', 'balance', 'housing_dummy', 'loan_dummy', 'contact_dummy', 'day', 'month_dummy', 'duration', 'campaign', 'pdays', 'previous', 'poutcome_dummy']

print('Featrue Importances')
for feature, importance in zip(feature_list, importances):
    print(f'{feature}: {importance:.4f}')

Featrue Importances
age: 0.0633
job_dummy: 0.0059
marital_dummy: 0.0083
education_dummy: 0.0058
default_dummy: 0.0052
balance: 0.0043
housing_dummy: 0.0046
loan_dummy: 0.0026
contact_dummy: 0.0049
day: 0.0028
month_dummy: 0.0033
duration: 0.0024
campaign: 0.0086
pdays: 0.0075
previous: 0.0063
poutcome_dummy: 0.0083


In [19]:
predictions = cv_model.transform(test_data)

evaluate = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluate.evaluate(predictions)
print('Test F1: {:.2f}'.format(f1))

24/11/13 11:44:22 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


Test F1: 0.86
