# Introduction to Apache Spark lab, part 3: machine learning

In this notebook, you'll learn how to create a model for purchase recommendations using the alternating least squares algorithm of the Apache Spark machine learning library. Machine learning is based on algorithms that can learn from data without relying on rules-based programming.  

"A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P if its performance at tasks in T, as measured by P, improves with experience E"
-Tom M. Mitchell

This notebook uses PySpark, the Python API for Spark. Some knowledge of Python is recommended. This notebook runs on Python 2 with Spark 1.6 and Spark 2.0.

If you are new to Apache Spark, see the first two parts of this lab: 
 - Introduction to Apache Spark lab, part 1: Basic concepts
 - Introduction to Apache Spark lab, part 2: Querying data

## Spark machine learning library
[Apache Spark’s machine learning library](http://spark.apache.org/docs/latest/mllib-guide.html) makes practical machine learning scalable and easy. The library consists of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this notebook!), dimensionality reduction, lower-level optimization primitives, and higher-level pipeline APIs.

The library has two packages:
- spark.mllib contains the original API that handles data in RDDs. It's in maintenance mode, but fully supported.
- spark.ml contains a newer API for constructing ML pipelines. It handles data in DataFrames. It's being actively enhanced.

## Alternating least squares algorithm
The alternating least squares (ALS) algorithm provides collaborative filtering between customers and products to find products that the customers might like, based on their previous purchases or ratings.

The ALS algorithm creates a matrix of all customers versus all products. Most cells in the matrix are empty, which means the customer hasn't bought that product. The ALS algorithm then estimates, for each empty cell, how likely the customer is to buy that product, based on the similarities between customer purchases and the similarities between products. To do this, it approximates the matrix as the product of a set of customer factors and a set of product factors. It uses the least squares computation to minimize the estimation errors and alternates between two steps: fixing the customer factors and solving for the product factors, then fixing the product factors and solving for the customer factors.
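
To make the alternation concrete, here is a minimal sketch in plain Python rather than Spark. It assumes a single factor per customer and per product (rank 1), so each least squares step reduces to one division. The names `als_rank1`, `observed`, and `reg` are illustrative and are not part of the Spark API. Spark's implementation is distributed, uses a higher rank, and handles implicit feedback differently.

```python
def als_rank1(observed, n_customers, n_products, iterations=30, reg=0.01):
    """Fit one factor per customer and one per product to the known cells.

    observed maps (customer, product) -> value for the known cells only.
    """
    cust = [1.0] * n_customers
    prod = [1.0] * n_products
    for _ in range(iterations):
        # Fix the product factors and solve for each customer factor.
        for c in range(n_customers):
            cells = [(p2, v) for (c2, p2), v in observed.items() if c2 == c]
            num = sum(v * prod[p2] for p2, v in cells)
            den = sum(prod[p2] ** 2 for p2, v in cells) + reg
            cust[c] = num / den
        # Fix the customer factors and solve for each product factor.
        for p in range(n_products):
            cells = [(c2, v) for (c2, p2), v in observed.items() if p2 == p]
            num = sum(v * cust[c2] for c2, v in cells)
            den = sum(cust[c2] ** 2 for c2, v in cells) + reg
            prod[p] = num / den
    return cust, prod

# A 3 x 3 matrix equal to the outer product of [1, 2, 3] and [2, 1, 4],
# with cells (0, 1) and (2, 2) hidden from the algorithm.
observed = {
    (0, 0): 2.0, (0, 2): 4.0,
    (1, 0): 4.0, (1, 1): 2.0, (1, 2): 8.0,
    (2, 0): 6.0, (2, 1): 3.0,
}
cust, prod = als_rank1(observed, 3, 3)

# The estimate for a hidden cell is the product of its two factors.
print("Estimate for hidden cell (2, 2): %.2f" % (cust[2] * prod[2]))
```

The hidden cell (2, 2) has a true value of 12, and the estimate should land close to it because the toy matrix is exactly rank 1. Real purchase data is far noisier, which is why the choice of rank and number of iterations matters later in this notebook.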

You don't, however, need to understand how the ALS algorithm works to use it! Spark machine learning algorithms have default values that work well in most cases.

## Table of contents

1. [Get the data](#getdata)<br>
2. [Prepare and shape the data](#prepare)<br>
    2.1 [Format the data](#prepare1)<br>
    2.2 [Clean the data](#prepare2)<br>
    2.3 [Create a DataFrame](#prepare3)<br>
    2.4 [Remove unneeded columns](#prepare4)<br>
3. [Split the data into three sets](#split)<br>
4. [Build recommendation models](#model)<br>
5. [Test the models](#test)<br>
    5.1 [Clean the cross validation data set](#test1)<br>
    5.2 [Run the models on the cross validation data set](#test2)<br>
    5.3 [Calculate the accuracy for each model](#test3)<br>
    5.4 [Confirm the best model](#test4)<br>
6. [Implement the model](#implement)<br>
    6.1 [Create a DataFrame for the customer and all products](#implement1)<br>
    6.2 [Rate each product](#implement2)<br>
    6.3 [Find the top recommendations](#implement3)<br>
    6.4 [Compare purchased and recommended products](#implement4)<br>
7. [Summary and next steps](#summary)

<a id="getdata"></a>
## 1. Get the data 
The data set contains the transactions of an online retailer of gift items from 1 December 2010 to 9 December 2011. Many of the customers are wholesalers.

You'll be using a slightly modified version of this data set: [http://archive.ics.uci.edu/ml/datasets/Online+Retail](http://archive.ics.uci.edu/ml/datasets/Online+Retail).  

Here's a glimpse of the data:

<img src='https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/FullFile.png' width="80%" height="80%"></img>

Download the CSV version of the data set, from which the commas in the product descriptions are removed:

In [None]:
!rm -f OnlineRetail.csv.gz
!wget https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz

Put the data into an RDD and print the first 5 rows:

In [None]:
loadRetailData = sc.textFile("OnlineRetail.csv.gz")

for row in loadRetailData.take(5):
    print row

Each row in the RDD is a string that corresponds to a line in the CSV file.

<a id="prepare"></a>
## 2. Prepare and shape the data

It's been said that preparing and shaping data is 80% of a data scientist's job. Having the right data in the right format is critical for getting accurate results.

To get the data ready, complete these tasks:

1. [Format the data](#prepare1)
1. [Clean the data](#prepare2)
1. [Create a DataFrame](#prepare3)
1. [Remove unneeded columns](#prepare4)

<a id="prepare1"></a>
### 2.1 Format the data
Remove the header from the RDD and split the string in each row with a comma:

In [None]:
header = loadRetailData.first()
loadRetailData = loadRetailData.filter(lambda line: line != header).\
                            map(lambda l: l.split(","))

for row in loadRetailData.take(5):
    print row

<a id="prepare2"></a>
### 2.2 Clean the data
Remove the rows that have incomplete data. Keep only the rows that meet the following criteria:
 - The purchase quantity is greater than 0 
 - The customer ID is not blank
 - The stock code is not blank after you remove non-numeric characters

In [None]:
import re

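# Keep rows with a positive quantity, a stock code that contains
# at least one digit, and a customer ID that is not blank.
loadRetailData = loadRetailData.filter(lambda l: int(l[3]) > 0 \
                                and len(re.sub(r"\D", "", l[1])) != 0 \
                                and len(l[6]) != 0)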

print loadRetailData.take(2)
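
To see what the filter keeps and drops, here is a small sketch that applies the same three checks to a plain Python list. The sample rows are made up for illustration and are not taken from the data set. The helper name `keep_row` is hypothetical.

```python
import re

def keep_row(fields):
    """Return True when a split CSV row passes the three cleaning rules."""
    quantity_ok = int(fields[3]) > 0
    # Strip every non-digit character; a code such as "POST" becomes empty.
    stock_ok = len(re.sub(r"\D", "", fields[1])) != 0
    customer_ok = len(fields[6]) != 0
    return quantity_ok and stock_ok and customer_ok

# Made-up rows in the same column order as the CSV file.
sample_rows = [
    ["536365", "85123A", "HANGING HEART", "6", "12/1/10 8:26", "2.55", "17850", "United Kingdom"],
    ["536366", "POST", "POSTAGE", "1", "12/1/10 8:30", "18.00", "12583", "France"],
    ["C536379", "22633", "HAND WARMER", "-1", "12/1/10 9:41", "1.85", "14527", "United Kingdom"],
    ["536414", "22139", "RETROSPOT TEA SET", "56", "12/1/10 11:52", "0.00", "", "United Kingdom"],
]

kept = [row[0] for row in sample_rows if keep_row(row)]
print(kept)  # prints ['536365']
```

Only the first row survives: the second has no digits in its stock code, the third has a negative quantity, and the fourth has a blank customer ID.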

<a id="prepare3"></a>
### 2.3 Create a DataFrame

First, create an SQLContext and map each line to a row: 

In [None]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

#Convert each line to a Row.
loadRetailData = loadRetailData.map(lambda l: Row(inv=int(l[0]),\
                                    stockCode=int(re.sub(r"\D", "", l[1])),\
                                    description=l[2],\
                                    quant=int(l[3]),\
                                    invDate=l[4],\
                                    price=float(l[5]),\
                                    custId=int(l[6]),\
                                    country=l[7]))

Create a DataFrame and show the inferred schema:

In [None]:
retailDf = sqlContext.createDataFrame(loadRetailData)
retailDf.printSchema()

Register the DataFrame as a table so that you can run SQL queries on it and show the first two rows:

In [None]:
retailDf.registerTempTable("retailPurchases")
sqlContext.sql("SELECT * FROM retailPurchases limit 2").toPandas()

<a id="prepare4"></a>
### 2.4 Remove unneeded columns
The only columns you need are `custId`, `stockCode`, and a new column, `purch`, which has a value of 1 to indicate that the customer purchased the product:

In [None]:
query = """
SELECT 
    custId, stockCode, 1 AS purch
FROM 
    retailPurchases 
GROUP BY 
    custId, stockCode"""
retailDf = sqlContext.sql(query)
retailDf.registerTempTable("retailDf")

sqlContext.sql("select * from retailDf limit 10").toPandas()
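
The grouping collapses repeat purchases, so each customer and product pair appears only once. A rough plain-Python equivalent, using made-up ID pairs, might look like this:

```python
# Made-up (custId, stockCode) pairs; the first pair appears twice.
purchases = [(17850, 85123), (17850, 85123), (17850, 71053), (13047, 84879)]

# A set keeps one copy of each pair, which mirrors the GROUP BY.
pairs = sorted(set(purchases))

# Attach the constant purchase flag to every distinct pair.
implicit = [(cust_id, stock_code, 1) for cust_id, stock_code in pairs]
print(implicit)
# prints [(13047, 84879, 1), (17850, 71053, 1), (17850, 85123, 1)]
```

Note that the purchase count is discarded: a customer who bought a product ten times gets the same value of 1 as a customer who bought it once.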

<a id="split"></a>
## 3. Split the data into three sets
You'll split the data into three sets: 
 - a testing data set (10% of the data)
 - a cross validation data set (10% of the data)
 - a training data set (80% of the data)

Split the data randomly and create a DataFrame for each data set:

In [None]:
testDf, cvDf, trainDf = retailDf.randomSplit([.1,.1,.8],1)

print "trainDf count: ", trainDf.count(), " example: "
for row in trainDf.take(2): print row
print

print "cvDf count: ", cvDf.count(), " example: "
for row in cvDf.take(2): print row
print

print "testDf count: ", testDf.count(), " example: "
for row in testDf.take(2): print row
print
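
The counts are close to 10%, 10%, and 80% of the data, but not exact, because each row is assigned at random. The following sketch shows roughly how a weighted random split behaves. It is plain Python and not Spark's implementation, and the function name `weighted_split` is hypothetical.

```python
import random

def weighted_split(rows, weights, seed):
    """Assign each row to one part by comparing a random draw with cumulative weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, approximately [0.1, 0.2, 1.0] for weights [.1, .1, .8].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # The last part also catches any draw left over by floating-point rounding.
        for index, bound in enumerate(bounds):
            if draw < bound or index == len(bounds) - 1:
                parts[index].append(row)
                break
    return parts

test_part, cv_part, train_part = weighted_split(range(1000), [.1, .1, .8], seed=1)
print("test: %d, cv: %d, train: %d" % (len(test_part), len(cv_part), len(train_part)))
```

Every row lands in exactly one part, and using the same seed reproduces the same split. That is also why the Spark call above passes a seed of 1.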

<a id="model"></a>
## 4. Build recommendation models
Machine learning algorithms have standard parameters and hyperparameters. Standard parameters specify data and options. Hyperparameters control the performance of the algorithm.

The ALS algorithm has these hyperparameters:  

 - The `rank` hyperparameter represents the number of features. The default value of `rank` is 10.
 - The `maxIter` hyperparameter represents the number of iterations to run the least squares computation. The default value of `maxIter` is 10.

Use the training DataFrame to train three models with the ALS algorithm with different values for the `rank` and `maxIter` hyperparameters. Assign the `userCol`, `itemCol`, and `ratingCol` parameters to the appropriate data columns. Set the `implicitPrefs` parameter to `True` so that the algorithm treats the `purch` values as implicit feedback (evidence that a customer is interested in a product) rather than as explicit ratings.

In [None]:
from pyspark.ml.recommendation import ALS

als1 = ALS(rank=3, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model1 = als1.fit(trainDf)

als2 = ALS(rank=15, maxIter=3,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model2 = als2.fit(trainDf)

als3 = ALS(rank=15, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model3 = als3.fit(trainDf)

print "The models are trained"

<a id="test"></a>
## 5. Test the models

First test the three models on the cross validation data set and then on the testing data set. 

You'll know the model is accurate when the prediction values for products that the customers have already bought are close to 1. 

<a id="test1"></a>
### 5.1 Clean the cross validation data set

Remove any of the customers or products in the cross validation set that are not in the training set:

In [None]:
# Collect the customer IDs and stock codes that appear in the training set.
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())

print cvDf.count()
cvDf = cvDf.rdd.filter(lambda line: line.stockCode in stock and\
                                           line.custId in customers).toDF()
print cvDf.count()
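
This step is needed because the model learns factors only for the customers and products that it saw during training. In these Spark versions, a row with an unseen ID gets a prediction of NaN, which would turn the mean squared error into NaN as well. The same filter can be sketched with plain Python sets, using made-up IDs:

```python
# Made-up (custId, stockCode) pairs.
train = [(17850, 85123), (17850, 71053), (13047, 84879)]
cv = [(17850, 84879), (99999, 85123), (13047, 11111)]

known_customers = set(cust_id for cust_id, stock_code in train)
known_stock = set(stock_code for cust_id, stock_code in train)

# Keep a row only when both of its IDs were seen during training.
cv_clean = [(cust_id, stock_code) for cust_id, stock_code in cv
            if cust_id in known_customers and stock_code in known_stock]
print(cv_clean)  # prints [(17850, 84879)]
```

Customer 99999 and product 11111 never appear in the training pairs, so their rows are dropped.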

<a id="test2"></a>
### 5.2 Run the models on the cross validation data set
Run each model on the cross validation DataFrame by using the `transform` function and print the first two rows of each set of predictions:

In [None]:
predictions1 = model1.transform(cvDf)
predictions2 = model2.transform(cvDf)
predictions3 = model3.transform(cvDf)

print predictions1.take(2)
print predictions2.take(2)
print predictions3.take(2)

<a id="test3"></a>
### 5.3 Calculate the accuracy for each model  

You'll use the mean squared error calculation to determine accuracy by comparing the prediction values for products to the actual purchase values. Remember that if a customer purchased a product, the value in the `purch` column is 1. The mean squared error calculation measures the average of the squares of the errors between what is estimated and the existing data. The lower the mean squared error value, the more accurate the model. 

For all predictions, subtract the prediction from the actual purchase value (1), square the result, and calculate the mean of all of the squared differences:

In [None]:
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError3 = predictions3.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
    
print 'Mean squared error = %.4f for our first model' % meanSquaredError1
print 'Mean squared error = %.4f for our second model' % meanSquaredError2
print 'Mean squared error = %.4f for our third model' % meanSquaredError3
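
As a quick check of the arithmetic, here is the same calculation on a handful of made-up prediction values in plain Python. The function name `mean_squared_error` is illustrative.

```python
def mean_squared_error(actual, predicted):
    """Average the squared differences between actual and predicted values."""
    errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sum(errors) / float(len(errors))

# Every row in the data is a purchase, so the actual value is always 1.
actual = [1, 1, 1, 1]
predicted = [0.9, 0.5, 1.0, 0.2]

# Squared errors: 0.01, 0.25, 0.00, 0.64. Their mean is 0.90 / 4 = 0.225.
mse = mean_squared_error(actual, predicted)
print("Mean squared error = %.4f" % mse)  # prints Mean squared error = 0.2250
```

Because of the squaring, the single poor prediction of 0.2 contributes most of the error.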

The third model (model3) has the lowest mean squared error value, so it's the most accurate.

Notice that of the three models, model3 has the highest values for the hyperparameters. At this point you might be tempted to run the model with even higher values for `rank` and `maxIter`. However, you might not get better results. Increasing the values of the hyperparameters increases the time for the model to run. Also, you don't want to overfit the model so that it exactly fits the original data. In that case, you don't get any recommendations! For best results, keep the values of the hyperparameters close to the defaults.

<a id="test4"></a>
### 5.4 Confirm the best model 

Now run model3 on the testing data set to confirm that it's the best model. You want to make sure that the model is not overfitted to the cross validation data, because a model can match one subset of the data well but not another. If the values of the mean squared error for the testing data set and the cross validation data set are close, the model generalizes to data that it wasn't tuned on.

Clean the testing data set, run model3 on the testing data set, and calculate the mean squared error:

In [None]:
filteredTestDf = testDf.rdd.filter(lambda line: line.stockCode in stock and\
                                              line.custId in customers).toDF()
predictions4 = model3.transform(filteredTestDf)
meanSquaredError4 = predictions4.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
    
print 'Mean squared error = %.4f for our best model' % meanSquaredError4

That's pretty close. The model works for all the data.

<a id="implement"></a>
## 6. Implement the model

Use the best model to predict which products a particular customer might be interested in.

<a id="implement1"></a>
### 6.1 Create a DataFrame for the customer and all products 

Create a DataFrame in which each row has the customer ID (15544) and the ID of a product that the customer has not bought in the training set:

In [None]:
from pyspark.sql.functions import lit

stock15544 = set(trainDf.filter(trainDf['custId'] == 15544).rdd.map(lambda line: line.stockCode).collect())

userItems = trainDf.select("stockCode").distinct().\
            withColumn('custId', lit(15544)).\
            rdd.filter(lambda line: line.stockCode not in stock15544).toDF()

for row in userItems.take(5):
    print row.stockCode, row.custId

<a id="implement2"></a>
### 6.2 Rate each product

Run the `transform` function to create a prediction value for each product:

In [None]:
userItems = model3.transform(userItems)

for row in userItems.take(5):
    print row.stockCode, row.custId, row.prediction

<a id="implement3"></a>
### 6.3 Find the top recommendations

Print the top 5 product recommendations:

In [None]:
userItems.registerTempTable("predictions")
query = "select * from predictions order by prediction desc limit 5"

sqlContext.sql(query).toPandas()
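
The query sorts by prediction value and keeps the first few rows. For a small in-memory list, the same top-N selection can be sketched with the standard `heapq` module. The stock codes and scores here are made up.

```python
import heapq

# Made-up (stockCode, prediction) pairs.
scored = [(22086, 0.91), (21754, 0.35), (85099, 0.78),
          (22423, 0.12), (47566, 0.66), (84879, 0.83)]

# Take the three pairs with the highest prediction, in descending order.
top3 = heapq.nlargest(3, scored, key=lambda pair: pair[1])
print(top3)  # prints [(22086, 0.91), (84879, 0.83), (85099, 0.78)]
```

A higher prediction value means that the model considers the customer more likely to be interested in the product.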

<a id="implement4"></a>
### 6.4 Compare purchased and recommended products

Here's a view of the products that customer 15544 bought:

<img src='https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/user.png' width="80%" height="80%"></img>

This customer bought lots of children's gifts and some holiday items. 

Look at the descriptions of the recommended products to see if they are in the same categories.

<div class="alert alert-block alert-info">Note: The ALS algorithm uses some randomness, so the recommendations you get might be different from these.</div>

In [None]:
stockItems = sqlContext.sql("select distinct stockCode, description from retailPurchases")
stockItems.registerTempTable("stockItems")

query = """
select 
    predictions.*,
    stockItems.description
from
    predictions
inner join stockItems on
    predictions.stockCode = stockItems.stockCode
order by predictions.prediction desc
limit 10
"""
sqlContext.sql(query).toPandas()

The recommended products look pretty similar to the purchased products, and, in some cases, are actually the same. Your model works!

<a id="summary"></a>
## 7. Summary and next steps
You created a predictive model that makes product recommendations for customers and verified that it works.

Dig deeper:
 - [Collaborative Filtering](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html)
 - [Spark Machine Learning Library (MLlib) Guide](http://spark.apache.org/docs/latest/ml-guide.html)
 - [Spark Python API Docs](http://spark.apache.org/docs/latest/api/python/index.html)


## Data citation
Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).