# Data Preprocessing Notebook

This notebook provides a walkthrough of the data preprocessing steps using PySpark, preparing the dataset for subsequent exploratory data analysis and modeling.

## Setting Up Spark Environment

In [20]:
import os

# Set Java and Spark paths
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec"

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("PreprocessingNotebook") \
    .getOrCreate()

## Loading Data

In [21]:
#Reading in data from the tab-separated CSV file in the Data folder, inferring column types
customer_data = spark.read.option("sep", "\t").option("inferSchema", "true").csv("../Data/marketing_campaign.csv", header = True)

In [22]:
#Verify data was read in correctly
customer_data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedC

In [23]:
#Inspect first 20 rows of data
customer_data.show()

+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
|  ID|Year_Birth| Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|
+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+----------

In [24]:
#Gets number of rows in the dataset
row_count = customer_data.count()

print(row_count)

#2,240 rows in DF

2240


## Data Cleaning

After an initial review of the dataset, the formatting appeared consistent across all variables except `Dt_Customer`, which is stored as a string and will be converted to a date during feature engineering. The remaining preprocessing steps handle rows containing missing values and remove columns that are irrelevant to the analysis.
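
As a rough illustration of the date conversion mentioned above, the plain-Python sketch below parses day-first date strings and measures each one against the most recent date. The sample strings are made up for illustration, and `%d-%m-%Y` is the `datetime` format I am assuming corresponds to the Spark pattern `dd-MM-yyyy` used in the feature engineering cell.

```python
from datetime import date, datetime

# Hypothetical sample values in day-month-year order (not taken from the dataset)
raw_dates = ["04-09-2012", "21-08-2013", "10-02-2014"]


def parse_day_first(text: str) -> date:
    """Parse a 'dd-mm-yyyy' string into a date."""
    return datetime.strptime(text, "%d-%m-%Y").date()


parsed = [parse_day_first(d) for d in raw_dates]
latest = max(parsed)

# Days between each date and the most recent one
days_before_latest = [(latest - d).days for d in parsed]
print(days_before_latest)
```

Parsing with an explicit format avoids any ambiguity between day-first and month-first strings such as `04-09-2012`.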

In [25]:
from pyspark.sql import functions as F

#Showing the number of NA values in each column of the dataset
null_counts = customer_data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in customer_data.columns])

null_counts.show()

+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
| ID|Year_Birth|Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|
+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+---

The only column with missing values is `Income`, which contains 24 null entries. These rows will be removed, leaving 2,216 of the original 2,240 observations, which is sufficient data for subsequent analysis and modeling.
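
To make the effect of restricting the drop to one column concrete, here is a minimal plain-Python sketch of the same idea. The rows and the `drop_missing` helper are hypothetical stand-ins, not part of the notebook; the point is that only nulls in the listed columns cause a row to be removed.

```python
# Hypothetical rows; None stands in for a null value
rows = [
    {"ID": 1, "Income": 50000, "Education": "PhD"},
    {"ID": 2, "Income": None, "Education": "Master"},
    {"ID": 3, "Income": 72000, "Education": None},
]


def drop_missing(records, subset):
    """Keep only records where every column in `subset` is non-null."""
    return [r for r in records if all(r.get(col) is not None for col in subset)]


kept = drop_missing(rows, subset=["Income"])
print([r["ID"] for r in kept])  # [1, 3]
```

Row 3 survives even though its `Education` is missing, because only `Income` is checked.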

In [26]:
#Dropping rows with NA values in the Income column
customer_data = customer_data.na.drop(subset = ['Income'])

In [27]:
#Checking how many rows are in the dataset after dropping NA values
row_count = customer_data.count()

print(row_count)

2216


In [28]:
#Validating that there are no more NA values in the dataset
null_counts = customer_data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in customer_data.columns])

null_counts.show()

+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
| ID|Year_Birth|Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|
+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+---

In [29]:
#Drops columns that won't be used in the analysis
customer_data_filtered = customer_data.drop('Z_CostContact', 'Z_Revenue', 'Response')

#Printing schema of filtered table to ensure the columns were dropped
customer_data_filtered.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedC

## Feature Engineering

Based on my initial analysis, I identified opportunities to engineer new features that could provide additional insights into customer behavior. I also recognized that the `Education` and `Marital_Status` columns, being categorical variables without inherent ordinal relationships, would require one-hot encoding. This transformation will be implemented later during the clustering pipeline.
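
For readers unfamiliar with one-hot encoding, the sketch below shows the idea in plain Python: each distinct category gets its own 0/1 indicator slot. The `one_hot` helper and the sample labels are illustrative assumptions, not the pipeline's actual code. In Spark this is commonly done with `StringIndexer` followed by `OneHotEncoder`, which by default omits the last category.

```python
def one_hot(values):
    """Return (categories, vectors) with one indicator slot per distinct category."""
    categories = sorted(set(values))
    index = {cat: i for i, cat in enumerate(categories)}
    vectors = []
    for v in values:
        vec = [0] * len(categories)
        vec[index[v]] = 1
        vectors.append(vec)
    return categories, vectors


# Illustrative labels only; the real column may hold other categories
marital_status = ["Single", "Married", "Single", "Divorced"]
categories, encoded = one_hot(marital_status)
print(categories)  # ['Divorced', 'Married', 'Single']
print(encoded[0])  # [0, 0, 1]
```

Because no category is treated as larger or smaller than another, this encoding avoids implying an order that the data does not have.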

In [30]:
from pyspark.sql.functions import when

#Starting the feature engineering dataframe from the filtered dataframe (Spark DataFrames are immutable, so each withColumn returns a new one)
customer_data_fe = customer_data_filtered

#Creates total_num_kids column which combines the two kids columns to get the total number of kids
customer_data_fe = customer_data_fe.withColumn('total_num_kids',
                                                     F.col('Kidhome') + F.col('Teenhome')
                                                     )
#Converting Dt_Customer into a date value
customer_data_fe = customer_data_fe.withColumn('Dt_Customer_datetime',
                                                     F.to_date(F.col('Dt_Customer'), 'dd-MM-yyyy')
                                                     )

#Gets the maximum date from the Dt_Customer_datetime column
dt_max_date = customer_data_fe.select(F.max('Dt_Customer_datetime')).collect()[0][0]

#Creates the total_days_enlisted column, the number of days between each customer's date and the maximum (most recent) date
customer_data_fe = customer_data_fe.withColumn('total_days_enlisted',
                                               F.date_diff(F.lit(dt_max_date), F.col('Dt_Customer_datetime'))
                                               )

#Creates the total_campaigns_accepted column which is a sum of AcceptedCmp1-5
customer_data_fe = customer_data_fe.withColumn('total_campaigns_accepted',
                                               F.col('AcceptedCmp1') + F.col('AcceptedCmp2') + F.col('AcceptedCmp3') + F.col('AcceptedCmp4') + F.col('AcceptedCmp5')
                                               )
#Extracts the year from the maximum date
max_year = dt_max_date.year

#Creates an age column by subtracting Year_Birth from the maximum year
customer_data_fe = customer_data_fe.withColumn('age',
                                               F.lit(max_year) - F.col('Year_Birth')
                                               )

# Remove invalid categories
customer_data_fe = customer_data_fe.filter(~F.col("Marital_Status").isin("YOLO", "Absurd"))

# Replace 'Alone' with 'Single'
customer_data_fe = customer_data_fe.withColumn("Marital_Status",
    when(F.col("Marital_Status") == "Alone", "Single").otherwise(F.col("Marital_Status"))
)

# Relabel Education categories ('Basic' -> 'High School', 'Graduation' -> 'Bachelors')
customer_data_fe = customer_data_fe.withColumn(
    "Education",
    when(F.col("Education") == "Basic", "High School")
    .when(F.col("Education") == "Graduation", "Bachelors")
    .otherwise(F.col("Education"))
)

# Create total spent column for further analysis
cols = ["MntWines", "MntFruits", "MntMeatProducts",
        "MntFishProducts", "MntSweetProducts", "MntGoldProds"]

customer_data_fe = customer_data_fe.withColumn("total_spent", sum([F.col(c) for c in cols]))

# Create age groups
customer_data_fe = customer_data_fe.withColumn(
    "age_group",
    F.when(F.col("age") < 30, "under 30")
     .when(F.col("age") < 40, "30s")
     .when(F.col("age") < 50, "40s")
     .when(F.col("age") < 60, "50s")
     .otherwise("60+")
)
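
The age bucketing above relies on the first matching condition winning. The plain-Python sketch below mirrors that chain so the boundary behaviour is easy to check; `age_group` here is a hypothetical helper written for illustration, not part of the notebook.

```python
def age_group(age: int) -> str:
    """Mirror the when/otherwise chain: the first matching condition wins."""
    if age < 30:
        return "under 30"
    if age < 40:
        return "30s"
    if age < 50:
        return "40s"
    if age < 60:
        return "50s"
    return "60+"


# Boundary ages fall into the higher bucket because each test is strict (<)
for age in (29, 30, 39, 40, 59, 60):
    print(age, age_group(age))
```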

In [31]:
#Verify new columns added
customer_data_fe.show()

+----+----------+-----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+--------------+--------------------+-------------------+------------------------+---+-----------+---------+
|  ID|Year_Birth|  Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|total_num_kids|Dt_Customer_datetime|total_days_enlisted|total_campaigns_accepted|age|total_spent|age_group|
+----+----------+-----------+--------------+------+-------+--------+-----------+-------+--------+---------+-----------

In [32]:
#Exports preprocessed dataframe to data folder
customer_data_fe.write.csv("../Data/preprocessed_segmenting_data_csv", header = True, mode = "overwrite")