# ***Bank Marketing Project***
## Nguyen Hoang Phuong Anh 


## Outcome Report 


First of all, below are all of the library used. This project is mainly run on PySpark and its extensions. 

In [1]:
import pandas as p
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, DenseVector, SparseVector
from pyspark.sql.types import IntegerType,StringType,DoubleType
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


### Part 1: DataFrame
1. Read data in bank-marketing.csv in a DataFrame, run and show the first 20 rows 

In [2]:

#DataFrame (use basic pyspark.sql)
# 1. Load the "Bank-marketing" dataset into a DataFrame, check the schema
# and dislay the first 20 rows
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.some.config.option", "some-value") \
    .appName("Bank Marketing") \
    .getOrCreate()

df0 = spark.read.csv("/Users/hoangphuonganhnguyen/Downloads/BankMarketing.csv",header=True,inferSchema=True)

print("df0 schema is: ")
df0.printSchema()

df0.show(20)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/21 01:37:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/21 01:37:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


df0 schema is: 
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- empVarRate: double (nullable = true)
 |-- consPriceIdx: double (nullable = true)
 |-- consConfIdx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nrEmployed: double (nullable = true)
 |-- y: string (nullable = true)

+---+-----------+--------+-------------------+-------+-------+----+---------+-----+---------+--------+--------+-----+--------+-----------+----------+-

In [3]:
# 2. groupBy education
education = df0.groupBy("education").count()
education.show(20)

+-------------------+-----+
|          education|count|
+-------------------+-----+
|        high.school| 9515|
|            unknown| 1731|
|           basic.6y| 2292|
|professional.course| 5243|
|  university.degree|12168|
|         illiterate|   18|
|           basic.4y| 4176|
|           basic.9y| 6045|
+-------------------+-----+



In [4]:
# 3. col "age", most targeted range
age = df0.groupBy("age").count()
age_sort = age.orderBy("count", ascending=False)
age_sort.show(20)

+---+-----+
|age|count|
+---+-----+
| 31| 1947|
| 32| 1846|
| 33| 1833|
| 36| 1780|
| 35| 1759|
| 34| 1745|
| 30| 1714|
| 37| 1475|
| 29| 1453|
| 39| 1432|
| 38| 1407|
| 41| 1278|
| 40| 1161|
| 42| 1142|
| 45| 1103|
| 43| 1055|
| 46| 1030|
| 44| 1011|
| 28| 1001|
| 48|  979|
+---+-----+
only showing top 20 rows



In [5]:
#4 model report of selected cols
selected_cols = df0.select("age",'education','marital',"duration")

selected_cols.groupBy("education").count().show()
selected_cols.groupBy("marital").count().show()

numerical_info = selected_cols.describe("age","duration")
numerical_info.show()

marital_info = selected_cols.groupBy("marital").count().show()
education_info = selected_cols.groupBy("education").count().show()

+-------------------+-----+
|          education|count|
+-------------------+-----+
|        high.school| 9515|
|            unknown| 1731|
|           basic.6y| 2292|
|professional.course| 5243|
|  university.degree|12168|
|         illiterate|   18|
|           basic.4y| 4176|
|           basic.9y| 6045|
+-------------------+-----+

+--------+-----+
| marital|count|
+--------+-----+
| unknown|   80|
|divorced| 4612|
| married|24928|
|  single|11568|
+--------+-----+

+-------+------------------+-----------------+
|summary|               age|         duration|
+-------+------------------+-----------------+
|  count|             41188|            41188|
|   mean| 40.02406040594348|258.2850101971448|
| stddev|10.421249980934057| 259.279248836465|
|    min|                17|                0|
|    max|                98|             4918|
+-------+------------------+-----------------+

+--------+-----+
| marital|count|
+--------+-----+
| unknown|   80|
|divorced| 4612|
| married|24928|


In [6]:
from pyspark.sql.types import IntegerType,StringType,DoubleType
#5. automatically identify the features and filter them based on their types (numerical or categorical)
numerical_col = [field.name for field in df0.schema.fields
                 if isinstance(field.dataType, (IntegerType, DoubleType))]
categorical_col = [field.name for field in df0.schema.fields
                   if isinstance(field.dataType,(StringType))]
print("Numerical columns: ")
print(numerical_col)
print("Categorical columns: ")
print(categorical_col)

Numerical columns: 
['age', 'duration', 'campaign', 'pdays', 'previous', 'empVarRate', 'consPriceIdx', 'consConfIdx', 'euribor3m', 'nrEmployed']
Categorical columns: 
['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'dayOfWeek', 'poutcome', 'y']


In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
#Feature transformation (use pyspark.ml.feature)
marital_Index = StringIndexer(inputCol="marital", outputCol="maritalIndex")
df1 = marital_Index.fit(df0).transform(df0)
print("df1 marital index schema is: ")
df1.printSchema()

df1 marital index schema is: 
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- empVarRate: double (nullable = true)
 |-- consPriceIdx: double (nullable = true)
 |-- consConfIdx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nrEmployed: double (nullable = true)
 |-- y: string (nullable = true)
 |-- maritalIndex: double (nullable = false)



In [8]:
marital_vector = OneHotEncoder(inputCol="maritalIndex", outputCol="maritalVector")
df2 = marital_vector.fit(df1).transform(df1) #no maritalIndex col
print("df2 marital index schema is: ")
df2.printSchema()
#define function to measure the vector size (features' sizes)

df2 marital index schema is: 
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- empVarRate: double (nullable = true)
 |-- consPriceIdx: double (nullable = true)
 |-- consConfIdx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nrEmployed: double (nullable = true)
 |-- y: string (nullable = true)
 |-- maritalIndex: double (nullable = false)
 |-- maritalVector: vector (nullable = true)



In [9]:
from pyspark.sql.types import IntegerType,StringType,DoubleType
from pyspark.sql.functions import udf

def vector_size(x):
    if isinstance(x, (DenseVector, SparseVector)):
        return len(x.toArray())
    else:
        return None

df2 = df2.withColumn("maritalSize",vector_size_udf(df2["maritalVector"]))

print("vectors sizes of marital col are: ")
df2.select("maritalVector", "maritalSize").show(truncate=False)

NameError: name 'vector_size_udf' is not defined

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
#3. For education feature
#3. For education feature
education_Index = StringIndexer(inputCol="education", outputCol="educationIndex")
df3 = education_Index.fit(df2).transform(df2)
print("vectors sizes of education col are: ")
df3.show()

education_vector = OneHotEncoder(inputCol="educationIndex", outputCol="educationVector")
df4 = education_vector.fit(df3).transform(df3)
df4.show()

In [None]:
#4. Vector assembler
assembler = VectorAssembler(
    inputCols=["age", "maritalVector",'educationVector'],
    outputCol="vector",
)
df5 = assembler.transform(df4)
print("SHOW df5: ")
df5.select("age","maritalVector","educationVector","vector").show(truncate=False)

#5.
pipeline = Pipeline(stages=[marital_Index,education_Index,marital_vector,education_vector,assembler])
pipeline_model = pipeline.fit(df0)
df5 = pipeline_model.transform(df0)

df5.select("age","marital","education","vector").show(truncate=False)


In [None]:
#6. Y to label, using pipeline
print("Step 6: ")
label_indexer = StringIndexer(inputCol="y", outputCol="label")
df6 = label_indexer.fit(df5).transform(df5)
df6.select("y","label").show(truncate= False )

In [None]:
#7. Features col
print("Step 7: ")
numerical_features = [
    "age",
    "duration",
    "campaign",
    "maritalVector",
    "educationVector"
]
assembler7 = VectorAssembler(inputCols= numerical_features,outputCol="features")
df_features = assembler7.transform(df6)
df_features.select("age","features").show(truncate=False)


In [None]:
#Statistical Classification
print("STATISTICAL CLASSIFICATION")
#1. Logistic regression -  add on appropriate hyperparams
print("step 1: ")
logistic_regression = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.8)

pipeline = Pipeline(stages=[logistic_regression])
pipeline_model = pipeline.fit(df_features)
df_predictions = pipeline_model.transform(df_features)
df_predictions.filter(df_predictions["prediction"] == 0).select("label","features","prediction").show(truncate=False)
df_predictions.filter(df_predictions["prediction"] == 1).select("label","features","prediction").show(truncate=False)


In [None]:
# 2. Train and test the model
pipeline_model.write().overwrite().save("/Users/hoangphuonganhnguyen/Desktop/intern/TP/[PRJ] Bank marketing/logistic_regression_pipeline")
print("Pipeline model saved successfully!")


In [None]:
#3. Apply the model to the dataset to generate predictions
print("step 3: ")
df_predictions = pipeline_model.transform(df_features)
df_predictions.filter(df_predictions["prediction"] == 0).select("label","features","prediction").show(truncate=False)
df_predictions.filter(df_predictions["prediction"] == 1).select("label","features","prediction").show(truncate=False)


In [None]:
#4 Extracting the Logistic regression (classifier) from the pipeline
print("Pipeline model classifier: ")
print(type(pipeline_model.stages[-1])) #<class 'pyspark.ml.classification.LogisticRegressionModel'>
logistic_regression = LogisticRegression(featuresCol="features",
                                         labelCol="label",
                                         maxIter=10)
logistic_model = logistic_regression.fit(df_features)

#display the loss history
loss_history = logistic_model.summary.objectiveHistory
print("Loss History: ")
print(loss_history)

#Check convergence
print("Check CONVERGENCE: ")
if len(loss_history) > 1:
    print("Loss changed between iterations are: ")
    for i in range(1, len(loss_history)):
        print(f"Iteration {i}: Loss difference = {loss_history[i]-loss_history[i-1]}")
    else:
        print("Not enough iterations to check convergence")

In [None]:
#Performance and Evaluation
#1. Print AUC and assess the evaluation of of model classification
print("Step 1: ")
#Initialize the evaluator
binary_evaluator = BinaryClassificationEvaluator(labelCol="label",rawPredictionCol="rawPrediction",metricName="areaUnderROC")

#Calculate the AUC
auc = binary_evaluator.evaluate(df_predictions)
print(f"AUC: {auc}")

#Visualize ROC
roc = logistic_model.summary.roc.toPandas()
plt.figure(figsize=(8, 6))
plt.plot(roc['FPR'], roc['TPR'], label="ROC Curve", color="blue")
plt.plot([0, 1], [0, 1], "r--", label="Random Classifier")
plt.title("ROC Curve")
plt.xlabel("False Positive Rate (FPR)")
plt.ylabel("True Positive Rate (TPR)")
plt.legend(loc="lower right")
plt.grid(True)
#plt.show()


In [None]:
#2. F-measures
print("Retrieve F-Measure for different threshold: ")
binary_summary = logistic_model.summary
#extract data
precision = binary_summary.precisionByThreshold
recall = binary_summary.recallByThreshold
f1_score = binary_summary.fMeasureByThreshold
#convert to pandas df
f1_df = f1_score.toPandas()
precision_df = precision.toPandas()
recall_df = recall.toPandas()

print("F1 DataFrame:")
print(f1_df.head())
print("Precision DataFrame:")
print(precision_df.head())
print("Recall DataFrame:")
print(recall_df.head())

#merge
metrics_df = f1_df.merge(precision_df, on="threshold").merge(recall_df, on="threshold")
metrics_df.columns = ["Threshold", "F1-Score", "Precision", "Recall"]

#find the best threshold
best_row = metrics_df.loc[metrics_df["F1-Score"].idxmax()]
best_threshold = best_row["Threshold"]
best_f1 = best_row["F1-Score"]

print(f"Best F1-Score: {best_f1}")
print(f"Best threshold: {best_threshold}")

#apply threshold to the model
logistic_model.setThreshold(best_threshold)
print(f"Best classifier threshold set to: {best_threshold}")
logistic_model.write().overwrite().save("optimized_logistic_model")


In [None]:
#3. PLot the F-measure evaluation
print("F-Measure PLot: ")
thresholds = metrics_df["Threshold"]
f1_scores = metrics_df["F1-Score"]

plt.figure(figsize=(10,6))
plt.plot(thresholds, f1_scores, label="F1-Score", color="blue", linewidth=2)
plt.xlabel("Threshold", fontsize=12)
plt.ylabel("F1-Score", fontsize=12)
plt.title("F1-Score vs Threshold", fontsize=14)
plt.axvline(x=best_threshold, color='red', linestyle='--', label=f"Best Threshold: {best_threshold:.4f}")
plt.legend()
plt.grid()
plt.show()

In [None]:
#4, Train model on 5-folds approach
print("Step 4: ")
paramGrid = ParamGridBuilder()\
    .addGrid(logistic_model.regParam, [0.1, 0.3, 0.5]) \
        .addGrid(logistic_model.elasticNetParam, [0.0,0.5,1.0])\
            .build()

#set up binary classification
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction")
CrossValidator = CrossValidator(estimator=logistic_regression,
                                estimatorParamMaps=paramGrid,
                                evaluator=evaluator,
                                numFolds= 5)
#Perform cross-validation
cvModel = CrossValidator.fit(df_features)

#Get and evaluate the best model
bestModel = cvModel.bestModel
auc = evaluator.evaluate(bestModel.transform(df_features))
print(f"Best model after cross-validation is: {auc}")

#Best model after cross-validation is: 0.8375070526053597
#the difference between the number from step 1 and the number from step 4
