# Data Prep - Customer Entity
The `customer` entity represents the features grouped at the single customer level and thus are the most important for performing customer segmentation and clustering tasks. 

In [1]:
!pip install inflection >> ../configs/package_installation.txt

In [6]:
%load_ext nb_black
%load_ext autoreload
%autoreload 2

The nb_black extension is already loaded. To reload it, use:
  %reload_ext nb_black
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<IPython.core.display.Javascript object>

In [7]:
# PySpark dependencies:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.window import Window

# database utilities:
from sqlalchemy import create_engine
import sqlite3 as db
import pandas as pd

# other relevant libraries:
import warnings
import inflection
import unicodedata
from datetime import datetime, timedelta
import json
import re
import os
from glob import glob
import shutil
import itertools

# setting global parameters for visualizations:
warnings.filterwarnings("ignore")
pd.set_option("display.precision", 4)
pd.set_option("display.float_format", lambda x: "%.2f" % x)

<IPython.core.display.Javascript object>

# 0. Building Spark Session

In [8]:
# loading the configurations needed for Spark
def init_spark(app_name):

    spark = (
        SparkSession.builder.appName(app_name)
        .config("spark.files.overwrite", "true")
        .config("spark.sql.repl.eagerEval.enabled", True)
        .config("spark.sql.repl.eagerEval.maxNumRows", 5)
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .enableHiveSupport()
        .getOrCreate()
    )

    return spark


# init the spark session:
spark = init_spark("Customer Preparation")

<IPython.core.display.Javascript object>

In [9]:
# verifying the spark session:
spark

<IPython.core.display.Javascript object>

# 1. Utility Functions

In [10]:
def get_pivot_product_info(df, feature_col):
    """Helper function to reproduce the pivotting and aggregation procedure for generic columns"""
    df_pivot = (
        df.groupby("customer_id")
        .pivot("type_of_product")
        .agg(F.max(F.col(feature_col)))
    )

    new_cols = [f"{col}_{feature_col}" for col in df_pivot.columns[1:]]

    for idx, col in enumerate(new_cols, start=1):
        df_pivot = df_pivot.withColumnRenamed(df_pivot.columns[idx], col)

    return df_pivot


def save_to_filesystem(df, target_path, parquet_path, filename):
    """Helper function to save pyspark dataframes as parquets in a way that is similar to writing to local files

    Args:
        df (pyspark.sql.dataframe.DataFrame): dataframe to be saved
        target_path (str): path that will store the file
        filename (str): name of the resulting file

    Returns:
        None
    """
    PARQUET_FILE = f"{target_path}/{parquet_path}"
    OUTPUT_FILE = f"{target_path}/{filename}"

    if os.path.exists(PARQUET_FILE):
        shutil.rmtree(
            PARQUET_FILE
        )  # if the directory already exists, remove it (throws error if not)

    # saves the dataframe:
    df.coalesce(1).write.save(PARQUET_FILE)

    # retrieves file resulting from the saving procedure:
    original_file = glob(f"{PARQUET_FILE}/*.parquet")[0]

    # renames the resulting file and saves it to the target directory:
    os.rename(original_file, OUTPUT_FILE)

    shutil.rmtree(PARQUET_FILE)

    return True


def apply_category_map(category_map):
    """Helper function to convert strings given a map

    Note:
        This function uses the function generator scheme, much like the PySpark code

    Args:
        original_category (str): the original category name
        category_map (dict): the hash table or dictionary for converting the values:

    Returns:
        new_category (str): the resulting category

    """

    def func(row):
        try:
            result = category_map[row]
        except:
            result = None
        return result

    return F.udf(func)


def get_datetime_features(df, time_col):
    """Function to extract time-based features from pyspark dataframes

    Args:
        df (pyspark.sql.dataframe.DataFrame): the original dataframe that needs to be enriched
        time_col (str): the string name of the column containing the date object

    Returns:
        df (pyspark.sql.dataframe.DataFrame): resulting pyspark dataframe with the added features
            -> See list of attribute the source code for the attributes

    """

    # applying date-related functions:

    # day-level attributes:
    df = df.withColumn("day_of_week", F.dayofweek(F.col(time_col)))

    df = df.withColumn("day_of_month", F.dayofmonth(F.col(time_col)))

    df = df.withColumn("day_of_year", F.dayofyear(F.col(time_col)))

    # week-level attributes:
    df = df.withColumn("week_of_year", F.weekofyear(F.col(time_col)))

    # month-level attributes:
    df = df.withColumn("month", F.month(F.col(time_col)))

    df = df.withColumn("quarter", F.quarter(F.col(time_col)))

    # year-level attributes:
    df = df.withColumn("year", F.year(F.col(time_col)))

    return df


def bulk_aggregate(df, group_col, aggs, target_cols):
    """Wrapper function to apply multiple aggregations when performing group bys

    It utilizes the spark's SQL Context and string interpolation to perform the aggregation using SQL syntax.

    Args:
        df (pyspark.sql.dataframe.DataFrame): dataframe with raw data
        group_col (str): the column that will be used for grouping
        aggs (list): list of aggregations that want to be made (must be the same name as pyspark.sql.functions)
        target_cols (str): columns in which aggregations will be performed

    Returns:
        df_grouped (pyspark.sql.dataframe.DataFrame): dataframe with the grouped data
    """

    # buils the cartersian product of the lists
    aggs_to_perform = itertools.product(aggs, target_cols)

    Q_LAYOUT = """
    SELECT
        {},
        {}
        FROM df
        GROUP BY {}
    """

    aggregations = []
    for agg, col in aggs_to_perform:

        # builds the string for aggregation
        statement = f"{agg.upper()}({col}) as {agg}_{col}"
        aggregations.append(statement)

    full_statement = ",\n".join(aggregations)

    # uses string interpolation to build the full query statement
    QUERY = Q_LAYOUT.format(group_col, full_statement, group_col)

    # registers the dataframe as temporary table:
    df.registerTempTable("df")
    df_grouped = spark.sql(QUERY)

    # rounds values:
    for column in df_grouped.columns:
        df_grouped = df_grouped.withColumn(column, F.round(F.col(column), 1))

    return df_grouped


######### Text Processing Functions ########
@udf("string")
def normalize_text(text):
    """Helper function to normalize text data to ASCII and lower case, removing spaces

    Args:
        text (string): the string that needs to be normalized

    Returns:
        text (string): cleaned up string

    """
    regex = r"[^a-zA-Z0-9]+"

    if text is not None:

        text = str(text)
        text = text.lower()
        text = re.sub(regex, " ", text)
        text = text.strip()
        text = str(
            unicodedata.normalize("NFKD", text).encode("ASCII", "ignore"), "utf-8"
        )

    return text


def get_null_columns(df, normalize=False):
    """Helper function to print the number of null records for each column of a PySpark DataFrame.

    Args:
        df (pyspark.sql.dataframe.DataFrame): a PySpark Dataframe object

    Returns:
        None -> prints to standard out

    """

    if normalize:
        total = df.count()

        df_nulls = df.select(
            [
                (F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)) / total).alias(
                    column
                )
                for column in df.columns
            ]
        )

    else:
        df_nulls = df.select(
            [
                F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)).alias(column)
                for column in df.columns
            ]
        )

    # displaying the results to standard out
    df_nulls.show(1, truncate=False, vertical=True)


@udf("boolean")
def is_set_or_pack(text):

    # description entries to match:
    set_descriptions = {"set", "set of", "pack", "pack of", "box", "box of"}

    if text is not None:
        text = str(text)

        if text in set_descriptions:
            return True

        else:
            return False

    else:
        return False


@udf("integer")
def get_unit_size(text):

    if text is not None:
        check_if_digit = len(re.findall(r"(\d+)", text)) > 0

        if check_if_digit:
            set_size = int(re.findall(r"(\d+)", text)[0])
            return set_size

        else:
            return 1

    else:
        return 1


@udf("boolean")
def has_non_digits_only(text):
    """Function to match entries in the dataset that are purely non-digit characters

    Args:
        text (str): string containing the invoice code

    Returns:
        boolean: whether the text contains non-digit characters and is not related to cancellations

    """

    if text is not None:
        condition = all(character.isalpha() for character in text)

        if condition:
            return True

        else:
            return False

    else:
        return False

<IPython.core.display.Javascript object>

# 2. Implementing Customer Features
The customer entity for this project is the one that will be used for the customer segmentation task itself. The following map illustrates the reasoning for the features we will generate.

<img src="../reports/figures/Customer Entity.png" alt = "Customer Entity Mapp" style = "width:1182px; height=702px;">

Given that we don't have many of these features and informations regarding the customers, we will generate what is possible from the datasets available.

In [11]:
# loading the invoice dataset from the parquet file:
PROCESSED_DATA_DIR = "../data/processed/"

df_invoice = spark.read.parquet(PROCESSED_DATA_DIR + "tb_invoice.parquet")

df_product = spark.read.parquet(PROCESSED_DATA_DIR + "tb_product.parquet")

df_ecommerce = spark.read.parquet(PROCESSED_DATA_DIR + "tb_ecommerce.parquet")

<IPython.core.display.Javascript object>

In [12]:
# instantiating the SQL Context:
sql_context = SQLContext(spark.sparkContext)

<IPython.core.display.Javascript object>

## 2.1 RFM and Key metrics 
The Recency, Frequency and Monetary value variables are often used for baseline customer segmentation models using the RFM strategy. I will start by generating these elements as features. These are defined as below:

1. **Recency**: the time in days since the last purchase of the client;
2. **Frequency**: the number of times the customer made a purchase;
3. **Monetary Value**: the total money spent by the customer;

In [13]:
# for recency of all purchases, we will consider the day following the last date of the dataset as the upper end:
last_date = df_invoice.select(
    F.date_add(F.max(F.col("date")), 1).alias("last_date")
).collect()[0]["last_date"]

# generating the recency from the last date as a support variable:
df_invoice = df_invoice.withColumn("last_date", F.lit(last_date))

<IPython.core.display.Javascript object>

In [14]:
# adding the recency variable:
df_invoice = df_invoice.withColumn(
    "recency", F.datediff(F.col("last_date"), F.col("date"))
)

<IPython.core.display.Javascript object>

In [15]:
# get the time period between the max and min dates:
time_period = df_invoice.select(
    F.datediff(F.max(F.col("date")), F.min(F.col("date"))).alias("total_period")
).collect()[0]["total_period"]

<IPython.core.display.Javascript object>

In [16]:
# generating the RFM variables:
df_customer_rfm = df_invoice.groupby("customer_id").agg(
    F.min(F.col("recency")).alias("recency"),
    F.countDistinct(F.col("invoice_no")).alias("n_orders"),
    F.round(F.sum(F.abs(F.col("total_paid"))), 2).alias("gross_revenue"),
    F.round(F.sum(F.abs(F.col("total_paid_cancelled"))), 2).alias("total_cancelled"),
)

# adding frequency (per month):
df_customer_rfm = df_customer_rfm.withColumn(
    "frequency",
    F.round(F.col("n_orders") / F.lit(30), 3),
)

# adding monetary value:
df_customer_rfm = df_customer_rfm.withColumn(
    "monetary_value", F.col("gross_revenue") - F.col("total_cancelled")
)

# adding average ticket:
df_customer_rfm = df_customer_rfm.withColumn(
    "average_ticket", F.round(F.col("gross_revenue") / F.col("n_orders"), 2)
)

<IPython.core.display.Javascript object>

In [17]:
# visualizing the results:
df_customer_rfm

customer_id,recency,n_orders,gross_revenue,total_cancelled,frequency,monetary_value,average_ticket
15619,11,1,336.4,0.0,0.033,336.4,336.4
17389,1,43,32367.28,533.6,1.433,31833.68,752.73
18944,145,1,526.8,0.0,0.033,526.8,526.8
12940,47,4,950.79,37.25,0.133,913.54,237.7
14450,181,3,483.25,0.0,0.1,483.25,161.08


<IPython.core.display.Javascript object>

## 2.2 Basket Features
Along with the basic RFM features, I will introduce some other relevant characteristics of the customer's basket.

In [18]:
basket_features = [
    "basket_size",
    "basket_diversity",
    "cancelled_items",
    "free_items",
    "returned_items",
    "sale_items",
    "total_discounts",
    "total_paid_fee",
    "total_paid_manual",
    "total_paid_postage",
    "total_paid_returned",
    "total_paid_sale",
    "total_quantity_cancelled",
    "total_quantity_free",
    "total_quantity_returned",
    "total_quantity_sale",
]

df_customer_basket = df_invoice.groupby("customer_id").agg(
    F.round(F.avg(F.col("basket_size")), 2).alias("average_basket_size"),
    F.round(F.avg(F.col("basket_diversity")), 2).alias("average_basket_diversity"),
    F.sum(F.col("basket_size")).cast("int").alias("total_items"),
    F.sum(F.col("cancelled_items")).alias("total_cancelled_items"),
    F.sum(F.col("free_items")).alias("total_free_items"),
    F.sum(F.col("returned_items")).alias("total_returned_items"),
    F.sum(F.col("sale_items")).alias("total_sale_items"),
    F.sum(F.col("total_discounts")).alias("total_discounts_received"),
    F.sum(F.col("total_paid_fee")).alias("total_paid_fees"),
    F.sum(F.col("total_paid_manual")).alias("total_paid_manual"),
    F.sum(F.col("total_paid_postage")).alias("total_paid_postage"),
    F.sum(F.col("total_paid_returned")).alias("total_paid_returned"),
    F.round(F.sum(F.col("total_paid_sale")), 2).alias("total_paid_sale"),
    F.sum(F.col("total_quantity_cancelled")).alias("total_units_cancelled"),
    F.sum(F.col("total_quantity_free")).alias("total_units_free"),
    F.sum(F.col("total_quantity_returned")).alias("total_units_returned"),
    F.sum(F.col("total_quantity_sale")).alias("total_units_sale"),
)

<IPython.core.display.Javascript object>

In [19]:
# verifying the results:
df_customer_basket

customer_id,average_basket_size,average_basket_diversity,total_items,total_cancelled_items,total_free_items,total_returned_items,total_sale_items,total_discounts_received,total_paid_fees,total_paid_manual,total_paid_postage,total_paid_returned,total_paid_sale,total_units_cancelled,total_units_free,total_units_returned,total_units_sale
14450,80.33,13.33,241,0,10,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,104.0,0.0,0.0
17389,181.26,5.21,7794,11,10,0,145,0.0,0.0,17.4,0.0,0.0,26569.01,182.0,222.0,0.0,6301.0
13285,512.75,46.75,2051,0,62,0,5,0.0,0.0,0.0,0.0,0.0,95.7,0.0,958.0,0.0,46.0
16503,114.2,17.2,571,2,5,0,1,0.0,0.0,0.0,0.0,0.0,15.6,2.0,64.0,0.0,8.0
18654,93.0,57.0,93,0,2,0,14,0.0,0.0,0.0,175.13,0.0,361.28,0.0,2.0,0.0,14.0


<IPython.core.display.Javascript object>

In [20]:
# basket features:
basket_related_features = df_customer_basket.columns

<IPython.core.display.Javascript object>

## 2.3 Time between purchases

In [21]:
# selecting a temporary view of the invoice dataset
df_temp = df_invoice.select("customer_id", "invoice_no", "date")


# defining a window function and associated column
time_between_purchases = Window.partitionBy("customer_id").orderBy("date")

df_temp = df_temp.withColumn(
    "time_between_purchases",
    F.datediff(F.col("date"), F.lag(F.col("date"), 1).over(time_between_purchases)),
)

# adding the results back into the invoice dataset:
df_invoice = df_invoice.join(
    df_temp, on=["customer_id", "invoice_no", "date"], how="left"
)

<IPython.core.display.Javascript object>

In [22]:
# performing the time aggregations:
time_based_features = [
    'time_between_purchases',
    'days_to_next_bank_holiday',
    'days_to_next_commercial_holiday',
    'month',
    'week_of_year',
    'day_of_month',
    'day_of_week'
]

df_customer_time = (df_invoice
                       .groupby('customer_id')
                       .agg(
                           F.avg(F.col('time_between_purchases')).alias('average_time_between_purchases'),
                           F.avg(F.col('days_to_next_bank_holiday')).alias('average_time_to_next_bank_holiday'),
                           F.avg(F.col('days_to_next_commercial_holiday')).alias('average_time_to_next_commercial_holiday')
                       ))

# generating a temporary view of the raw dataframe:
sql_context.registerDataFrameAsTable(df_invoice, 
                                     "tb_invoice")


<IPython.core.display.Javascript object>

In [23]:
# defining window functions in sql context to generate the most relevant time periods for customers:
Q_MONTH_MOST_ACTIVE = """

with months as (
SELECT
    customer_id,
    month as time_unit,
    COUNT(DISTINCT invoice_no) as n_orders
FROM tb_invoice
GROUP BY customer_id, month
)

SELECT 
    customer_id,
    time_unit as month_most_active
FROM (
SELECT
    A.*,
    RANK() OVER(PARTITION BY customer_id ORDER BY n_orders DESC) as rank_idx
    FROM months as A
) AS temp
WHERE rank_idx = 1
"""

df_month = spark.sql(Q_MONTH_MOST_ACTIVE)

Q_WEEK_MOST_ACTIVE = """

with weeks as (
SELECT
    customer_id,
    week_of_year as time_unit,
    COUNT(DISTINCT invoice_no) as n_orders
FROM tb_invoice
GROUP BY customer_id, week_of_year
)

SELECT 
    customer_id,
    time_unit as week_most_active
FROM (
SELECT
    A.*,
    RANK() OVER(PARTITION BY customer_id ORDER BY n_orders DESC) as rank_idx
    FROM weeks as A
) AS temp
WHERE rank_idx = 1
"""

df_week = spark.sql(Q_WEEK_MOST_ACTIVE)

Q_DAY_OF_WEEK = """

with days as (
SELECT
    customer_id,
    day_of_week as time_unit,
    COUNT(DISTINCT invoice_no) as n_orders
FROM tb_invoice
GROUP BY customer_id, day_of_week
)

SELECT 
    customer_id,
    time_unit as day_of_week_most_active
FROM (
SELECT
    A.*,
    RANK() OVER(PARTITION BY customer_id ORDER BY n_orders DESC) as rank_idx
    FROM days as A
) AS temp
WHERE rank_idx = 1
"""

df_day_of_week = spark.sql(Q_DAY_OF_WEEK)


Q_DAY_OF_MONTH = """

with days as (
SELECT
    customer_id,
    day_of_month as time_unit,
    COUNT(DISTINCT invoice_no) as n_orders
FROM tb_invoice
GROUP BY customer_id, day_of_month
)

SELECT 
    customer_id,
    time_unit as day_of_month_most_active
FROM (
SELECT
    A.*,
    RANK() OVER(PARTITION BY customer_id ORDER BY n_orders DESC) as rank_idx
    FROM days as A
) AS temp
WHERE rank_idx = 1
"""

df_day_of_month = spark.sql(Q_DAY_OF_MONTH)

<IPython.core.display.Javascript object>

In [24]:
# joining all the time segmentations:
df_dates = (
    df_month.join(df_week, on=["customer_id"], how="left")
    .join(df_day_of_week, on=["customer_id"], how="left")
    .join(df_day_of_month, on=["customer_id"], how="left")
    .drop_duplicates(subset=["customer_id"])
)

<IPython.core.display.Javascript object>

In [25]:
# joining on the full time dataframe:
df_customer_time = df_customer_time.join(
    df_dates, on=["customer_id"], how="left"
).drop_duplicates(subset=["customer_id"])

<IPython.core.display.Javascript object>

In [26]:
# getting the resulting columns:
time_features = df_customer_time.columns

<IPython.core.display.Javascript object>

## 2.4 Profile Features

In [27]:
# extracting some key characteristics from customer profile:
df_customer_profile = df_invoice.groupby("customer_id").agg(
    F.first(F.col("customer_country")).alias("customer_country"),
    F.min(F.col("date")).alias("first_purchase_date"),
)

# check if the customer is from the UK:
df_customer_profile = df_customer_profile.withColumn(
    "is_foreign",
    F.when(F.col("customer_country") != "united kingdom", True).otherwise(False),
)

# getting account age
df_customer_profile = df_customer_profile.withColumn(
    "account_age_days", F.datediff(F.lit(last_date), F.col("first_purchase_date"))
)

<IPython.core.display.Javascript object>

In [28]:
# extracting the profile columns:
profile_cols = df_customer_profile.columns

<IPython.core.display.Javascript object>

## 2.5 Categorizing Resellers

In [29]:
# as demonstrated during the EDA notebook for the customer entity, we will implement the reseller feature here as well:
reseller_threshold = df_customer_rfm.approxQuantile("monetary_value", [0.95], 0.0001)[0]

df_customer_rfm = df_customer_rfm.withColumn(
    "is_considered_reseller",
    F.when(F.col("monetary_value") >= reseller_threshold, True).otherwise(False),
)

<IPython.core.display.Javascript object>

In [30]:
# extracting the columns:
rfm_cols = df_customer_rfm.columns

<IPython.core.display.Javascript object>

# 3. Preparing the Output

In [31]:
# final schema would be:
all_cols = profile_cols + rfm_cols + time_features + basket_related_features

all_cols = list(filter(lambda x: x != "customer_id", all_cols))

col_order = ["customer_id"] + all_cols

<IPython.core.display.Javascript object>

In [32]:
# joining all the datasets:
df_customer = df_customer_profile.join(
    df_customer_basket, how="left", on=["customer_id"]
)

df_customer = df_customer.join(df_customer_rfm, how="left", on=["customer_id"])

df_customer = df_customer.join(df_customer_time, how="left", on=["customer_id"])

# dropping the duplicates:
df_customer = df_customer.drop_duplicates(subset=["customer_id"])

<IPython.core.display.Javascript object>

In [33]:
# rounding down time-based columns:
cols_to_floor = [
    "average_time_to_next_commercial_holiday",
    "average_time_to_next_bank_holiday",
    "average_time_between_purchases",
]

for col in cols_to_floor:

    df_customer = df_customer.withColumn(col, F.floor(F.col(col)))

<IPython.core.display.Javascript object>

In [34]:
# selecting the column order:
df_customer = df_customer.select(*col_order)

<IPython.core.display.Javascript object>

In [35]:
# verifying the schema:
df_customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- first_purchase_date: date (nullable = true)
 |-- is_foreign: boolean (nullable = false)
 |-- account_age_days: integer (nullable = true)
 |-- recency: integer (nullable = true)
 |-- n_orders: long (nullable = true)
 |-- gross_revenue: double (nullable = true)
 |-- total_cancelled: double (nullable = true)
 |-- frequency: double (nullable = true)
 |-- monetary_value: double (nullable = true)
 |-- average_ticket: double (nullable = true)
 |-- is_considered_reseller: boolean (nullable = true)
 |-- average_time_between_purchases: long (nullable = true)
 |-- average_time_to_next_bank_holiday: long (nullable = true)
 |-- average_time_to_next_commercial_holiday: long (nullable = true)
 |-- month_most_active: integer (nullable = true)
 |-- week_most_active: integer (nullable = true)
 |-- day_of_week_most_active: integer (nullable = true)
 |-- day_of_month_most_active: integer (nullable = true)

<IPython.core.display.Javascript object>

# 4. Saving the Dataset

In [None]:
# saving the enhanced raw data as parquet in the processed step of the pipeline
PROCESSED_DATA_DIR = '../data/processed'


# using the helper function to save the file:
save_to_filesystem(df_customer, 
                   PROCESSED_DATA_DIR,
                   'tb_customer',
                   'tb_customer.parquet')