# Sparks ML lib implementation on Bank Marketing dataset

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ac1b340f445ca6a0873d90bcadeb46f407e27f5b1b3d765441e26c9a37440bf7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ML_lib").getOrCreate()


In [4]:
df = spark.read.csv('bank.csv')
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|_c0|        _c1|     _c2|      _c3|    _c4|    _c5|    _c6| _c7|    _c8|_c9| _c10|    _c11|    _c12| _c13|    _c14|    _c15|   _c16|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|     admin.| married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41| technician| married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|   services| married|secondary|     no|   2476|    yes|  

In [5]:
labels = ['age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'day', 'month', 'duration', 'campaign', 'pdays', 'previous','poutcome','deposit']

In [6]:
df = df.toDF(*labels)

In [7]:
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [8]:
from pyspark.sql.functions import *
new_df = df.withColumn('age', col('age').cast('integer'))

In [9]:
new_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [10]:
for i in ['age', 'balance', 'day', 'campaign', 'pdays','previous','duration']:
  new_df = new_df.withColumn(i, col(i).cast('integer'))

In [11]:
new_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [12]:
new_df.show()

+----+-----------+--------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-------+
| age|        job| marital|education|default|balance|housing|loan|contact| day|month|duration|campaign|pdays|previous|poutcome|deposit|
+----+-----------+--------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-------+
|NULL|        job| marital|education|default|   NULL|housing|loan|contact|NULL|month|    NULL|    NULL| NULL|    NULL|poutcome|deposit|
|  59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|   5|  may|    1042|       1|   -1|       0| unknown|    yes|
|  56|     admin.| married|secondary|     no|     45|     no|  no|unknown|   5|  may|    1467|       1|   -1|       0| unknown|    yes|
|  41| technician| married|secondary|     no|   1270|    yes|  no|unknown|   5|  may|    1389|       1|   -1|       0| unknown|    yes|
|  55|   services| married|secondary|     no|   

In [13]:
new_df.select([count(when( col(c).isNull() , c)).alias(c) for c in new_df.columns]).show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  1|  0|      0|        0|      0|      1|      0|   0|      0|  1|    0|       1|       1|    1|       1|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



In [16]:
columns_to_check = ['age', 'balance', 'day', 'campaign', 'pdays', 'previous', 'duration']
new_df = new_df.dropna(subset=columns_to_check)

In [17]:
new_df.select([count(when( col(c).isNull() , c)).alias(c) for c in new_df.columns]).show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



In [21]:
# Print distinct values for each column
for column in new_df.columns:
    distinct_values = new_df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    print(f"Distinct values in column '{column}': {distinct_values}")

Distinct values in column 'age': [31, 85, 65, 53, 78, 34, 81, 28, 76, 26, 27, 44, 22, 93, 47, 52, 86, 40, 20, 57, 54, 48, 19, 92, 64, 41, 43, 37, 61, 88, 72, 35, 59, 55, 23, 39, 49, 84, 87, 51, 69, 63, 77, 50, 45, 38, 82, 80, 25, 73, 24, 70, 62, 95, 29, 21, 60, 32, 90, 75, 56, 58, 33, 83, 68, 71, 42, 79, 30, 66, 46, 67, 18, 74, 36, 89]
Distinct values in column 'job': ['management', 'retired', 'unknown', 'self-employed', 'student', 'blue-collar', 'entrepreneur', 'admin.', 'technician', 'services', 'housemaid', 'unemployed']
Distinct values in column 'marital': ['divorced', 'married', 'single']
Distinct values in column 'education': ['unknown', 'tertiary', 'secondary', 'primary']
Distinct values in column 'default': ['no', 'yes']
Distinct values in column 'balance': [1238, 1580, 471, 1088, 148, 496, 463, 2366, 4101, 7554, 833, -125, 1342, 3794, 4519, 3918, -565, 1591, 2999, -35, 540, 623, 897, 3698, 2580, 243, 1522, 392, 858, 4158, 1084, 2235, 1025, 1395, 4929, 737, 1127, 20928, 1270, 1

In [22]:
#df.groupby('occupation').count().show()

In [None]:
#df = df.fillna(" United-States", subset = ['country'])

In [None]:
#df = df.fillna("Private", subset = ['workclass'])

In [None]:
#df = df.fillna(" Prof-specialty", subset = ['occupation'])


In [24]:
df= new_df

In [25]:
df.select([count(when( col(c).isNull() , c)).alias(c) for c in df.columns]).show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



In [26]:
df.createOrReplaceTempView("bank")

In [33]:
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|     admin.| married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41| technician| married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|   services| married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
| 54|     admin.| married| tertiary|     no|    184|     no|  

In [27]:
result  = spark.sql("select * from bank where age = 22")

In [28]:
result.show()

+---+-----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+
|age|        job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+
| 22|     admin.| single| tertiary|     no|    897|    yes|  no| cellular|  4|  feb|     133|       4|   -1|       0| unknown|    yes|
| 22|    student| single|secondary|     no|      0|     no|  no| cellular|  6|  feb|     229|       6|   -1|       0| unknown|    yes|
| 22|    student| single|secondary|     no|    381|     no|  no| cellular| 12|  feb|     227|       1|   -1|       0| unknown|    yes|
| 22|    student| single|secondary|     no|    107|     no|  no| cellular| 14|  apr|     125|       1|   -1|       0| unknown|    yes|
| 22|blue-collar| single|secondary|     no|    -51|    

In [29]:
categorical_cols = ['job', 'education', 'marital', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome']
numerical_cols = ['age', 'balance', 'day', 'campaign', 'pdays','previous','duration']
label = 'deposit'

In [30]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

In [31]:
def indexer(df, col):
  indexer = StringIndexer(inputCol = col , outputCol =f'{col}_indexed' , handleInvalid = 'keep')
  indexed = indexer.fit(df).transform(df)
  return indexed

In [32]:
for col in categorical_cols:
  index_df = indexer(new_df, col)
  new_df = index_df

In [None]:
#new_df = indexer(new_df, 'country')

In [34]:
new_df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+-----------+-----------------+---------------+---------------+---------------+------------+---------------+-------------+----------------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_indexed|education_indexed|marital_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|poutcome_indexed|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+-----------+-----------------+---------------+---------------+---------------+------------+---------------+-------------+----------------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|        3.0|              0.0|            0.0|     

In [35]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [36]:
indexer = [StringIndexer(inputCol = c, outputCol = f'{c}_index', handleInvalid = 'keep') for c in categorical_cols]

In [37]:
assembler = VectorAssembler(inputCols = [f'{c}_index' for c in categorical_cols ] + numerical_cols, outputCol = 'features')

In [38]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label')

In [43]:
label_indexer = StringIndexer(inputCol = 'deposit', outputCol = 'label', handleInvalid = 'keep')

In [44]:
pipeline = Pipeline(stages = indexer + [assembler , label_indexer, lr])

In [45]:
train_data , test_data = new_df.randomSplit([.8, .2])

In [46]:
model = pipeline.fit(train_data)

In [48]:
predication = model.transform(test_data)

In [49]:
predication.select('label', 'prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  1.0|       0.0|
|  0.0|       1.0|
+-----+----------+
only showing top 20 rows



In [50]:
predication.groupby("label", 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  812|
|  0.0|       1.0|  198|
|  1.0|       0.0|  241|
|  0.0|       0.0|  966|
+-----+----------+-----+



In [51]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluate = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = 'accuracy')

In [52]:
evaluate.evaluate(predication)

0.8019846639603068

In [53]:
predication.show()

+---+-----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+-----------+-----------------+---------------+---------------+---------------+------------+---------------+-------------+----------------+---------+---------------+-------------+-------------+-------------+----------+-------------+-----------+--------------+--------------------+-----+--------------------+--------------------+----------+
|age|        job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_indexed|education_indexed|marital_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|poutcome_indexed|job_index|education_index|marital_index|default_index|housing_index|loan_index|contact_index|month_index|poutcome_index|            features|label|       rawPrediction|         probability|prediction|
+---+-----------+-------+---------+-------+-------+---