In [1]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, array_contains, explode
import pyspark.pandas as ps

from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline



# Load Data

In [2]:
spark = SparkSession.builder \
    .appName("Read JSON") \
    .getOrCreate()

df_data = spark.read.json("../data/processed/processed_data.json").pandas_api()
df_data = df_data.sort_values(by=['account_id', 'offer_id'])
df_data = df_data.to_spark()

channels = (
    df_data.select(explode("channels").alias("ch"))
      .distinct()
      .rdd.map(lambda r: r.ch)
      .collect()
)

# multi-hot encoding
for ch in channels:
    df_data = df_data.withColumn(
        f"channel_{ch}",
        array_contains(col("channels"), ch).cast("int")
    )

df_data = df_data.pandas_api()
df_data = df_data.drop(columns=['channels'])
df_data.head(20)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/14 00:51:24 WARN Utils: Your hostname, pedro-pc, resolves to a loopback address: 127.0.1.1; using 192.168.15.69 instead (on interface wlo1)
26/01/14 00:51:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/14 00:51:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/14 00:51:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
26/01/14 00:51:34 WARN AttachDistributedSequenceExec: clean up cached RDD(29) in AttachDistributedSequenceExec(216)


Unnamed: 0,account_id,age,credit_card_limit,discount_value,duration,event,gender,min_value,offer_id,offer_type,registered_on,reward,time_since_test_start,channel_mobile,channel_email,channel_social,channel_web
0,0009655768c64bdeb2e877511632db8f,33,72000.0,2,7.0,offer completed,M,10,2906b810c7d4411798c6938adc9daaa5,discount,2017-04-21,2.0,24.0,1,1,0,1
1,0009655768c64bdeb2e877511632db8f,33,72000.0,2,7.0,offer received,M,10,2906b810c7d4411798c6938adc9daaa5,discount,2017-04-21,,24.0,1,1,0,1
2,0009655768c64bdeb2e877511632db8f,33,72000.0,0,4.0,offer viewed,M,0,3f207df678b143eea3cee63160fa8bed,informational,2017-04-21,,15.5,1,1,0,1
3,0009655768c64bdeb2e877511632db8f,33,72000.0,0,4.0,offer received,M,0,3f207df678b143eea3cee63160fa8bed,informational,2017-04-21,,14.0,1,1,0,1
4,0009655768c64bdeb2e877511632db8f,33,72000.0,0,3.0,offer received,M,0,5a8bc65990b245e5a138643cd4eb9837,informational,2017-04-21,,7.0,1,1,1,0
5,0009655768c64bdeb2e877511632db8f,33,72000.0,0,3.0,offer viewed,M,0,5a8bc65990b245e5a138643cd4eb9837,informational,2017-04-21,,8.0,1,1,1,0
6,0009655768c64bdeb2e877511632db8f,33,72000.0,5,5.0,offer completed,M,5,f19421c1d4aa40978ebb69ca19b0e20d,bogo,2017-04-21,5.0,17.25,1,1,1,1
7,0009655768c64bdeb2e877511632db8f,33,72000.0,5,5.0,offer received,M,5,f19421c1d4aa40978ebb69ca19b0e20d,bogo,2017-04-21,,17.0,1,1,1,1
8,0009655768c64bdeb2e877511632db8f,33,72000.0,5,5.0,offer viewed,M,5,f19421c1d4aa40978ebb69ca19b0e20d,bogo,2017-04-21,,19.0,1,1,1,1
9,0009655768c64bdeb2e877511632db8f,33,72000.0,2,10.0,offer viewed,M,10,fafdcd668e3743c1bb461111dcafc2a4,discount,2017-04-21,,22.5,1,1,1,1


In [3]:
df_data['event'].unique()

0     offer received
1    offer completed
2       offer viewed
Name: event, dtype: object

# Approach 1: Classification of current transaction

In this approach, we will train a classifier to predict the offer type for a given consumer.

Assumptions:
* We will dicard the events "offer viewed" since they do not give much information regarding which offer converted into a sale
* Target variable will be the offer_type


## Preprocessing

In [4]:
df_data = df_data[df_data['event'].isin(['offer completed'])]
df_data = df_data.drop(columns=['account_id', 'offer_id', 'event', 'registered_on'])

cat_columns = ['gender', 'discount_value', 'min_value', 'duration', 'reward']
num_columns = ['age', 'credit_card_limit', 'duration', 'time_since_test_start', 'channel_mobile', 'channel_email', 'channel_social', 'channel_web']

In [5]:
target = df_data['offer_type'].to_pandas()
data = df_data.drop(columns=['offer_type']).to_pandas()

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_columns),
        ("num", "passthrough", num_columns)
    ]
)

26/01/14 00:51:37 WARN AttachDistributedSequenceExec: clean up cached RDD(53) in AttachDistributedSequenceExec(371)
26/01/14 00:51:39 WARN AttachDistributedSequenceExec: clean up cached RDD(67) in AttachDistributedSequenceExec(456)


## Defining Train and Test Partitions

In [6]:
size_data = len(data)
train_inds = np.random.randint(0, high=size_data, size=int(size_data*0.8))
test_inds = list(set(range(size_data)) - set(train_inds))

## Defining Classification Pipeline

In [7]:
pipeline = Pipeline(steps=[
    ("prep", preprocessor),
    ("rf", RandomForestClassifier(
        n_estimators=200,
        random_state=42,
        n_jobs=-1
    ))
])

In [8]:
pipeline.fit(data.iloc[train_inds], target.iloc[train_inds])

In [9]:
predict = pipeline.predict(data.iloc[test_inds])

In [10]:
print(classification_report(target.iloc[test_inds], predict))

              precision    recall  f1-score   support

        bogo       1.00      1.00      1.00      6864
    discount       1.00      1.00      1.00      7742

    accuracy                           1.00     14606
   macro avg       1.00      1.00      1.00     14606
weighted avg       1.00      1.00      1.00     14606

