# INFS 774 Assignment 5: Machine Learning Library (MLlib)

Now you should have known that Spark has three different data structures available through its APIs: RDD, Dataframe (that is similar to but different from Pandas or R dataframe), and Dataset (In spark 2.0, Dataframe and Dataset APIs are unified. However, Dataset is only avaible for Spark Scala or Java API and is not available for Python). 

Spark Machine Learning Library includes two major packages for doing machine learning: SparkML and SparkMLLib. SparkMLLib is used with RDDs, while SparkML supports Dataframes. In this assignment, you need to use the SparkML package rather than SparkMLLib and store your datasets as spark Dataframes. 

# Case description

A supermarket is offering a new line of organic products. The supermarket's management wants to determine which customers are likely to purchase these products.
The supermarket has a customer loyalty program. As an initial buyer incentive plan, the supermarket provided coupons for the organic products to all of the loyalty program participants and collected data that includes whether these customers purchased any of the organic products.
The ORGANICS data set contains 13 variables and 22222 observations. 

# Variable Description
The variables in the data set are shown below with the appropriate roles and levels:
![Organics description](organics.png)
The variables in this dataset has been classified into two categories: interval (i.e., real-valued) vs. nominal (i.e., categorical). Please note in this dataset, you have two dependent variables: TargetBuy (a binary variable) and TargetAmt (an interval variable). You need to deal with the classification task.

# The overall goal of this assignment is to do a classification. The dependent variable is "TargetBuy". We want to predict whether a customer will buy organic food or not.

#### Exercises will include an explanation of what is expected, followed by code cells where one cell will have one or more `<FILL IN>` sections.  The cell that needs to be modified will have `# TODO: Replace <FILL IN> with appropriate code`.  

# Step 1: Load Data

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('organics.csv', inferSchema=True, header=True) #<FILL IN> # read csv data, set inferSchema to be true.
data.show()

+-----+-------+------+----------+---------------+---------+----------+------------+---------+---------+--------+---------+---------+
|   ID|DemAffl|DemAge|DemCluster|DemClusterGroup|DemGender|    DemReg|    DemTVReg|PromClass|PromSpend|PromTime|TargetBuy|TargetAmt|
+-----+-------+------+----------+---------------+---------+----------+------------+---------+---------+--------+---------+---------+
|  140|     10|    76|        16|              C|        U|  Midlands|Wales & West|     Gold|  16000.0|       4|        0|        0|
|  620|      4|    49|        35|              D|        U|  Midlands|Wales & West|     Gold|   6000.0|       5|        0|        0|
|  868|      5|    70|        27|              D|        F|  Midlands|Wales & West|   Silver|     0.02|       8|        1|        1|
| 1120|     10|    65|        51|              F|        M|  Midlands|    Midlands|      Tin|     0.01|       7|        1|        1|
| 2313|     11|    68|         4|              A|        F|  Midlands

#### The ID column is not needed for machine learning. The variable DemCluster is a nominal variable that includes too many categories, and we will remove this variable. We also want to remove the dependent variable "TargetAmt" we will not use. In the cell below, please write code to 1) filter out the unneeded columns including 'TargetAmt', 'ID' and 'DemCluster', and 2) change the datatype of the interval independent variables including 'DemAffl', 'DemAge', and 'PromTime' from interger into double for convenient data processing 

In [2]:
# step 1. drop two columns ID, DemCluster, and TargetAmt
data = data.drop('DemCluster', 'TargetAmt') # please use the method drop to drop the columns.
data.printSchema()
data.head(3) # this is an action. Head() returns a list of rows.
from pyspark.sql.types import DoubleType
# step 2. change the data types of  'DemAffl', 'DemAge', and 'PromTime' to double. Please refer to https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark to learn how to change the datatype of a variable
for variable in ['DemAffl', 'DemAge','PromTime']:
    data = data.withColumn(variable, data[variable].cast(DoubleType()))
    

# print schema again to verify if the data type conversion works. 
data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- DemAffl: integer (nullable = true)
 |-- DemAge: integer (nullable = true)
 |-- DemClusterGroup: string (nullable = true)
 |-- DemGender: string (nullable = true)
 |-- DemReg: string (nullable = true)
 |-- DemTVReg: string (nullable = true)
 |-- PromClass: string (nullable = true)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: integer (nullable = true)
 |-- TargetBuy: integer (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- DemAffl: double (nullable = true)
 |-- DemAge: double (nullable = true)
 |-- DemClusterGroup: string (nullable = true)
 |-- DemGender: string (nullable = true)
 |-- DemReg: string (nullable = true)
 |-- DemTVReg: string (nullable = true)
 |-- PromClass: string (nullable = true)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: double (nullable = true)
 |-- TargetBuy: integer (nullable = true)



# Step 2. Data Exploration

#### In the cell below, write code to use the describe function to show basic stats only for the interval variables in the dataset.

In [3]:
interval_variables = ['DemAffl','DemAge', 'PromSpend','PromTime']
data.describe(interval_variables).show() 

+-------+-----------------+------------------+-----------------+-----------------+
|summary|          DemAffl|            DemAge|        PromSpend|         PromTime|
+-------+-----------------+------------------+-----------------+-----------------+
|  count|            21137|             20713|            22222|            21941|
|   mean|8.711832331929791|53.796890841500506|4420.248964089997|6.564605077252632|
| stddev|3.421194092206597|13.205780845120188|7559.046597013129|4.657208722596065|
|    min|              0.0|              18.0|             0.01|              0.0|
|    max|             34.0|              79.0|        296313.85|             39.0|
+-------+-----------------+------------------+-----------------+-----------------+



#### In the box below, please define a list that contains the names of the interval variables that have missing values:

In [4]:
from pyspark.sql import functions as f
#data.len().show()

interval_with_missing = ['DemAffl','DemAge','PromTime']

#### In the box below, please write code to show the value count of each nominal(categorical) variable including 'TargetBuy'. 

In [5]:
nominal_with_dependent = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg', 'PromClass', 'TargetBuy']
for column in nominal_with_dependent:
    data.groupBy(column).count().show()
    

+---------------+-----+
|DemClusterGroup|count|
+---------------+-----+
|              F| 3949|
|           null|  674|
|              E| 2607|
|              B| 4144|
|              U|   54|
|              D| 4378|
|              C| 4566|
|              A| 1850|
+---------------+-----+

+---------+-----+
|DemGender|count|
+---------+-----+
|        F|12148|
|     null| 2513|
|        M| 5815|
|        U| 1746|
+---------+-----+

+----------+-----+
|    DemReg|count|
+----------+-----+
|  Scottish| 1368|
|      null|  466|
|South East| 8634|
|South West|  691|
|  Midlands| 6740|
|     North| 4323|
+----------+-----+

+------------+-----+
|    DemTVReg|count|
+------------+-----+
|      Ulster|  266|
|      N East|  785|
|      Border|  203|
|      S West|  691|
|        null|  465|
|      London| 6189|
|   Yorkshire| 1443|
|        East| 1649|
|      N Scot|  329|
|      N West| 2096|
|  C Scotland|  836|
|    Midlands| 3122|
|Wales & West| 1703|
|  S & S East| 2445|
+------------+----

#### In the box below, please define a list that contains the names of the nominal variables that have missing values:

In [6]:
nominal_with_missing = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg']


# Step 3. Feature Transformation

## 3.1. Missing value imputation for interval variables

### If a continuous variable contains missing values, we do missing value imputation in two steps: 1) We add a missing value indicator; and 2) we replace missing values with the mean

In [7]:
from pyspark.sql import functions as f
from pyspark.ml.feature import Imputer
for variable in interval_with_missing:
    # step 1. add a missing indicator: 
    indicator_name = variable+"Missing" # The variable name of the missing value indictor 
    data = data.withColumn(indicator_name, f.when(f.isnull(data[variable]), 1).otherwise(0))
    # do value count to verify.
    data.groupBy(indicator_name).count().show()
    # step 2: replace missing values with the mean
    imputed_name = variable + "Imputed" # The variable name of the imputed variable
    imputer = Imputer(strategy='mean', inputCols=[variable], outputCols=[imputed_name])
    imputer_model =imputer.fit(data)
    data = imputer_model.transform(data)
    # run the describe to check whether the missing values has been imputed. The count of the imputed variables should be 22222.
    data.describe(imputed_name).show()

+--------------+-----+
|DemAfflMissing|count|
+--------------+-----+
|             1| 1085|
|             0|21137|
+--------------+-----+

+-------+------------------+
|summary|    DemAfflImputed|
+-------+------------------+
|  count|             22222|
|   mean| 8.711832331929804|
| stddev|3.3366243422702384|
|    min|               0.0|
|    max|              34.0|
+-------+------------------+

+-------------+-----+
|DemAgeMissing|count|
+-------------+-----+
|            1| 1509|
|            0|20713|
+-------------+-----+

+-------+-----------------+
|summary|    DemAgeImputed|
+-------+-----------------+
|  count|            22222|
|   mean|53.79689084149959|
| stddev|12.74950444653274|
|    min|             18.0|
|    max|             79.0|
+-------+-----------------+

+---------------+-----+
|PromTimeMissing|count|
+---------------+-----+
|              1|  281|
|              0|21941|
+---------------+-----+

+-------+-----------------+
|summary|  PromTimeImputed|
+-------+---

## 3.2. Missing value imputation for categorical variables

### We replace missing values with the string 'unknown'; however, in our dataset, the DemGender column contains "M", "F", "U", and null. We believe that "U" means unknown. So we want to replace missing values with 'U' for this variable.

In [8]:
for variable in nominal_with_missing:
    # if variable is DemGender, we replace the missing values in the column with 'U'
    if variable == 'DemGender':
        # the method fillna can be used to replace missing values with a new value. 
        data = data.fillna({variable:'U'})
    # for the other variable, we replace the missing values in the column with 'unknown'
    else:
        data = data.fillna({variable:'unknown'})
    # run value_count to verify:
    data.groupBy(variable).count().show()

+---------------+-----+
|DemClusterGroup|count|
+---------------+-----+
|              F| 3949|
|        unknown|  674|
|              E| 2607|
|              B| 4144|
|              U|   54|
|              D| 4378|
|              C| 4566|
|              A| 1850|
+---------------+-----+

+---------+-----+
|DemGender|count|
+---------+-----+
|        F|12148|
|        M| 5815|
|        U| 4259|
+---------+-----+

+----------+-----+
|    DemReg|count|
+----------+-----+
|  Scottish| 1368|
|   unknown|  466|
|South East| 8634|
|South West|  691|
|  Midlands| 6740|
|     North| 4323|
+----------+-----+

+------------+-----+
|    DemTVReg|count|
+------------+-----+
|      Ulster|  266|
|      N East|  785|
|      Border|  203|
|     unknown|  465|
|      S West|  691|
|      London| 6189|
|   Yorkshire| 1443|
|        East| 1649|
|      N Scot|  329|
|      N West| 2096|
|  C Scotland|  836|
|    Midlands| 3122|
|Wales & West| 1703|
|  S & S East| 2445|
+------------+-----+



## 3.3. Categorical variable encoding:
#### We learned that machine learning algorithms cannot deal with categorical features (i.e., features with strings). So, you need to write code to convert the categorical variables with strings into nominal variables. 

In [9]:
from pyspark.ml.feature import StringIndexer
nominal_with_string = ['DemClusterGroup', 'DemGender', 'DemReg', 'DemTVReg', 'PromClass']
for variable in nominal_with_string:
    indexed_variable = variable+"Indexed" # The variable name of the indexed/encoded variable
    indexer = StringIndexer(inputCol=variable, outputCol=indexed_variable)
    indexer_model = indexer.fit(data)
    data = indexer_model.transform(data)
data.select('DemClusterGroupIndexed', 'DemGenderIndexed', 'DemRegIndexed', 'DemTVRegIndexed', 'PromClassIndexed').show()

+----------------------+----------------+-------------+---------------+----------------+
|DemClusterGroupIndexed|DemGenderIndexed|DemRegIndexed|DemTVRegIndexed|PromClassIndexed|
+----------------------+----------------+-------------+---------------+----------------+
|                   0.0|             2.0|          1.0|            4.0|             2.0|
|                   1.0|             2.0|          1.0|            4.0|             2.0|
|                   1.0|             0.0|          1.0|            4.0|             0.0|
|                   3.0|             1.0|          1.0|            1.0|             1.0|
|                   5.0|             0.0|          1.0|            1.0|             1.0|
|                   1.0|             2.0|          2.0|            3.0|             3.0|
|                   5.0|             0.0|          1.0|            5.0|             1.0|
|                   1.0|             1.0|          2.0|            8.0|             1.0|
|                   3

# 3.4. Dummy Coding:
#### Now you have converted categorical variables with strings into nominal variables with indexes. You still need to deal with nominal variables with more than 2 categories - You need to do dummy coding!

In [10]:


from pyspark.ml.feature import OneHotEncoderEstimator
# a list of nominal variables that needs to be dummy coded.
indexed_nominal_variables = ['DemClusterGroupIndexed', 'DemGenderIndexed', 'DemRegIndexed', 'DemTVRegIndexed', 'PromClassIndexed']
for variable in indexed_nominal_variables:
    dummy_variable = variable+"Vec"
    encoder = OneHotEncoderEstimator(inputCols=[variable],
                                 outputCols=[dummy_variable])
    model = encoder.fit(data)
    data = model.transform(data)
data.select('DemClusterGroupIndexedVec', 'DemGenderIndexedVec', 'DemRegIndexedVec', 'DemTVRegIndexedVec', 'PromClassIndexedVec').show()

+-------------------------+-------------------+----------------+------------------+-------------------+
|DemClusterGroupIndexedVec|DemGenderIndexedVec|DemRegIndexedVec|DemTVRegIndexedVec|PromClassIndexedVec|
+-------------------------+-------------------+----------------+------------------+-------------------+
|            (7,[0],[1.0])|          (2,[],[])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[2],[1.0])|
|            (7,[1],[1.0])|          (2,[],[])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[2],[1.0])|
|            (7,[1],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|    (13,[4],[1.0])|      (3,[0],[1.0])|
|            (7,[3],[1.0])|      (2,[1],[1.0])|   (5,[1],[1.0])|    (13,[1],[1.0])|      (3,[1],[1.0])|
|            (7,[5],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|    (13,[1],[1.0])|      (3,[1],[1.0])|
|            (7,[1],[1.0])|          (2,[],[])|   (5,[2],[1.0])|    (13,[3],[1.0])|          (3,[],[])|
|            (7,[5],[1.0])|      (2,[0],[1.0])|   (5,[1],[1.0])|

In [11]:
# now let's take a look at the schema of data again
data.printSchema()
# print all the variable in the dataframe
print(data.columns)

root
 |-- ID: integer (nullable = true)
 |-- DemAffl: double (nullable = true)
 |-- DemAge: double (nullable = true)
 |-- DemClusterGroup: string (nullable = false)
 |-- DemGender: string (nullable = false)
 |-- DemReg: string (nullable = false)
 |-- DemTVReg: string (nullable = false)
 |-- PromClass: string (nullable = true)
 |-- PromSpend: double (nullable = true)
 |-- PromTime: double (nullable = true)
 |-- TargetBuy: integer (nullable = true)
 |-- DemAfflMissing: integer (nullable = false)
 |-- DemAfflImputed: double (nullable = true)
 |-- DemAgeMissing: integer (nullable = false)
 |-- DemAgeImputed: double (nullable = true)
 |-- PromTimeMissing: integer (nullable = false)
 |-- PromTimeImputed: double (nullable = true)
 |-- DemClusterGroupIndexed: double (nullable = false)
 |-- DemGenderIndexed: double (nullable = false)
 |-- DemRegIndexed: double (nullable = false)
 |-- DemTVRegIndexed: double (nullable = false)
 |-- PromClassIndexed: double (nullable = false)
 |-- DemClusterGroup

# 3.5 Construct the Features Vector

#### Now let's create a list that includes all the independent variable we need to use in the machine learing.

In [12]:
# this independent variable list needs to include: 
# 1. For the nominal/categorical variable, you only need to include dummy variables you created for the nominal variable. You don't need to include the original and the indexed nominal variables.
# 2. For the interval variables, you need to include:
#    a. Interval variables that do not have missing values.
#    b. For the interval variables that have missing values, you only need to include the imputed variables and the missing value indicators. E.g., for 'DemAffl', you need to include both DemAfflMissing and DemAfflImputed. You don't want to include the original variables (e.g., DemAffl)
# Please remember, you also don't want to include the dependent variables TargetBuy in the independent variable list.

independent_variables = ['PromSpend','DemAfflMissing', 'DemAfflImputed', 'DemAgeMissing', 
                         'DemAgeImputed', 'PromTimeMissing', 'PromTimeImputed',
                         'DemClusterGroupIndexedVec', 'DemGenderIndexedVec', 'DemRegIndexedVec', 
                         'DemTVRegIndexedVec', 'PromClassIndexedVec']

#### MLlib expects data to be represented in two columns: a features vector and a label column. Now please write code to prepare the features vector using assembler.

In [13]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=independent_variables, outputCol='features')
data = assembler.transform(data)

data.head(2)



[Row(ID=140, DemAffl=10.0, DemAge=76.0, DemClusterGroup='C', DemGender='U', DemReg='Midlands', DemTVReg='Wales & West', PromClass='Gold', PromSpend=16000.0, PromTime=4.0, TargetBuy=0, DemAfflMissing=0, DemAfflImputed=10.0, DemAgeMissing=0, DemAgeImputed=76.0, PromTimeMissing=0, PromTimeImputed=4.0, DemClusterGroupIndexed=0.0, DemGenderIndexed=2.0, DemRegIndexed=1.0, DemTVRegIndexed=4.0, PromClassIndexed=2.0, DemClusterGroupIndexedVec=SparseVector(7, {0: 1.0}), DemGenderIndexedVec=SparseVector(2, {}), DemRegIndexedVec=SparseVector(5, {1: 1.0}), DemTVRegIndexedVec=SparseVector(13, {4: 1.0}), PromClassIndexedVec=SparseVector(3, {2: 1.0}), features=SparseVector(37, {0: 16000.0, 2: 10.0, 4: 76.0, 6: 4.0, 7: 1.0, 17: 1.0, 25: 1.0, 36: 1.0})),
 Row(ID=620, DemAffl=4.0, DemAge=49.0, DemClusterGroup='D', DemGender='U', DemReg='Midlands', DemTVReg='Wales & West', PromClass='Gold', PromSpend=6000.0, PromTime=5.0, TargetBuy=0, DemAfflMissing=0, DemAfflImputed=4.0, DemAgeMissing=0, DemAgeImputed=49

# Step 4. Training vs. test dataset partitioning

#### Write code to split your data into a training (70%) and a test dataset (30%). 

In [14]:
train, test = data.randomSplit([0.7, 0.3], seed=12345)

# Step 5. Classifcation: Training with Gradient-boosted tree classifier


####  Let's use TargetBuy as the dependent variable and do a classification. The algorithm we use is Gradient-boosted tree. Please refer to https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier for details.

In [15]:
# here we use Gradient-booted tree. The d
from pyspark.ml.classification import GBTClassifier
algo = GBTClassifier(featuresCol='features', labelCol='TargetBuy', maxIter=10) # set maxIter=10 for GBTClassifier 
model = algo.fit(train)

# Step 6. Make predictions using the test dataset

In [16]:
# make prediction using test data
predictions = model.transform(test)
predictions.select(['TargetBuy','prediction', 'probability']).show()

+---------+----------+--------------------+
|TargetBuy|prediction|         probability|
+---------+----------+--------------------+
|        0|       0.0|[0.71628243970119...|
|        0|       0.0|[0.90967713245875...|
|        0|       0.0|[0.76752926399651...|
|        0|       0.0|[0.86080296901729...|
|        0|       0.0|[0.87221748814530...|
|        0|       0.0|[0.87370078175685...|
|        1|       1.0|[0.44258662592787...|
|        0|       0.0|[0.75079342331208...|
|        0|       0.0|[0.90688853030195...|
|        0|       0.0|[0.86182132521572...|
|        1|       0.0|[0.75821051963979...|
|        0|       0.0|[0.84544236313838...|
|        1|       1.0|[0.31877412258060...|
|        0|       0.0|[0.91030208179284...|
|        1|       1.0|[0.19866174342649...|
|        0|       0.0|[0.87221748814530...|
|        0|       0.0|[0.92331143898198...|
|        0|       0.0|[0.75079342331208...|
|        0|       0.0|[0.83986498350339...|
|        0|       0.0|[0.9248159

# Step 7. Model Evaluation

### In MLlib the default metric used for evaluating classification models is area_under_roc (auc). Please write the code to get the AUC value

In [17]:
# using spark ml to do model evaluation 
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='TargetBuy', metricName='areaUnderROC')
evaluator.evaluate(predictions)


0.8263046324908051

### Model Evaluation with SciKit-Learn
### Next, please write code to use scikit-learn to create the classification report.

In [18]:
y_true = predictions.select(['TargetBuy']).collect()
y_pred = predictions.select(['prediction']).collect()
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred))



              precision    recall  f1-score   support

           0       0.84      0.93      0.88      5117
           1       0.68      0.44      0.53      1616

   micro avg       0.81      0.81      0.81      6733
   macro avg       0.76      0.69      0.71      6733
weighted avg       0.80      0.81      0.80      6733

