# Customer Campaign Response Analytics Using PySpark

The dataset can be found at [kaggle](https://www.kaggle.com/datasets/nimishsawant/bankfull). The data is related with direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict if the client will subscribe a term deposit (variable y).

## Data Ingestion from CSV to Spark DataFrame

In [1]:
pip install pyspark



In [2]:
pip install -q findspark # -q, --quiet Give less output

#### Imports

In [3]:
import findspark
import pyspark
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import isnull, col, sum, from_unixtime, unix_timestamp, sin, cos
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from math import pi

In [4]:
findspark.init()

In [5]:
# Create a Spark Session
spark = SparkSession.builder.appName('Customer Campaign Response Analytics').getOrCreate()

In [6]:
# Load the dataset
file_path = '/content/bank-full.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)
# Which variables do we have?
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- Target: string (nullable = true)



In [7]:
# How does the data look like?
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|Target|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown|    no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown|    no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown|    no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown|    no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unkn

In [8]:
# number of rows
df.count()

45211

Each datapoint contains information about a particular client, which was contacted during the marketing campaign mentioned at the beginning of this notebook. Most of the columns are self-explanatory, nevertheless, there are some for which an extra explanation is useful. For .the sake of completeness, we include the description of each column - taken from the [description](https://archive.ics.uci.edu/dataset/222/bank+marketing) of the dataset.

- **age**: Age
- **job**: Occupation
- **marital**: Marital Status
- **education**: Education Level
- **default**: Has credit in default?
- **balance**: Average yearly balance
- **housing**: Average has housing loan?
- **loan**: Has personal loan?
- **contact**: Contact communication type
- **day**: Last contact day of the month (In the data description it says day of the week, but wee will see below that's not the case)
- **month**: Last contact month of year
- **duration**: Last contact duration, in seconds
- **campaign**: Number of contacts performed during this campaign and for this client
- **pdays**: Number of days that passed by after the client was last contacted from a previous campaign
- **previous**: Number of contacts performed before this campaign and for this client
- **poutcome**: Outcome of the previous marketing campaign
- **Target**: Has the client subscribed a term deposit?

As its name suggests, **Target** is the target variable, which we would like to predict.

# Data Cleaning and Preprocessing

### Column renaming

In [9]:
# Do all the cleaning in a copy of the original dataframe
df_clean = df.withColumnRenamed('Target', 'y')

### Missing Values:

In [10]:
null_summary = df_clean.select(
    [sum(col(c).isNull().cast('int')).alias(c) for c in df_clean.columns]
)

null_summary.show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|  0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+



### Time Variables

In [11]:
df_clean.select('month').distinct().show()

+-----+
|month|
+-----+
|  jun|
|  aug|
|  may|
|  feb|
|  sep|
|  mar|
|  oct|
|  jul|
|  nov|
|  apr|
|  dec|
|  jan|
+-----+



All 12 months of the year are present. There is no 'year' column in the dataset and I also don't find any reference of the year(s) where the marketing campaign took place. We could assume that the campaign was run on a single year, but we can't be really sure.

Let's now take a look at the **day** column:

In [12]:
df_clean.select('day').distinct().show()

+---+
|day|
+---+
| 31|
| 28|
| 26|
| 27|
| 12|
| 22|
|  1|
| 13|
|  6|
| 16|
|  3|
| 20|
|  5|
| 19|
| 15|
|  9|
| 17|
|  4|
|  8|
| 23|
+---+
only showing top 20 rows



As mentioned on the introduction, we are dealing with day of the month instead of day of the week.

### Exploration of categorical columns

In [13]:
categorical = ['job', 'marital', 'education', 'contact', 'poutcome']
no_rows = df_clean.count()

for col_name in categorical:
  df_clean.groupBy(col_name).count() \
  .withColumn('percent_of_total', F.round((F.col('count') / no_rows)*100, 1)) \
  .show()

+-------------+-----+----------------+
|          job|count|percent_of_total|
+-------------+-----+----------------+
|   management| 9458|            20.9|
|      retired| 2264|             5.0|
|      unknown|  288|             0.6|
|self-employed| 1579|             3.5|
|      student|  938|             2.1|
|  blue-collar| 9732|            21.5|
| entrepreneur| 1487|             3.3|
|       admin.| 5171|            11.4|
|   technician| 7597|            16.8|
|     services| 4154|             9.2|
|    housemaid| 1240|             2.7|
|   unemployed| 1303|             2.9|
+-------------+-----+----------------+

+--------+-----+----------------+
| marital|count|percent_of_total|
+--------+-----+----------------+
|divorced| 5207|            11.5|
| married|27214|            60.2|
|  single|12790|            28.3|
+--------+-----+----------------+

+---------+-----+----------------+
|education|count|percent_of_total|
+---------+-----+----------------+
|  unknown| 1857|             4

As we see, **contact** and **poutcome** have many unknown values -28.8% and 81.7% respectively. For this reason we will drop out those columns

In [14]:
df_clean = df_clean.drop('contact', 'poutcome')

As of **job** and **education**, we will drop the rows where either of these columns is unknown.

In [15]:
df_clean = df_clean.where(df_clean['job'] != 'unknown') \
.where(df_clean['education'] != 'unknown')

Let's take another look at the distribution of **job** and **education** after this cleaning step:

In [16]:
no_rows = df_clean.count()

for col_name in ['job', 'education']:
  df_clean.groupBy(col_name).count() \
  .withColumn('percent_of_total', F.round((F.col('count') / no_rows)*100, 1)) \
  .show()

+-------------+-----+----------------+
|          job|count|percent_of_total|
+-------------+-----+----------------+
|   management| 9216|            21.3|
|      retired| 2145|             5.0|
|self-employed| 1540|             3.6|
|      student|  775|             1.8|
|  blue-collar| 9278|            21.5|
| entrepreneur| 1411|             3.3|
|       admin.| 5000|            11.6|
|   technician| 7355|            17.0|
|     services| 4004|             9.3|
|    housemaid| 1195|             2.8|
|   unemployed| 1274|             2.9|
+-------------+-----+----------------+

+---------+-----+----------------+
|education|count|percent_of_total|
+---------+-----+----------------+
| tertiary|13262|            30.7|
|secondary|23131|            53.6|
|  primary| 6800|            15.7|
+---------+-----+----------------+



#### Binary Variables (**default**, **housing**, **loan**)

In [17]:
binary = ['default', 'housing', 'loan']

for col_name in binary:
  df_clean.groupBy(col_name).count() \
  .withColumn('percent_of_total', F.round((F.col('count') / no_rows)*100, 1)) \
  .show()

+-------+-----+----------------+
|default|count|percent_of_total|
+-------+-----+----------------+
|     no|42411|            98.2|
|    yes|  782|             1.8|
+-------+-----+----------------+

+-------+-----+----------------+
|housing|count|percent_of_total|
+-------+-----+----------------+
|     no|18901|            43.8|
|    yes|24292|            56.2|
+-------+-----+----------------+

+----+-----+----------------+
|loan|count|percent_of_total|
+----+-----+----------------+
|  no|36086|            83.5|
| yes| 7107|            16.5|
+----+-----+----------------+



As we can see, the dataset is highly unbalanced with respect to the variables **default** and **loan**, so that it is maybe a good idea not to include these variables for the machine learning modelling.

Let's code 'no' as 0 and 'yes' as 1 for the binary variables.

In [18]:
df_clean = df_clean.withColumn(
    'default_b',
    F.when((col('default') == 'yes'), 1).otherwise(0)
    ) \
    .withColumn(
    'housing_b',
    F.when((col('housing') == 'yes'), 1).otherwise(0)
    ) \
    .withColumn(
    'loan_b',
    F.when((col('loan') == 'yes'), 1).otherwise(0)
    )

# drop original columns
df_clean = df_clean.drop('default', 'housing', 'loan')

### Save clean dataset for EDA

In [20]:
df_clean.toPandas().to_csv('/content/clean_data.csv')

### Machine Learning modelling

#### Cyclical Encoding of month

As we could be dealing with more than one year, it is safer to perform cyclic encoding for month.

> Blockzitat einfügen



In [21]:
df_clean.columns

['age',
 'job',
 'marital',
 'education',
 'balance',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'y',
 'default_b',
 'housing_b',
 'loan_b']

In [22]:
# First convert month to a numerical variable
df_ml = df_clean.withColumn('month', from_unixtime(unix_timestamp(col('month'), 'MMM'), 'MM'))

In [23]:

df_ml = df_ml.withColumn('month', df_ml['month'].cast('int'))
df_ml.select('month').distinct().show()

+-----+
|month|
+-----+
|   12|
|    1|
|    6|
|    3|
|    5|
|    9|
|    4|
|    8|
|    7|
|   10|
|   11|
|    2|
+-----+



In [24]:
# Cyclical encoding of month
df_ml = df_ml.withColumn('month_sin', sin(2*pi*(df_ml['month'] - 1)/12))
df_ml = df_ml.withColumn('month_cos', sin(2*pi*(df_ml['month'] - 1)/12))

#### Cyclical Encoding of day

In [25]:
# create an auxiliar column having the number of days in a given month
df_ml = df_ml.withColumn(
    'month_days',
    F.when(F.col('month').isin([1, 3, 5, 7, 8, 10, 12]), 31) #months with 31 days
    .when(F.col('month').isin([4, 6, 9, 11]), 30) # months with 30 days
    .otherwise(28) # february
)

In [26]:
# Cyclical encoding of day
df_ml = df_ml.withColumn('day_sin', sin(2*pi*(df_ml['day'] - 1)/df_ml['month_days']))
df_ml = df_ml.withColumn('day_cos', sin(2*pi*(df_ml['day'] - 1)/df_ml['month_days']))
# drop auxiliar column
df_ml = df_ml.drop('month_days')

In [27]:
# We don't need the original month and day columns for machine learning
df_ml = df_ml.drop('month', 'day')

### Preparation of numerical variables

In [28]:
df_ml = df_ml.drop('duration')

### Machine Learning Pipeline

#### One-Hot Encoding for Categorical Variables




In [29]:
job_indexer = StringIndexer(inputCol = 'job', outputCol='job_index')
job_encoder = OneHotEncoder(inputCol='job_index', outputCol='job_vector')

#### VectorAssembler

As it is stated in the [description](https://archive.ics.uci.edu/dataset/222/bank+marketing) of the dataset, **duration** is the *last contact duration, in seconds (numeric). Important note: this attribute highly affects the output target (e.g., if duration=0 then y='no'). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.*

Therefore, we will not use this variable for prediction purposes.

Columns to exclude:
- binary
 - default
 - loan
- numerical
 - duration

In [30]:
assembler = VectorAssembler(inputCols=['age', 'job_vector'], outputCol='features')

#### Logistic Regression

### Save machine learning ready dataframe to a CSV file

In [31]:
# save data
#df_ml.toPandas().to_csv('/content/ml_data.csv')