In [1]:
import os

In [4]:
os.chdir(r"D:\Lemonrice\virtual_demo\ProjectRun")

# Using API from Kaggle to download dataset

In [5]:
!kaggle datasets download -d princeganer/bank-telemarketing-dataset

Downloading bank-telemarketing-dataset.zip to D:\Lemonrice\virtual_demo\ProjectRun




  0%|          | 0.00/905k [00:00<?, ?B/s]
100%|##########| 905k/905k [00:02<00:00, 443kB/s]
100%|##########| 905k/905k [00:02<00:00, 443kB/s]


In [6]:
!unzip bank-telemarketing-dataset.zip

Archive:  bank-telemarketing-dataset.zip
  inflating: bank-additional-full.csv  
  inflating: bank-additional-names.txt  
  inflating: bank-full.csv           
  inflating: info.txt                


In [7]:
!dir

 Volume in drive D is Storage
 Volume Serial Number is E457-CEE6

 Directory of D:\Lemonrice\virtual_demo\ProjectRun

12/03/2023  11:18    <DIR>          .
12/03/2023  11:18    <DIR>          ..
12/03/2023  11:17    <DIR>          .ipynb_checkpoints
10/03/2023  08:52         5,834,924 bank-additional-full.csv
10/03/2023  08:52             5,458 bank-additional-names.txt
10/03/2023  08:52         4,610,348 bank-full.csv
12/03/2023  11:18           927,049 bank-telemarketing-dataset.zip
10/03/2023  08:52                94 info.txt
12/03/2023  11:09            53,604 Spark_Bank_Campaign - Copy - 3_12_1.ipynb
               6 File(s)     11,431,477 bytes
               3 Dir(s)  75,674,648,576 bytes free


Sucessfully imported data from Kaggle API

## Importing libraries for further operations

In [8]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import when, count
from pyspark.sql.functions import col, sum
import pyspark.pandas as ps
import findspark
findspark.init()

ModuleNotFoundError: No module named 'seaborn'

In [None]:
# create a sparksession
spark = SparkSession.builder.master('local[4]').appName('ml').getOrCreate()

In [None]:
# display function
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [None]:
# to display maximum columns in pandas
pd.pandas.set_option('display.max_columns', None)

# Data Preprocessing

## Bank Full (dataset-1)

In [None]:
# Read Spark Dataframe
bank_full = spark.read.csv('bank-full.csv', sep = ";" , header=True, inferSchema=True)

In [None]:
bank_full.count()

In [None]:
display(bank_full.limit(5))

In [None]:
bank_full.printSchema()

In [None]:
# Adding the index columns 

new_cols = ["emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor_3m", "nr_employed"]
for column in new_cols:
    bank_full = bank_full.withColumn(column, bank_full["poutcome"] + 1)

In [None]:
display(bank_full.limit(5))

In [None]:
# map years into a dataframe

def year_mapper(data, start_yr, end_yr):
    """
        This function takes dataframe, start year of data, end year of data as input and
        returns a new dataframe having year column mapped to it.
    """
    month_lst = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]

    # Make a copy of the original dataframe
    new_data = data.copy()

    # Insert a new "year" column filled with zeros
    new_data.insert(loc=0, column="year", value=0)

    # Set the first year to the start year
    current_year = int(start_yr)
    new_data.at[0, "year"] = current_year

    # Loop through the rows of the dataframe, updating the year column when the month changes
    for i in range(1, len(new_data)):
        # If the current month is earlier in the year than the previous month, increment the year
        if month_lst.index(new_data["month"][i]) < month_lst.index(new_data["month"][i-1]):
            current_year += 1

        new_data.at[i, "year"] = current_year

        # If the current year exceeds the end year, break out of the loop
        if current_year > end_yr:
            break

    return new_data

In [None]:
import warnings
# Use default index prevent overhead.
ps.set_option("compute.default_index_type", "distributed")

# Ignore warnings coming from Arrow optimizations.
warnings.filterwarnings("ignore")  

# To speed up dataset processing
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

bank_full_pdf = bank_full.toPandas()

# Apply the function to the Pandas DataFrame
new_pandas_df = year_mapper(bank_full_pdf, 2008, 2010)

# Convert the updated Pandas DataFrame back to a PySpark DataFrame
bank_full = spark.createDataFrame(new_pandas_df)

In [None]:
display(bank_full.limit(5))

In [None]:
bank_full.groupBy("year").count().show()

### Index mapper

In [None]:
#Adding the missing index into the dataframes because it have the same values along the months

def map_index(new_data):

    index_2008 = {"may":{"emp_var_rate":1.1, "cons_price_idx":93.994, "cons_conf_idx":-36.4, "euribor_3m":4.85, "nr_employed":5191},
                 "jun":{"emp_var_rate":1.4, "cons_price_idx":94.465, "cons_conf_idx":-41.8, "euribor_3m":4.86, "nr_employed":5228.1},
                 "jul":{"emp_var_rate":1.4, "cons_price_idx":93.918, "cons_conf_idx":-42.7, "euribor_3m":4.96, "nr_employed":5228.1},
                 "aug":{"emp_var_rate":1.4, "cons_price_idx":93.444, "cons_conf_idx":-36.1, "euribor_3m":4.965, "nr_employed":5228.1},
                 "oct":{"emp_var_rate":-0.1, "cons_price_idx":93.798, "cons_conf_idx":-40.4, "euribor_3m":5, "nr_employed":5195.8},
                 "nov":{"emp_var_rate":-0.1, "cons_price_idx":93.2, "cons_conf_idx":-42, "euribor_3m":4.406, "nr_employed":5195.8},
                 "dec":{"emp_var_rate":-0.2, "cons_price_idx":92.75, "cons_conf_idx":-45.9, "euribor_3m":3.563, "nr_employed":5176.3}}

    index_2009 = {"jan":{"emp_var_rate":-0.2, "nr_employed":5176.3},
                  "feb":{"emp_var_rate":-0.2, "nr_employed":5176.3},
                  "mar":{"emp_var_rate":-1.8, "cons_price_idx":92.84, "cons_conf_idx":-50, "euribor_3m":1.811, "nr_employed":5099.1},
                  "apr":{"emp_var_rate":-1.8, "cons_price_idx":93.075, "cons_conf_idx":-47.1, "euribor_3m":1.498, "nr_employed":5099.1},
                  "may":{"emp_var_rate":-1.8, "cons_price_idx":92.89, "cons_conf_idx":-46.2, "euribor_3m":1.334, "nr_employed":5099.1},
                 "jun":{"emp_var_rate":-2.9, "cons_price_idx":92.963, "cons_conf_idx":-40.8, "euribor_3m":1.26, "nr_employed":5076.2},
                 "jul":{"emp_var_rate":-2.9, "cons_price_idx":93.469, "cons_conf_idx":-33.6, "euribor_3m":1.072, "nr_employed":5076.2},
                 "aug":{"emp_var_rate":-2.9, "cons_price_idx":92.201, "cons_conf_idx":-31.4, "euribor_3m":0.884, "nr_employed":5076.2},
                 "sep":{"emp_var_rate":-3.4, "cons_price_idx":92.379, "cons_conf_idx":-29.8, "euribor_3m":0.813, "nr_employed":5017.5},
                 "oct":{"emp_var_rate":-3.4, "cons_price_idx":92.431, "cons_conf_idx":-26.9, "euribor_3m":0.754, "nr_employed":5017.5},
                 "nov":{"emp_var_rate":-3.4, "cons_price_idx":92.649, "cons_conf_idx":-30.1, "euribor_3m":0.722, "nr_employed":5017.5},
                 "dec":{"emp_var_rate":-3.0, "cons_price_idx":92.713, "cons_conf_idx":-33, "euribor_3m":0.718, "nr_employed":5023.5}}

    index_2010 = {"jan":{"emp_var_rate":-3.0, "nr_employed":5023.5},
                  "feb":{"emp_var_rate":-3.0, "nr_employed":5023.5},
                   "mar":{"emp_var_rate":-1.8, "cons_price_idx":92.369, "cons_conf_idx":-34.8, "euribor_3m":0.655, "nr_employed":5008.7},
                  "apr":{"emp_var_rate":-1.8, "cons_price_idx":93.749, "cons_conf_idx":-34.6, "euribor_3m":0.64, "nr_employed":5008.7},
                  "may":{"emp_var_rate":-1.8, "cons_price_idx":93.876, "cons_conf_idx":-40, "euribor_3m":0.668, "nr_employed":5008.7},
                 "jun":{"emp_var_rate":-1.7, "cons_price_idx":94.055, "cons_conf_idx":-39.8, "euribor_3m":0.704, "nr_employed":4991.6},
                 "jul":{"emp_var_rate":-1.7, "cons_price_idx":94.215, "cons_conf_idx":-40.3, "euribor_3m":0.79, "nr_employed":4991.6},
                 "aug":{"emp_var_rate":-1.7, "cons_price_idx":94.027, "cons_conf_idx":-38.3, "euribor_3m":0.898, "nr_employed":4991.6},
                 "sep":{"emp_var_rate":-1.1, "cons_price_idx":94.199, "cons_conf_idx":-37.5, "euribor_3m":0.886, "nr_employed":4963.6},
                 "oct":{"emp_var_rate":-1.1, "cons_price_idx":94.601, "cons_conf_idx":-49.5, "euribor_3m":0.959, "nr_employed":4963.6},
                 "nov":{"emp_var_rate":-1.1, "cons_price_idx":94.767, "cons_conf_idx":-50.8, "euribor_3m":1.05, "nr_employed":4963.6}}

    indx = [index_2008, index_2009, index_2010]
    years = [2008, 2009, 2010]
    
    for i in range(len(years)):
        for months, indexes in indx[i].items():
            for index, index_val in indexes.items():
                new_data = new_data.withColumn(index, 
                    when((col('year') == years[i]) & (col('month') == months), index_val).otherwise(col(index))) 
    return new_data

In [None]:
# Calling the index_mapper function
bank_full = map_index(new_data = bank_full)
display(bank_full.limit(5))

In [None]:
bank_full.printSchema()

In [None]:
#dropping the balance, day column
bank_full = bank_full.drop("balance", "day")

In [None]:
#Converting the dataframes into the pandas
dataframe_1 = bank_full.toPandas()

In [None]:
dataframe_1.head()

In [None]:
#dataframe_1.to_csv('dataframe_1.csv')

### Bank Full Dataset (dataset-2)

In [None]:
# Read Spark Dataframe
bank_add_full = spark.read.csv('bank-additional-full.csv', sep=";", header=True, inferSchema=True)

In [None]:
bank_add_full.count()

In [None]:
display(bank_add_full.limit(5))

In [None]:
# Use default index prevent overhead.
ps.set_option("compute.default_index_type", "distributed")

warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

# To speed up dataset processing
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

bank_full_pdf = bank_add_full.toPandas()

# Apply the function to the Pandas DataFrame
new_pandas_df = year_mapper(bank_full_pdf, 2008, 2010)

# Convert the updated Pandas DataFrame back to a PySpark DataFrame
bank_add_full = spark.createDataFrame(new_pandas_df)

### replace values from 999 to -1

In [None]:
#changes the values from 999 to -1
bank_add_full = bank_add_full.withColumn("pdays", when(col("pdays") == 999, -1).otherwise(col("pdays")))

### Renaming columns names and values

In [None]:
#Replacing the columns names
old_col_list = ["emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"]
for i in range(0, len(old_col_list)):
    bank_add_full = bank_add_full.withColumnRenamed(old_col_list[i], new_cols[i])

In [None]:
#Renaming the categories names from education columns
old_edu = ["basic.4y", "high.school", "basic.6y", "basic.9y", "university.degree", "professional.course"]
new_edu = ["basic_4y", "high_school", "basic_6y", "basic_9y", "university_degree" ,"professional_course"]

for i in range(0,6):
    bank_add_full = bank_add_full.withColumn("education", when(col("education") == old_edu[i], new_edu[i]).otherwise(col("education")))

In [None]:
display(bank_add_full.limit(5))

In [None]:
bank_full.printSchema()

In [None]:
#dropping the column day_of week
bank_add_full = bank_add_full.drop("day_of_week")

In [None]:
dataframe_2 = bank_add_full.toPandas()

In [None]:
#dataframe_2.to_csv('dataframe_2.csv')

## Concat two dataframes

In [None]:
#Concating two dataframes
frames  = [dataframe_1, dataframe_2]

bank = pd.concat(frames)

In [None]:
bank.head()

In [None]:
#bank.to_csv('final_data.csv')
bank.to_parquet("bank_data_pdf.parquet")

In [None]:
# read the parquet file
bank_data = spark.read.parquet('bank_data_pdf.parquet')
#bank_data = spark.read.csv('final_data.csv',header=True,inferSchema=True)

In [None]:
#dropping id
bank_data = bank_data.drop("_c0")

In [None]:
display(bank_data.limit(5))

In [None]:
bank_data.printSchema()

In [None]:
bank_data.count()

In [None]:
#prints the summary of dataframes with std, means and quartiles
bank_data.summary()

In [None]:
#seperating the continuous and categorical variables
cat_col = ["job","marital","education","default","housing","loan","contact","month","year","y"]
cont_col = ["age","duration","campaign","pdays","previous","emp_var_rate","cons_price_idx","cons_conf_idx","euribor_3m","nr_employed"]
categories = bank_data.select(cat_col)
continuous = bank_data.select(cont_col)

### Unique value counts

In [None]:
#Prints the value counts for categrical columns
for columns in categories:
    print("Column Name", columns)
    print("-----------------------")
    counts = bank_data.groupBy(columns).count()
    counts.show()
    print("     ")
    print("******************************************************")
    print("     ")

## Data Preparation

In [None]:
#Rename .admin category to admin
bank_data = bank_data.withColumn("job", when(col("job") == "admin.", "admin").otherwise(col("job")))

In [None]:
#Replacing "unknown" and "nonexistent" with the null values
for column in bank_data.columns:
    bank_data = bank_data.withColumn(column, when(col(column).isin("unknown", "nonexistent"), None).otherwise(col(column)))

In [None]:
display(bank_data.limit(5))

### Checking for null values

In [None]:
# Checks the null values for categorical values
bank_data.agg(*[count(when(col(c).isNull(), c)).alias(c) for c in categories.columns]).show()

In [None]:
# Checks the null values for continuous values
bank_data.agg(*[count(when(col(c).isNull(), c)).alias(c) for c in continuous.columns]).show()

### Replacing continue variables

In [None]:
from pyspark.sql.functions import mean

# calculate the mean of non-null values in columns
mean_dict = bank_data.select(*(mean(c).alias(c) for c in cont_col)).first().asDict()

# replace null values with the mean
bank_data = bank_data.fillna(mean_dict)

In [None]:
#checking for null values
bank_data.agg(*[count(when(col(c).isNull(), c)).alias(c) for c in continuous.columns]).show()

### Replacing categorical variables

In [None]:
bank_data = bank_data.drop("poutcome")

In [None]:
# calculate the mode of non-null values and replaced in columns
from pyspark.sql.functions import desc
for column in cat_col:
    mode = bank_data.groupBy(column).agg(count("*").alias("count")).orderBy(desc("count")).select(column).first()[0]
    bank_data = bank_data.fillna({column: mode})

In [None]:
#checking for null values
bank_data.agg(*[count(when(col(c).isNull(), c)).alias(c) for c in categories.columns]).show()

In [None]:
pdf=bank_data.toPandas()

### Heatmap

In [None]:
correlation = pdf.corr()
plt.figure(figsize=(10,10))
sns.heatmap(correlation)

In [None]:
# function to plot the normal distribution
def plot_dist():
    plt.figure(figsize=(10,10))
    plt.subplot(3,2,1)
    plt.hist(pdf["age"], bins = 40)
    plt.title('age')

    plt.subplot(3,2,2)
    plt.hist(pdf["duration"], bins = 40)
    plt.title('duration')

    plt.subplot(3,2,3)
    plt.hist(pdf["campaign"], bins = 40)
    plt.title('campaign')
    
    plt.subplot(3,2,4)
    plt.hist(pdf["pdays"], bins = 40)
    plt.title('pdays')
         
plot_dist()

## ordinal data encoding

In [None]:
# Creating a dictionary for converting categorical textual data entries
# conversion into categorical numeric values on basis on job profile
job_dict = {"entrepreneur":11, "self-employed":10, "admin":9, "management":8, "services":7, 
       "technician":6, "blue-collar":5, "housemaid":4, "retired":3, "student":2, "unemployed":1}

for key, value in job_dict.items():
    bank_data = bank_data.withColumn("job", when(bank_data["job"] == key, int(value)).otherwise(bank_data["job"]))

In [None]:
# conversion into categorical numeric values on basis on marital status
marital_dict = {"married":3, "single":2, "divorced":1}

for key, value in marital_dict.items():
    bank_data = bank_data.withColumn("marital", when(bank_data["marital"] == key, value).otherwise(bank_data["marital"]))

In [None]:
# conversion into categorical numeric values on basis on education
edu_dict = {"professional_course":10, "university_degree":9, "tertiary":8, "secondary":7, 
       "high_school":6, "basic_9y":5, "basic_6y":4, "primary":3, "basic_4y":2, "illiterate":1}

for key, value in edu_dict.items():
    bank_data = bank_data.withColumn("education", when(bank_data["education"] == key, value).otherwise(bank_data["education"]))

In [None]:
y_dict = {"yes":1, "no":0}

for key, value in y_dict.items():
    bank_data = bank_data.withColumn("y", when(bank_data["y"] == key, value).otherwise(bank_data["y"]))

In [None]:
display(bank_data.limit(5))

In [None]:
#Conversion of months into the quarters
quarter_dict = {"jan":"Q1", "feb":"Q1", "mar":"Q1", "apr":"Q2", "may":"Q2", "jun":"Q2", 
                "jul":"Q3", "aug":"Q3", "sep":"Q3", "oct":"Q4", "nov":"Q4", "dec":"Q4"}

for key, value in quarter_dict.items():
    bank_data = bank_data.withColumn("month", when(bank_data["month"] == key, value).otherwise(bank_data["month"]))

In [None]:
bank_data.printSchema()

In [None]:
bank = bank_data

### one hot encoding

In [None]:
#One hot encoding on the nominal data
one_hot_cols = ["default", "housing", "loan"]

for i in one_hot_cols:
    bank = bank.withColumn(i, when(col(i) == "yes", 1).
                              when(col(i) == "no", 0).
                              otherwise(col(i)))
    
bank = bank.withColumn("contact", when(col("contact") == "telephone", 1)
                                  .when(col("contact") == "cellular", 0)
                                  .otherwise(col("contact")))

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='month', outputCol='class_numeric')
indexer_fitted = indexer.fit(bank)
df_indexed = indexer_fitted.transform(bank)

from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['class_numeric'], outputCols=['class_onehot'])
df_onehot = encoder.fit(df_indexed).transform(df_indexed)

from pyspark.ml.functions import vector_to_array
df_col_onehot = df_onehot.select('*', vector_to_array('class_onehot').alias('Quarter'))

import pyspark.sql.functions as F
num_categories = len(df_col_onehot.first()['Quarter'])
cols_expanded = [(F.col('Quarter')[i].alias(f'{indexer_fitted.labels[i]}')) for i in range(num_categories)]
bank_df = df_col_onehot.select("year","age","job","marital","education","default","housing","loan","contact",
                                      "month","duration","campaign","pdays","previous","emp_var_rate","cons_price_idx",
                                      "cons_conf_idx","euribor_3m","nr_employed","y",*cols_expanded)


In [None]:
display(bank_df.limit(5))

In [None]:
bank_df.printSchema()

In [None]:
bank_data = bank_df

In [None]:
bank_data = bank_data.drop("month")

### Converting string datatype to double

In [None]:
column_types = bank_data.dtypes
# Filter the list to only include the string datatype columns

string_columns = [column[0] for column in column_types if column[1] == "string"]
print(string_columns)

In [None]:
for cols in string_columns:
# Change the datatype of the columns to double
    bank_data = bank_data.withColumn(cols, bank_data[cols].cast("double"))

In [None]:
display(bank_data.limit(5))

In [None]:
bank_data.printSchema()

### Outliers visualization and removal

In [None]:
out = bank_data.toPandas()

In [None]:
outliers_columns = ["age","duration","campaign","pdays","previous"]

In [None]:
#Function defination to plot the outliers
def plot_box():
    plt.figure(figsize=(10,10))
    plt.subplot(3,2,1)
    out.boxplot(column=["age"])

    plt.subplot(3,2,2)
    out.boxplot(column=["duration"])

    plt.subplot(3,2,3)
    out.boxplot(column=["campaign"])

    plt.subplot(3,2,4)
    out.boxplot(column=["pdays"])

    plt.subplot(3,2,5)
    out.boxplot(column=["previous"])
           
plot_box()

In [None]:
#Removing the outliers with the help of maximum binding limit
max_out_limit = []
for cols in outliers_columns:
    quantiles = bank_data.approxQuantile(cols, [0.25, 0.5, 0.75], 0.01)
    
    q3 = quantiles[2]
    q1 = quantiles[0]
    iqr = q3 - q1
    iqr = iqr*1.5
    max_limit = q3 + iqr
    min_limit = q1 - iqr
    max_out_limit.append(max_limit)
    
    print(cols, "max_limit: ",max_limit,"      min_limit: ",min_limit)
else:
    print("------------------------------------------")
    print(max_out_limit)



In [None]:
for i, j in zip(outliers_columns, max_out_limit):
    bank_data = bank_data.withColumn(i, 
                    when((col(i) >= j), j).otherwise(col(i))) 

In [None]:
out = bank_data.toPandas()

In [None]:
plot_box()

In [None]:
bank_data.printSchema()

## pyspark model Building

In [None]:
bank = bank_data

In [None]:
display(bank.limit(5))

## Data Oversampling of "yes" label in y

In [None]:
# Data oversampling function of the "yes" lebel in the dependent column
def over_sample( data, oversampling_ratio ):
    import math

    # avoid changing the original object accidentally
    new_data = data.toPandas()
    

    nums = new_data['y'].value_counts()
    num_n = nums[0]
    num_y = nums[1]
    nums = len(new_data["y"])

    new_y = ( (1 - oversampling_ratio) / oversampling_ratio)*num_n
   
    loop_num = int( math.ceil(new_y/num_y) )

    new_df = new_data[ new_data["y"] == 1.0 ]
   
    for i in range(0, loop_num-1):
       
        # randomly select all rows from new_df
        random_rows = new_df.sample(n = num_y, replace=True, random_state = 14)
        
        # append the selected row to bank_df
        new_data = new_data.append(random_rows, ignore_index=True)
       
    new_data = spark.createDataFrame(new_data)
    return new_data

In [None]:
bank_oversampled_df = over_sample(data = bank, oversampling_ratio = 0.6)

In [None]:
# Value counts for y column
bank_oversampled_df.groupBy("y").count().show()

In [None]:
bank = bank_oversampled_df

## Data Scaling

In [None]:
#Min_max scaling to the selected columns
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler


numerical_cols = ['year', 'age', 'job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'duration', 
                  'emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor_3m', 
                  'nr_employed', 'Q2', 'Q3', 'Q4']

#numerical_cols = ['year', 'age', 'job', 'marital', 'education', 'default', 'housing', 'loan', 'duration', 
                   #'cons_price_idx', 'Q4']


# Create a vector assembler to combine the numerical columns into a single vector
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")

# Transform the DataFrame to create the numerical features vector
bank = assembler.transform(bank)

# Apply MinMaxScaler to the numerical features vector
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")
scaler_model = scaler.fit(bank)
df = scaler_model.transform(bank)

df = df.drop("numerical_features")

# Drop the original numerical columns and keep only the scaled numerical features
bank = df.drop(*numerical_cols).withColumnRenamed("scaled_numerical_features", "sc_features")


In [None]:
display(bank.limit(5))

In [None]:
banks = bank

In [None]:
display(banks.limit(5))

In [None]:
#getting the names of columns of the dataframe
feature = []
for columns in banks.columns:
    feature.append(columns)
else:
    print(feature)

In [None]:
banks.count()

In [None]:
#train test split: train data percentage: 70% and test data percentage: 30%
train_data, test_data = banks.randomSplit([0.70, 0.30], seed = 14)
display(train_data.limit(5))

In [None]:
# train data counts
train_data.count()

In [None]:
# test data counts
test_data.count()

## Decision Tree Classifier

In [None]:
#Decision tree Algorithm
from pyspark.ml.classification import DecisionTreeClassifier as dtc
from sklearn.metrics import ConfusionMatrixDisplay, accuracy_score, classification_report

In [None]:
dt = dtc(labelCol="y",featuresCol='sc_features')
dt_model = dt.fit(train_data)

#Predict the values on test data
dt_predictions = dt_model.transform(test_data)

In [None]:
dt_pred = dt_predictions.select('prediction').toPandas()
actual = dt_predictions.select('y').toPandas()

#Shows confusion Report
ConfusionMatrixDisplay.from_predictions(actual, dt_pred)

In [None]:
#Prints Classification Report
print(classification_report(actual, dt_pred))

## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression(featuresCol = 'sc_features',
                        labelCol = 'y',
                        maxIter=1000)
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

In [None]:
lr_pred = lr_predictions.select('prediction').toPandas()
actual = lr_predictions.select('y').toPandas()

ConfusionMatrixDisplay.from_predictions(actual, lr_pred)

In [None]:
print(classification_report(actual, lr_pred))

## Support Vector Machines

In [None]:
from pyspark.ml.classification import LinearSVC

# Load training data
lsvc = LinearSVC(featuresCol = 'sc_features', labelCol = 'y', maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(train_data)

svc_predictions = lsvcModel.transform(test_data)

svc_pred = svc_predictions.select('prediction').toPandas()
actual = svc_predictions.select('y').toPandas()

ConfusionMatrixDisplay.from_predictions(actual, svc_pred)

print(classification_report(actual, svc_pred))

## Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='y', featuresCol = 'sc_features')
rf_model = rf.fit(train_data)
predictions = rf_model.transform(test_data)

In [None]:
rf_prediction = rf_model.transform(test_data)
rf_preds = rf_prediction.select('prediction').toPandas()
actual = rf_prediction.select('y').toPandas()

ConfusionMatrixDisplay.from_predictions(actual, rf_preds)

In [None]:
print(classification_report(actual, rf_preds))

## Grid Search CV

### Grid search_CV for Random forest

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col


# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 5, 10]) \
    .addGrid(rf.numTrees, [20, 50, 100]) \
    .build()

# Define evaluator
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol=lr.getLabelCol())

# Define cross-validator
cv = CrossValidator(estimator=rf, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Fit cross-validator to training data
cv_model = cv.fit(train_data)

# Evaluate model on test data
gcv_pred = cv_model.transform(test_data)
evaluator.evaluate(predictions)

In [None]:
gcv_preds = gcv_pred.select('prediction').toPandas()
gcv_actual = gcv_pred.select('y').toPandas()

ConfusionMatrixDisplay.from_predictions(gcv_actual, gcv_preds)

In [None]:
print(classification_report(gcv_actual, gcv_preds))

### Grid search_CV for Decision Tree

In [None]:
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [2, 5, 10]) \
    .addGrid(dt.maxBins, [16, 32, 64]) \
    .build()

# Define evaluator
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol=dt.getLabelCol())

# Define cross-validator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Fit cross-validator to training data
cv_model = cv.fit(train_data)

# Evaluate model on test data
gcv_dt_pred = cv_model.transform(test_data)
evaluator.evaluate(predictions)

In [None]:
gcv_preds = gcv_dt_pred.select('prediction').toPandas()
gcv_actual = gcv_dt_pred.select('y').toPandas()

ConfusionMatrixDisplay.from_predictions(gcv_actual, gcv_preds)

In [None]:
print(classification_report(gcv_actual, gcv_preds))