# Case description

A supermarket is offering a new line of organic products. The supermarket's management wants to determine which customers are likely to purchase these products.
The supermarket has a customer loyalty program. As an initial buyer incentive plan, the supermarket provided coupons for the organic products to all of the loyalty program participants and collected data that includes whether these customers purchased any of the organic products.
The ORGANICS data set contains 13 variables and 22222 observations. 

# Step 1. Load Data

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv(r"Filepath", inferSchema=True, header=True) # read csv data, set inferSchema to be true.
data.show()

+-----+-------+------+----------+---------------+---------+----------+------------+---------+---------+--------+---------+---------+
|   ID|DemAffl|DemAge|DemCluster|DemClusterGroup|DemGender|    DemReg|    DemTVReg|PromClass|PromSpend|PromTime|TargetBuy|TargetAmt|
+-----+-------+------+----------+---------------+---------+----------+------------+---------+---------+--------+---------+---------+
|  140|     10|    76|        16|              C|        U|  Midlands|Wales & West|     Gold|  16000.0|       4|        0|        0|
|  620|      4|    49|        35|              D|        U|  Midlands|Wales & West|     Gold|   6000.0|       5|        0|        0|
|  868|      5|    70|        27|              D|        F|  Midlands|Wales & West|   Silver|     0.02|       8|        1|        1|
| 1120|     10|    65|        51|              F|        M|  Midlands|    Midlands|      Tin|     0.01|       7|        1|        1|
| 2313|     11|    68|         4|              A|        F|  Midlands

#### The ID column is not needed for machine learning. DemCluster is a nominal variable with too many categories, so I removed it, along with the dependent variable "TargetAmt", which is not used. I also changed the data type of the interval independent variables 'DemAffl', 'DemAge', and 'PromTime' from integer to double for more convenient data processing.

In [None]:
# step 1. drop three columns: ID, DemCluster, and TargetAmt
data = data.drop("ID", "DemCluster", "TargetAmt")
data.printSchema()
data.head(3) # this is an action; head() returns a list of Row objects.
# step 2. change the data types of 'DemAffl', 'DemAge', and 'PromTime' to double.
for variable in ['DemAffl', 'DemAge', 'PromTime']:
    data = data.withColumn(variable, data[variable].cast("double"))
# print the schema again to verify that the data type conversion worked.
data.printSchema()

root
 |-- DemAffl: integer (nullable = true)
 |-- DemAge: integer (nullable = true)
 |-- DemClusterGroup: string (nullable = true)
 |-- DemGender: string (nullable = true)
 |-- DemReg: string (nullable = true)
 |-- DemTVReg: string (nullable = true)
 |-- PromClass: string (nullable = true)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: integer (nullable = true)
 |-- TargetBuy: integer (nullable = true)

root
 |-- DemAffl: double (nullable = true)
 |-- DemAge: double (nullable = true)
 |-- DemClusterGroup: string (nullable = true)
 |-- DemGender: string (nullable = true)
 |-- DemReg: string (nullable = true)
 |-- DemTVReg: string (nullable = true)
 |-- PromClass: string (nullable = true)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: double (nullable = true)
 |-- TargetBuy: integer (nullable = true)



# Step 2. Data Exploration

In [None]:
interval_variables = ['DemAffl','DemAge', 'PromSpend','PromTime']
data.select(interval_variables).describe().show()

+-------+-----------------+------------------+-----------------+-----------------+
|summary|          DemAffl|            DemAge|        PromSpend|         PromTime|
+-------+-----------------+------------------+-----------------+-----------------+
|  count|            21137|             20713|            22222|            21941|
|   mean|8.711832331929791|53.796890841500506|4420.248964089997|6.564605077252632|
| stddev|3.421194092206597|13.205780845120188|7559.046597013129|4.657208722596065|
|    min|              0.0|              18.0|             0.01|              0.0|
|    max|             34.0|              79.0|        296313.85|             39.0|
+-------+-----------------+------------------+-----------------+-----------------+
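
The `count` row above already shows where values are missing, because `describe()` counts only non-null entries. As a quick check, assuming the 22222 observations stated in the case description, the number of missing values per variable is simply the difference:

```python
# Non-null counts copied from the describe() output above.
total_rows = 22222
non_null_counts = {"DemAffl": 21137, "DemAge": 20713, "PromSpend": 22222, "PromTime": 21941}

# Missing values per variable = total rows minus non-null count.
missing_counts = {name: total_rows - count for name, count in non_null_counts.items()}
print(missing_counts)
```

PromSpend is complete, while the other three interval variables each have some gaps.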



In [None]:
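# PromSpend has no missing values (its count is 22222 above); it stays in this list
# so that all interval variables pass through the same imputation loop.
interval_with_missing = ['DemAffl','DemAge', 'PromSpend','PromTime']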

In [None]:
nominal_with_dependent = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg', 'PromClass', 'TargetBuy']
for column in nominal_with_dependent:
    data.groupBy(column).count().show()

+---------------+-----+
|DemClusterGroup|count|
+---------------+-----+
|              F| 3949|
|           null|  674|
|              E| 2607|
|              B| 4144|
|              U|   54|
|              D| 4378|
|              C| 4566|
|              A| 1850|
+---------------+-----+

+---------+-----+
|DemGender|count|
+---------+-----+
|        F|12148|
|     null| 2513|
|        M| 5815|
|        U| 1746|
+---------+-----+

+----------+-----+
|    DemReg|count|
+----------+-----+
|  Scottish| 1368|
|      null|  466|
|South East| 8634|
|South West|  691|
|  Midlands| 6740|
|     North| 4323|
+----------+-----+

+------------+-----+
|    DemTVReg|count|
+------------+-----+
|      Ulster|  266|
|      N East|  785|
|      Border|  203|
|      S West|  691|
|        null|  465|
|      London| 6189|
|   Yorkshire| 1443|
|        East| 1649|
|      N Scot|  329|
|      N West| 2096|
|  C Scotland|  836|
|    Midlands| 3122|
|Wales & West| 1703|
|  S & S East| 2445|
+------------+----

In [None]:
nominal_with_missing = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg', 'PromClass', 'TargetBuy']

# Step 3. Feature Transformation

## 3.1. Missing value imputation for interval variables

In [None]:
from pyspark.sql import functions as f
from pyspark.ml.feature import Imputer
for variable in interval_with_missing:
    # step 1. add a missing indicator: 
    indicator_name = variable+"Missing" # The variable name of the missing value indicator
    data = data.withColumn(indicator_name, f.when(f.isnull(data[variable]),1).otherwise(0))
    # do value count to verify.
    data.groupBy(indicator_name).count().show()
    # step 2: replace missing values with the mean
    imputed_name = variable + "Imputed" # The variable name of the imputed variable
    imputer = Imputer(strategy = 'mean', inputCols = [variable], outputCols = [imputed_name])
    imputer_model = imputer.fit(data)
    data = imputer_model.transform(data)
    # run describe to check whether the missing values have been imputed.
    data.describe(imputed_name).show()

+--------------+-----+
|DemAfflMissing|count|
+--------------+-----+
|             1| 1085|
|             0|21137|
+--------------+-----+

+-------+------------------+
|summary|    DemAfflImputed|
+-------+------------------+
|  count|             22222|
|   mean| 8.711832331929804|
| stddev|3.3366243422702384|
|    min|               0.0|
|    max|              34.0|
+-------+------------------+

+-------------+-----+
|DemAgeMissing|count|
+-------------+-----+
|            1| 1509|
|            0|20713|
+-------------+-----+

+-------+-----------------+
|summary|    DemAgeImputed|
+-------+-----------------+
|  count|            22222|
|   mean|53.79689084149959|
| stddev|12.74950444653274|
|    min|             18.0|
|    max|             79.0|
+-------+-----------------+
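
Mean imputation leaves the mean unchanged but shrinks the standard deviation, as the tables above show (about 3.42 to 3.34 for DemAffl and 13.21 to 12.75 for DemAge). The sketch below illustrates why, assuming `describe()` reports the sample standard deviation (divisor n − 1); the helper name is illustrative and not part of the notebook:

```python
import math

def stddev_after_mean_imputation(stddev_observed, n_observed, n_total):
    """Sample stddev after filling (n_total - n_observed) gaps with the observed mean."""
    # The filled rows add zero squared deviation, so only the divisor grows
    # from (n_observed - 1) to (n_total - 1).
    return stddev_observed * math.sqrt((n_observed - 1) / (n_total - 1))

# Inputs copied from the describe() output in Step 2.
dem_affl = stddev_after_mean_imputation(3.421194092206597, 21137, 22222)
dem_age = stddev_after_mean_imputation(13.205780845120188, 20713, 22222)
print(round(dem_affl, 4), round(dem_age, 4))
```

This understated spread is one reason the missing-value indicators are kept as separate features: they let the model tell imputed rows from observed ones.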

+----------------+-----+
|PromSpendMissing|count|
+----------------+-----+
|               0|22222|
+----------------+-----+

+-------+-----------------+
|summary| PromSpendImputed|
+-------+-----------------+
|  
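
To make the two steps in the loop concrete, here is a plain-Python sketch of the same idea: flag the missing entries, then fill them with the mean of the observed values. It is an illustration only, not Spark's `Imputer` implementation; `impute_with_indicator` and the toy column are hypothetical.

```python
from statistics import mean

def impute_with_indicator(values):
    """Return (indicators, imputed): 1/0 missing flags and mean-filled values."""
    observed = [v for v in values if v is not None]
    fill_value = mean(observed)  # the mean uses observed values only
    indicators = [1 if v is None else 0 for v in values]
    imputed = [fill_value if v is None else v for v in values]
    return indicators, imputed

# Hypothetical toy column, not taken from the ORGANICS data.
toy_ages = [40.0, None, 60.0, 50.0, None]
flags, filled = impute_with_indicator(toy_ages)
print(flags)   # [0, 1, 0, 0, 1]
print(filled)  # [40.0, 50.0, 60.0, 50.0, 50.0]
```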

## 3.2. Missing value imputation for categorical variables

In [None]:
for variable in nominal_with_missing:
    # if variable is DemGender, replace the missing values in the column with 'U'
    if variable == 'DemGender':
        # the method fillna can be used to replace missing values with a new value. 
        data = data.fillna({variable:'U'})
    # for the other variables, replace the missing values in the column with 'unknown'
    else:
        data = data.fillna({variable: "unknown"})
    # run a value count to verify:
    data.groupBy(variable).count().show()

+---------------+-----+
|DemClusterGroup|count|
+---------------+-----+
|              F| 3949|
|        unknown|  674|
|              E| 2607|
|              B| 4144|
|              U|   54|
|              D| 4378|
|              C| 4566|
|              A| 1850|
+---------------+-----+

+---------+-----+
|DemGender|count|
+---------+-----+
|        F|12148|
|        M| 5815|
|        U| 4259|
+---------+-----+

+----------+-----+
|    DemReg|count|
+----------+-----+
|  Scottish| 1368|
|   unknown|  466|
|South East| 8634|
|South West|  691|
|  Midlands| 6740|
|     North| 4323|
+----------+-----+

+------------+-----+
|    DemTVReg|count|
+------------+-----+
|      Ulster|  266|
|      N East|  785|
|      Border|  203|
|     unknown|  465|
|      S West|  691|
|      London| 6189|
|   Yorkshire| 1443|
|        East| 1649|
|      N Scot|  329|
|      N West| 2096|
|  C Scotland|  836|
|    Midlands| 3122|
|Wales & West| 1703|
|  S & S East| 2445|
+------------+-----+

+---------+---
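
The DemGender table above can be cross-checked against the Step 2 counts: the null rows should be folded into the existing 'U' level. A small sketch of that bookkeeping, with `None` standing in for null:

```python
from collections import Counter

# Counts copied from the Step 2 groupBy output; None stands for null.
gender_counts = {"F": 12148, None: 2513, "M": 5815, "U": 1746}

# Apply the same rule as the loop: nulls are merged into the existing 'U' level.
filled_counts = Counter()
for level, count in gender_counts.items():
    filled_counts["U" if level is None else level] += count
print(dict(filled_counts))
```

The merged 'U' count (1746 + 2513) agrees with the 4259 shown in the table.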

## 3.3. Categorical variable encoding:

In [None]:
from pyspark.ml.feature import StringIndexer
nominal_with_string = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg', 'PromClass']
for variable in nominal_with_string:
    indexed_variable = variable+"Indexed" # The variable name of the indexed/encoded variable
    indexer = StringIndexer(inputCol = variable, outputCol = indexed_variable)
    indexer_model = indexer.fit(data)
    data = indexer_model.transform(data)
data.select('DemClusterGroupIndexed', 'DemGenderIndexed', 'DemRegIndexed', 'DemTVRegIndexed', 'PromClassIndexed').show()

+----------------------+----------------+-------------+---------------+----------------+
|DemClusterGroupIndexed|DemGenderIndexed|DemRegIndexed|DemTVRegIndexed|PromClassIndexed|
+----------------------+----------------+-------------+---------------+----------------+
|                   0.0|             2.0|          1.0|            4.0|             2.0|
|                   1.0|             2.0|          1.0|            4.0|             2.0|
|                   1.0|             0.0|          1.0|            4.0|             0.0|
|                   3.0|             1.0|          1.0|            1.0|             1.0|
|                   5.0|             0.0|          1.0|            1.0|             1.0|
|                   1.0|             2.0|          2.0|            3.0|             3.0|
|                   5.0|             0.0|          1.0|            5.0|             1.0|
|                   1.0|             1.0|          2.0|            8.0|             1.0|
|                   3
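
By default, `StringIndexer` orders labels by descending frequency, so the most common category receives index 0.0. The plain-Python sketch below reproduces that mapping from the counts in section 3.2. It is an illustration rather than Spark's implementation; `frequency_desc_index` is a hypothetical helper, and the alphabetical tie-break is an assumption (no ties occur in these counts).

```python
def frequency_desc_index(counts):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

# Counts copied from the section 3.2 output.
gender_index = frequency_desc_index({"F": 12148, "M": 5815, "U": 4259})
cluster_index = frequency_desc_index(
    {"F": 3949, "unknown": 674, "E": 2607, "B": 4144, "U": 54, "D": 4378, "C": 4566, "A": 1850}
)
print(gender_index)
print(cluster_index)
```

The first data row (DemClusterGroup C, DemGender U) therefore maps to 0.0 and 2.0, which matches the first row of the output above. The indices are arbitrary codes, not magnitudes, which is why they are dummy coded next.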

## 3.4. Dummy Coding:


In [None]:
# OneHotEncoderEstimator exists in Spark 2.3-2.4; in Spark 3.0+ it was renamed to OneHotEncoder.
from pyspark.ml.feature import OneHotEncoderEstimator
# a list of nominal variables that needs to be dummy coded.
indexed_nominal_variables = ['DemClusterGroupIndexed', 'DemGenderIndexed', 'DemRegIndexed', 'DemTVRegIndexed', 'PromClassIndexed']
for variable in indexed_nominal_variables:
    dummy_variable = variable+"Vec"
    encoder = OneHotEncoderEstimator(inputCols=[variable],
                                 outputCols=[dummy_variable])
    model = encoder.fit(data)
    data = model.transform(data)
data.select('DemClusterGroupIndexedVec', 'DemGenderIndexedVec', 'DemRegIndexedVec', 'DemTVRegIndexedVec', 'PromClassIndexedVec').show()

+-------------------------+-------------------+----------------+------------------+-------------------+
|DemClusterGroupIndexedVec|DemGenderIndexedVec|DemRegIndexedVec|DemTVRegIndexedVec|PromClassIndexedVec|
+-------------------------+-------------------+----------------+------------------+-------------------+
|            (7,[0],[1.0])|          (2,[],[])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[2],[1.0])|
|            (7,[1],[1.0])|          (2,[],[])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[2],[1.0])|
|            (7,[1],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[0],[1.0])|
|            (7,[3],[1.0])|      (2,[1],[1.0])|   (5,[1],[1.0])|    (13,[1],[1.0])|      (3,[1],[1.0])|
|            (7,[5],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|    (13,[1],[1.0])|      (3,[1],[1.0])|
|            (7,[1],[1.0])|          (2,[],[])|   (5,[2],[1.0])|    (13,[3],[1.0])|          (3,[],[])|
|            (7,[5],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|
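
Each cell above is a sparse vector printed as `(size, [active positions], [values])`. The encoder drops the last category by default (`dropLast=True`), so a variable with k levels yields a vector of size k − 1, and the last level appears as an all-zero vector such as `(2,[],[])`. A plain-Python sketch of that encoding (illustrative only; `one_hot_drop_last` is a hypothetical helper, not a Spark API):

```python
def one_hot_drop_last(index, num_categories):
    """Encode a category index as (size, positions, values), dropping the last category."""
    size = num_categories - 1          # the last category needs no column of its own
    if index == size:                  # last category: all-zero vector
        return (size, [], [])
    return (size, [int(index)], [1.0])

# DemGender has three levels after imputation (F=0, M=1, U=2).
print(one_hot_drop_last(0.0, 3))  # (2, [0], [1.0])
print(one_hot_drop_last(2.0, 3))  # (2, [], [])
```

Dropping one level avoids a redundant column, since the dummy columns of a variable would otherwise always sum to one.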

In [None]:
# now let's take a look at the schema of data again
data.printSchema()
# print all the variable in the dataframe
print(data.columns)

root
 |-- DemAffl: double (nullable = true)
 |-- DemAge: double (nullable = true)
 |-- DemClusterGroup: string (nullable = false)
 |-- DemGender: string (nullable = false)
 |-- DemReg: string (nullable = false)
 |-- DemTVReg: string (nullable = false)
 |-- PromClass: string (nullable = false)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: double (nullable = true)
 |-- TargetBuy: integer (nullable = true)
 |-- DemAfflMissing: integer (nullable = false)
 |-- DemAfflImputed: double (nullable = true)
 |-- DemAgeMissing: integer (nullable = false)
 |-- DemAgeImputed: double (nullable = true)
 |-- PromSpendMissing: integer (nullable = false)
 |-- PromSpendImputed: double (nullable = true)
 |-- PromTimeMissing: integer (nullable = false)
 |-- PromTimeImputed: double (nullable = true)
 |-- DemClusterGroupIndexed: double (nullable = false)
 |-- DemGenderIndexed: double (nullable = false)
 |-- DemRegIndexed: double (nullable = false)
 |-- DemTVRegIndexed: double (nullable = false)
 |-- 

## 3.5. Construct the Features Vector

#### Now let's create a list that includes all the independent variables we need to use in machine learning.

In [None]:
# this independent variable list needs to include: 
# 1. For the nominal/categorical variable, only dummy variables created are included. 
# 2. For the interval variables
#    a. Interval variables that do not have missing values are included.
#    b. For the interval variables that have missing values,  the imputed variables and the missing value indicators are included.
independent_variables = ['DemClusterGroupIndexedVec', 'DemGenderIndexedVec', 'DemRegIndexedVec', 'DemTVRegIndexedVec', 'PromClassIndexedVec', "DemAfflMissing", "DemAfflImputed", "DemAgeMissing", "DemAgeImputed", "PromSpendMissing", "PromSpendImputed", "PromTimeMissing", "PromTimeImputed"]

In [None]:
from pyspark.ml.feature import VectorAssembler
# reuse the list defined in the previous cell instead of repeating the column names
assembler = VectorAssembler(inputCols=independent_variables, outputCol='features')
data = assembler.transform(data)
data.head(2)

[Row(DemAffl=10.0, DemAge=76.0, DemClusterGroup='C', DemGender='U', DemReg='Midlands', DemTVReg='Wales & West', PromClass='Gold', PromSpend=16000.0, PromTime=4.0, TargetBuy=0, DemAfflMissing=0, DemAfflImputed=10.0, DemAgeMissing=0, DemAgeImputed=76.0, PromSpendMissing=0, PromSpendImputed=16000.0, PromTimeMissing=0, PromTimeImputed=4.0, DemClusterGroupIndexed=0.0, DemGenderIndexed=2.0, DemRegIndexed=1.0, DemTVRegIndexed=4.0, PromClassIndexed=2.0, DemClusterGroupIndexedVec=SparseVector(7, {0: 1.0}), DemGenderIndexedVec=SparseVector(2, {}), DemRegIndexedVec=SparseVector(5, {1: 1.0}), DemTVRegIndexedVec=SparseVector(13, {4: 1.0}), PromClassIndexedVec=SparseVector(3, {2: 1.0}), features=SparseVector(38, {0: 1.0, 10: 1.0, 18: 1.0, 29: 1.0, 31: 10.0, 33: 76.0, 35: 16000.0, 37: 4.0})),
 Row(DemAffl=4.0, DemAge=49.0, DemClusterGroup='D', DemGender='U', DemReg='Midlands', DemTVReg='Wales & West', PromClass='Gold', PromSpend=6000.0, PromTime=5.0, TargetBuy=0, DemAfflMissing=0, DemAfflImputed=4.0,
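
The `features` vector in the first row has 38 slots because `VectorAssembler` concatenates its inputs in the order given. The sketch below recomputes the starting offset of each block; the widths are read from the `SparseVector` sizes in the row above, and each numeric column is assumed to occupy one slot.

```python
# Block widths read from the SparseVector sizes in the row above;
# each numeric column contributes one slot.
blocks = [
    ("DemClusterGroupIndexedVec", 7), ("DemGenderIndexedVec", 2),
    ("DemRegIndexedVec", 5), ("DemTVRegIndexedVec", 13), ("PromClassIndexedVec", 3),
    ("DemAfflMissing", 1), ("DemAfflImputed", 1), ("DemAgeMissing", 1),
    ("DemAgeImputed", 1), ("PromSpendMissing", 1), ("PromSpendImputed", 1),
    ("PromTimeMissing", 1), ("PromTimeImputed", 1),
]

# Starting offset of each block inside the assembled vector.
offsets, position = {}, 0
for name, width in blocks:
    offsets[name] = position
    position += width
total_size = position

print(total_size)
print(offsets["DemRegIndexedVec"] + 1, offsets["DemAgeImputed"])
```

For the first row, DemReg has position 1 active inside its block, which lands at index 9 + 1 = 10 of `features`, and DemAgeImputed (76.0) sits at index 33, both as shown in the output.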

# Step 4. Training vs. test dataset partitioning

In [None]:
train, test = data.randomSplit([0.8, 0.2], seed=12345) # set seed to be 12345

# Step 5. Classification: Training with a gradient-boosted tree classifier


#### TargetBuy is the dependent variable, so this is a classification problem. The algorithm used is the gradient-boosted tree classifier.

In [None]:
# use a gradient-boosted tree classifier.
from pyspark.ml.classification import GBTClassifier
algo = GBTClassifier(labelCol='TargetBuy', featuresCol='features', maxIter=10)
model = algo.fit(train)

# Step 6. Make predictions using the test dataset

In [None]:
# make prediction using test data
predictions = model.transform(test)
predictions.select(['TargetBuy','prediction', 'probability']).show()

+---------+----------+--------------------+
|TargetBuy|prediction|         probability|
+---------+----------+--------------------+
|        1|       0.0|[0.57371170405329...|
|        0|       0.0|[0.57371170405329...|
|        0|       0.0|[0.57371170405329...|
|        0|       0.0|[0.84879546962180...|
|        0|       0.0|[0.90568410810255...|
|        0|       0.0|[0.90568410810255...|
|        1|       0.0|[0.56675242663635...|
|        1|       0.0|[0.57371170405329...|
|        0|       0.0|[0.84879546962180...|
|        0|       0.0|[0.83775176638211...|
|        0|       0.0|[0.84879546962180...|
|        0|       0.0|[0.57371170405329...|
|        0|       0.0|[0.84879546962180...|
|        0|       0.0|[0.89610967935479...|
|        0|       0.0|[0.89610967935479...|
|        1|       0.0|[0.57371170405329...|
|        1|       0.0|[0.83775176638211...|
|        1|       1.0|[0.26436986468342...|
|        1|       1.0|[0.26436986468342...|
|        1|       1.0|[0.2559902
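
The `probability` column holds a two-element vector; the output above is truncated after the first element, which is the estimated probability of class 0. Assuming the default decision rule (predict the class with the larger probability, equivalent to a 0.5 threshold in the binary case), the predicted label follows from that first element. The helper `predict_from_p0` is hypothetical and only illustrates the rule:

```python
def predict_from_p0(p_class0, threshold=0.5):
    """Return 1.0 when P(class 1) = 1 - P(class 0) exceeds the threshold, else 0.0."""
    return 1.0 if (1.0 - p_class0) > threshold else 0.0

# First probability entries copied (truncated) from the output above.
for p0 in [0.57371170405329, 0.84879546962180, 0.26436986468342]:
    print(p0, predict_from_p0(p0))
```

Several buyers (TargetBuy = 1) in the output have a class-0 probability of about 0.57, so they are predicted as non-buyers; a lower threshold for class 1 would capture more of them at the cost of more false positives.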

# Step 7. Model Evaluation

### In Spark ML, the default metric of `BinaryClassificationEvaluator` is the area under the ROC curve (`areaUnderROC`, AUC).

In [None]:
# using spark ml to do model evaluation 
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='TargetBuy', metricName='areaUnderROC')
evaluator.evaluate(predictions)

0.824653603874722
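
One way to read this number: AUC is the probability that a randomly chosen buyer receives a higher score than a randomly chosen non-buyer. The sketch below computes AUC by that pairwise definition on toy data. It is not how Spark computes the metric, and `auc_by_ranking` and the toy values are hypothetical.

```python
def auc_by_ranking(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical toy labels and scores, not taken from the ORGANICS predictions.
toy_labels = [1, 0, 1, 0, 0]
toy_scores = [0.9, 0.4, 0.35, 0.2, 0.1]
toy_auc = auc_by_ranking(toy_labels, toy_scores)
print(toy_auc)
```

Here five of the six positive-negative pairs are ordered correctly. Unlike accuracy, AUC does not depend on the classification threshold.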

In [None]:
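from sklearn.metrics import classification_report
# collect labels and predictions together so the two lists stay aligned row by row
rows = predictions.select('TargetBuy', 'prediction').collect()
y_true = [row['TargetBuy'] for row in rows]         # integer labels 0/1
y_pred = [int(row['prediction']) for row in rows]   # predictions are doubles (0.0/1.0)
print(classification_report(y_true, y_pred))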

              precision    recall  f1-score   support

           0       0.83      0.94      0.88      3394
           1       0.69      0.41      0.52      1122

   micro avg       0.81      0.81      0.81      4516
   macro avg       0.76      0.68      0.70      4516
weighted avg       0.79      0.81      0.79      4516
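
The averages in the report can be related to the per-class rows. The support-weighted recall equals the overall accuracy, and comparing it with a majority-class baseline shows how much the model adds. The sketch uses the rounded values from the report, so results agree only to about two decimals:

```python
# Per-class recall and support copied from the report above.
recall = {0: 0.94, 1: 0.41}
support = {0: 3394, 1: 1122}
total = sum(support.values())

# Support-weighted recall is the overall accuracy (up to rounding of the inputs).
weighted_recall = sum(recall[c] * support[c] for c in support) / total

# Accuracy of a trivial model that always predicts the majority class 0.
majority_baseline = support[0] / total

print(round(weighted_recall, 2), round(majority_baseline, 2))
```

The model's accuracy of about 0.81 is only moderately above the roughly 0.75 obtained by always predicting "no purchase", and the recall of 0.41 for class 1 shows that most buyers are still missed at the default threshold.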

