# Lab 3 - Spark MLlib

##### "A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P if its performance at tasks in T, as measured by P, improves with experience E"
-Tom M. Mitchell

#### Machine Learning - the science of getting computers to act without being explicitly programmed

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this example!), dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.

It divides into two packages:
- spark.mllib contains the original API built on top of RDDs.
- spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines.


Using spark.ml is recommended because the DataFrame-based API is more versatile and flexible. The Spark project continues to support spark.mllib alongside the development of spark.ml, so users can keep using spark.mllib features and expect more features to come.

http://spark.apache.org/docs/latest/mllib-guide.html

## Online Purchase Recommendations

Learn how to create a recommendation engine using the Alternating Least Squares (ALS) algorithm in Spark's machine learning library.

<img src='https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/ALS.png' width="70%" height="70%"></img>


Workflow:

<img src='https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/als_flow.png' width="70%" height="70%"></img>


### The data

This is a transnational data set which contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retail.  The company mainly sells unique all-occasion gifts. Many customers of the company are wholesalers.

http://archive.ics.uci.edu/ml/datasets/Online+Retail

<img src='https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/FullFile.png' width="80%" height="80%"></img>

## Create an RDD from the csv data 

### Download the data

In [1]:
!wget https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/OnlineRetail.csv.gz -N

--2016-06-28 16:13:45--  https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/OnlineRetail.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7483128 (7.1M) [application/octet-stream]
Last-modified header missing -- time-stamps turned off.
--2016-06-28 16:13:45--  https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Collaborative%20Filtering/OnlineRetail.csv.gz
Reusing existing connection to raw.githubusercontent.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 7483128 (7.1M) [application/octet-stream]
Saving to: 'OnlineRetail.csv.gz'


2016-06-28 16:13:45 (40.7 MB/s) - 'OnlineRetail.csv.gz' saved [7483128/7483128]



### Put the csv into an RDD (at first, each row in the RDD is a string that corresponds to one line of the csv)

In [2]:
loadRetailData = sc.textFile("OnlineRetail.csv.gz")

for row in loadRetailData.take(5):
    print row

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/10 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/10 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/10 8:26,3.39,17850,United Kingdom


## Prepare and shape the data:  "80% of a Data Scientist's job"


### First we will get the CSV data into a usable format by
#### - removing the header
#### - splitting the rows
#### - removing data that is not relevant

#### Remove the header from the RDD and split the string in each row by comma

In [3]:
header = loadRetailData.first()
loadRetailData = loadRetailData.filter(lambda line: line != header).\
                            map(lambda l: l.split(","))

for row in loadRetailData.take(5):
    print row

[u'536365', u'85123A', u'WHITE HANGING HEART T-LIGHT HOLDER', u'6', u'12/1/10 8:26', u'2.55', u'17850', u'United Kingdom']
[u'536365', u'71053', u'WHITE METAL LANTERN', u'6', u'12/1/10 8:26', u'3.39', u'17850', u'United Kingdom']
[u'536365', u'84406B', u'CREAM CUPID HEARTS COAT HANGER', u'8', u'12/1/10 8:26', u'2.75', u'17850', u'United Kingdom']
[u'536365', u'84029G', u'KNITTED UNION FLAG HOT WATER BOTTLE', u'6', u'12/1/10 8:26', u'3.39', u'17850', u'United Kingdom']
[u'536365', u'84029E', u'RED WOOLLY HOTTIE WHITE HEART.', u'6', u'12/1/10 8:26', u'3.39', u'17850', u'United Kingdom']


##### NOTE:  The original file at UCI's Machine Learning Repository has commas in the product description.  Those have been removed to expedite the lab.
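
Why the commas matter can be sketched in plain Python. The line below is hypothetical (it is not taken from the data set) and assumes the description would be quoted, as CSV exports usually do. A plain `split(",")` cuts the description in two, while the standard `csv` module honours the quotes.

```python
import csv

# Hypothetical line: same column layout as the lab file, but the description
# contains a comma and is therefore wrapped in quotes.
line = '536365,12345,"CAKE TINS, SET OF 3",6,12/1/10 8:26,4.95,17850,United Kingdom'

naive_fields = line.split(",")            # also splits inside the quoted description
parsed_fields = next(csv.reader([line]))  # treats the quoted text as one field

print(len(naive_fields))   # one field too many
print(len(parsed_fields))  # matches the 8 header columns
print(parsed_fields[2])
```

With the commas already removed from the lab file, the simpler `split(",")` used in this lab is sufficient.
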
#### Only keep rows that have a purchase quantity greater than 0, a non-blank customerID, and a stock code that is still non-blank after removing non-numeric characters.

In [4]:
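import re

# Keep rows with a positive quantity, a stock code that still contains digits
# once non-digit characters are stripped, and a non-blank customer ID.
loadRetailData = loadRetailData.filter(lambda l: int(l[3]) > 0\
                                and len(re.sub(r"\D", "", l[1])) != 0 \
                                and len(l[6]) != 0)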
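
The same three conditions can be tried on plain Python lists, without Spark. This is only a sketch: `keep_row` is a hypothetical helper name, the first row is the first data row shown earlier, and the other three rows are invented to trigger one rejection rule each.

```python
import re

def keep_row(fields):
    """Apply the lab's three filter conditions to one already-split CSV row."""
    quantity_ok = int(fields[3]) > 0
    stock_ok = len(re.sub(r"\D", "", fields[1])) != 0  # digits remain after stripping
    customer_ok = len(fields[6]) != 0                  # customer ID is not blank
    return quantity_ok and stock_ok and customer_ok

rows = [
    # First data row of the lab file.
    ['536365', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', '6', '12/1/10 8:26', '2.55', '17850', 'United Kingdom'],
    # Hypothetical rows, one per rejection rule.
    ['536366', '71053', 'NEGATIVE QUANTITY EXAMPLE', '-1', '12/1/10 9:00', '3.39', '17850', 'United Kingdom'],
    ['536367', 'POST', 'NO DIGITS IN STOCK CODE', '1', '12/1/10 9:05', '18.00', '12583', 'France'],
    ['536368', '22633', 'BLANK CUSTOMER ID', '3', '12/1/10 9:10', '1.85', '', 'United Kingdom'],
]

kept = [r for r in rows if keep_row(r)]
print(len(kept))
print(re.sub(r"\D", "", "85123A"))
```

Stripping the letters also means that codes differing only in a letter suffix, such as 84029G and 84029E in the sample rows, end up with the same numeric stock code.
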

#### Map each line to a row and create a data frame 

In [5]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

#Convert each line to a Row.
loadRetailData = loadRetailData.map(lambda l: Row(inv=int(l[0]),\
                                    stockCode=int(re.sub(r"\D", "", l[1])),\
                                    description=l[2],\
                                    quant=int(l[3]),\
                                    invDate=l[4],\
                                    price=float(l[5]),\
                                    custId=int(l[6]),\
                                    country=l[7]))

# Infer the schema, and register the DataFrame as a table.
retailDf = sqlContext.createDataFrame(loadRetailData)
retailDf.printSchema()

retailDf.registerTempTable("retailPurchases")
sqlContext.sql("SELECT * FROM retailPurchases limit 2").toPandas()

root
 |-- country: string (nullable = true)
 |-- custId: long (nullable = true)
 |-- description: string (nullable = true)
 |-- inv: long (nullable = true)
 |-- invDate: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quant: long (nullable = true)
 |-- stockCode: long (nullable = true)


Unnamed: 0,country,custId,description,inv,invDate,price,quant,stockCode
0,United Kingdom,17850,WHITE HANGING HEART T-LIGHT HOLDER,536365,12/1/10 8:26,2.55,6,85123
1,United Kingdom,17850,WHITE METAL LANTERN,536365,12/1/10 8:26,3.39,6,71053


#### Keep only the data we need (custId, stockCode, and purch, a flag set to 1 for every purchased combination)

In [6]:
query = """
SELECT 
    custId, stockCode, 1 as purch
FROM 
    retailPurchases 
group 
    by custId, stockCode"""
retailDf = sqlContext.sql(query)
retailDf.registerTempTable("retailDf")

In [7]:
sqlContext.sql("select * from retailDf limit 10").toPandas()

Unnamed: 0,custId,stockCode,purch
0,12838,22941,1
1,17968,22731,1
2,16210,20977,1
3,17897,84558,1
4,16552,85123,1
5,17905,21662,1
6,13468,21231,1
7,16274,21809,1
8,13090,22617,1
9,16186,22865,1


### Randomly split the data into:
#### - testing set (10% of the data)
#### - cross validation set (10% of the data) 
#### - training set (80% of the data)

In [8]:
testDf, cvDf, trainDf = retailDf.randomSplit([.1,.1,.8],1)

print "trainDf count: ", trainDf.count(), " example: "
for row in trainDf.take(2): print row
print

print "cvDf count: ", cvDf.count(), " example: "
for row in cvDf.take(2): print row
print

print "testDf count: ", testDf.count(), " example: "
for row in testDf.take(2): print row
print

trainDf count:  208116  example: 
Row(custId=17968, stockCode=22731, purch=1)
Row(custId=17897, stockCode=84558, purch=1)

cvDf count:  25869  example: 
Row(custId=12838, stockCode=22941, purch=1)
Row(custId=13468, stockCode=21231, purch=1)

testDf count:  26127  example: 
Row(custId=16210, stockCode=20977, purch=1)
Row(custId=13090, stockCode=22617, purch=1)
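
The counts above are close to, but not exactly, 10% / 10% / 80% of the rows. A plain-Python sketch shows why, under the assumption that each row is assigned to a split independently at random according to the weights. `random_split` is a hypothetical helper written for this illustration, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. roughly [0.1, 0.2, 1.0] for weights [.1, .1, .8].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for index, upper in enumerate(bounds):
            if draw < upper:
                buckets[index].append(row)
                break
    return buckets

rows = list(range(10000))  # stand-in for (custId, stockCode, purch) rows
test_rows, cv_rows, train_rows = random_split(rows, [.1, .1, .8], seed=1)
print("%d %d %d" % (len(test_rows), len(cv_rows), len(train_rows)))
```

Every row lands in exactly one bucket, and fixing the seed makes the split repeatable, which is the role of the second argument to `randomSplit` in the cell above.
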



## Build recommendation models

#### Use training DF to train a model with Alternating Least Squares 
Latent Factors / rank<br>
The number of columns in the user-feature and product-feature matrices<br>
Iterations / maxIter<br>
The maximum number of alternating passes the algorithm runs<br>

In [9]:
from pyspark.ml.recommendation import ALS

arguments = {}
arguments["rank"] = 15
arguments["maxIter"] = 15
arguments["userCol"] = "custId"
arguments["itemCol"] = "stockCode"
arguments["ratingCol"] = "purch"
arguments["implicitPrefs"] = True

als1 = ALS(**arguments)
model1 = als1.fit(trainDf)

print "The model has been trained"

The model has been trained
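
To make "alternating" concrete, here is a minimal sketch of the idea in plain Python with a single latent factor (rank 1) and explicit ratings. It is not Spark's implementation: Spark's ALS handles higher ranks, runs distributed, and with `implicitPrefs=True` uses a confidence-weighted formulation. The data and the function names `objective` and `als_rank1` are hypothetical.

```python
def objective(ratings, u, v, reg):
    """Regularized squared error over the observed ratings."""
    error = sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items())
    penalty = reg * (sum(x * x for x in u) + sum(x * x for x in v))
    return error + penalty

def als_rank1(ratings, n_users, n_items, reg=0.1, max_iter=10):
    """Alternate closed-form updates of user and item factors (rank 1)."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    history = [objective(ratings, u, v, reg)]
    for _ in range(max_iter):
        # Hold the item factors fixed and solve for every user factor.
        for i in range(n_users):
            seen = [(j, r) for (a, j), r in ratings.items() if a == i]
            u[i] = sum(r * v[j] for j, r in seen) / (reg + sum(v[j] ** 2 for j, r in seen))
        # Hold the user factors fixed and solve for every item factor.
        for j in range(n_items):
            seen = [(i, r) for (i, b), r in ratings.items() if b == j]
            v[j] = sum(r * u[i] for i, r in seen) / (reg + sum(u[i] ** 2 for i, r in seen))
        history.append(objective(ratings, u, v, reg))
    return u, v, history

# Hypothetical toy data: (user index, item index) -> rating.
ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0, (1, 2): 1.0, (2, 1): 2.0, (2, 2): 1.0}
u, v, history = als_rank1(ratings, n_users=3, n_items=3)
print("objective before: %.4f, after: %.4f" % (history[0], history[-1]))
print("predicted rating for user 1, item 1: %.4f" % (u[1] * v[1]))
```

Each half-step solves a small least squares problem exactly while the other side is held fixed, so the objective never increases from one pass to the next. In this sketch `max_iter` plays the role of `maxIter`, and a higher `rank` would replace each single number in `u` and `v` with a short vector.
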


## Test the model

Use the model to predict how strongly a customer prefers a given item.

The closer the prediction for a product the customer actually purchased is to 1, the better the model fits.

For example:<br>
<b>Customer A</b> purchased stockCode item <b>20831</b>. If the model predicts .9999 for that customer and item, the prediction is very accurate.

#### Evaluate the model with the cross validation dataframe by using the transform function.

Some of the customers or stock codes in the cross validation data may not appear in the training data.  
Let's remove those rows, because the model has nothing to base a prediction on for them and they would distort the accuracy test.

In [10]:
counter = 4

# Use .rdd so the code also runs on Spark 2.x, where DataFrame.map was removed.
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
print "Customer Examples"
for i, x in enumerate(customers): 
    print x
    if i == counter : break

print

stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())
print "Stock Examples"
for i, x in enumerate(stock): 
    print x
    if i == counter : break

Customer Examples
16384
16385
16386
16387
16389

Stock Examples
2
90116
21846
90118
90119


In [11]:
#keep only rows that are actionable: the customer and the stock code both appear in the training data
print "Pre-Filter: ", cvDf.count()
cvDf = cvDf.rdd.filter(lambda line: line.stockCode in stock and\
                                           line.custId in customers).toDF()
print "Post-Filter: ", cvDf.count()

Pre-Filter:  25869
Post-Filter:  25844
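
The filter is a pair of set membership tests. A small sketch with plain tuples shows which rows survive; the (custId, stockCode) pairs below are hypothetical stand-ins for trainDf and cvDf.

```python
# Hypothetical (custId, stockCode) pairs standing in for trainDf and cvDf.
train_pairs = [(17968, 22731), (17897, 84558), (16210, 20977)]
cv_pairs = [(17968, 84558), (12838, 22731), (16210, 99999)]

customers = set(c for c, s in train_pairs)
stock = set(s for c, s in train_pairs)

# Keep a row only if both its customer and its stock code were seen in training.
cv_kept = [(c, s) for c, s in cv_pairs if c in customers and s in stock]
print(cv_kept)
```

The second pair is dropped because customer 12838 never appears in the training pairs, and the third because stock code 99999 does not. Using sets keeps each membership test cheap even when there are many customers and items.
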


In [12]:
#Build predictions: this returns every customer, item, and predicted purchase score for that combination

predictions1 = model1.transform(cvDf)
for row in predictions1.take(5):
    print row

Row(custId=14286, stockCode=20831, purch=1, prediction=0.0679328441619873)
Row(custId=13949, stockCode=20831, purch=1, prediction=0.06638696044683456)
Row(custId=14730, stockCode=21031, purch=1, prediction=0.020719466730952263)
Row(custId=13038, stockCode=21231, purch=1, prediction=0.151198610663414)
Row(custId=15855, stockCode=21231, purch=1, prediction=0.048063118010759354)


# Implement the model

Use the model to predict items the user will be interested in.

First, create a dataframe in which each row has the user id and an item id.

In [13]:
from pyspark.sql.functions import lit

stock15544 = set(trainDf.filter(trainDf['custId'] == 15544).rdd.map(lambda line: line.stockCode).collect())

userItems = trainDf.select("stockCode").distinct().\
            withColumn('custId', lit(15544)).\
            rdd.filter(lambda line: line.stockCode not in stock15544).toDF()

for row in userItems.take(5):
    print row.stockCode, row.custId

21231 15544
85231 15544
22431 15544
23231 15544
22831 15544


Use 'transform' to score every remaining product for this specific customer.

In [14]:
userItems = model1.transform(userItems)

for row in userItems.take(5):
    print row.stockCode, row.custId, row.prediction

20831 15544 0.00470375129953
21031 15544 0.0102273710072
21231 15544 0.0580250695348
21631 15544 -0.0257182512432
22031 15544 0.0572351813316


Print the top 5 recommendations.

In [15]:
userItems.registerTempTable("predictions")
query = "select * from predictions order by prediction desc limit 5"

sqlContext.sql(query).toPandas()

Unnamed: 0,stockCode,custId,prediction
0,84997,15544,0.597668
1,21213,15544,0.578849
2,16161,15544,0.518497
3,22138,15544,0.505846
4,22090,15544,0.491485
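
The recommendation step boils down to "score the items the customer has not bought yet, then keep the highest scores". A plain-Python sketch of that logic follows; the scores and the purchase history are hypothetical, and `heapq.nlargest` stands in for the SQL `order by ... desc limit`.

```python
import heapq

# Hypothetical scores: stockCode -> predicted preference for one customer.
scores = {84997: 0.60, 21213: 0.58, 16161: 0.52, 22138: 0.51,
          22090: 0.49, 21631: -0.03, 20831: 0.005}
already_bought = set([22138])  # hypothetical purchase history

# Drop items the customer already owns, then take the three best scores.
candidates = [(code, s) for code, s in scores.items() if code not in already_bought]
top3 = heapq.nlargest(3, candidates, key=lambda pair: pair[1])
print(top3)
```

For a short list such as a top 5, `heapq.nlargest` avoids sorting every candidate, which is the same effect the `limit` clause aims for in the query above.
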


Build a product Dataframe

In [16]:
stockItems = sqlContext.sql("select distinct stockCode, description from retailPurchases")
stockItems.registerTempTable("stockItems")

This user seems to have purchased a lot of children's gifts and some holiday items.  The recommendation engine we created suggested some items along these lines.

##### The ALS algorithm uses some randomness, so the recommendations your model produces may differ from these.

In [17]:
#show recommended items
query = """
select 
    predictions.*,
    stockItems.description
from
    predictions
inner join stockItems on
    predictions.stockCode = stockItems.stockCode
order by predictions.prediction desc
limit 10
"""
sqlContext.sql(query).toPandas()

Unnamed: 0,stockCode,custId,prediction,description
0,84997,15544,0.597668,GREEN 3 PIECE POLKADOT CUTLERY SET
1,84997,15544,0.597668,CHILDRENS CUTLERY RETROSPOT RED
2,84997,15544,0.597668,BLUE 3 PIECE POLKADOT CUTLERY SET
3,84997,15544,0.597668,CHILDRENS CUTLERY POLKADOT PINK
4,84997,15544,0.597668,RED 3 PIECE RETROSPOT CUTLERY SET
5,84997,15544,0.597668,PINK 3 PIECE POLKADOT CUTLERY SET
6,84997,15544,0.597668,CHILDRENS CUTLERY POLKADOT BLUE
7,84997,15544,0.597668,CHILDRENS CUTLERY POLKADOT GREEN
8,21213,15544,0.578849,PACK OF 72 SKULL CAKE CASES
9,16161,15544,0.518497,WRAP ENGLISH ROSE


### Now we can refine the model and test for better accuracy

By changing the hyperparameters we can refine the model and test it for accuracy.<br><br>
There are two hyperparameters we will change:<br>
-<b>rank</b> is the number of latent factors in the model.<br>
-<b>maxIter</b> is the maximum number of iterations to run.<br>

In [18]:
als1 = ALS(rank=3, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model1 = als1.fit(trainDf)
predictions1 = model1.transform(cvDf)

als2 = ALS(rank=15, maxIter=3,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model2 = als2.fit(trainDf)
predictions2 = model2.transform(cvDf)

als3 = ALS(rank=15, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model3 = als3.fit(trainDf)
predictions3 = model3.transform(cvDf)

Each model has now been applied to the cross validation data with 'transform', so we can determine how closely its predictions match the actual purchases.

Now we will use Mean Squared Error to determine the accuracy. This is done by comparing the prediction value to the actual purchase value in our data in cvDf.
<br><br>
Per Wikipedia: In statistics, the mean squared error (MSE) or mean squared deviation (MSD) of an estimator measures the average of the squares of the errors or deviations, that is, the difference between the estimator and what is estimated.
<br><br>
We are looking for the model with the lowest MSE. A perfect score of 0 is not realistically achievable, but lower is better.

In [19]:
# Use .rdd so the code also runs on Spark 2.x, where DataFrame.map was removed.
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError3 = predictions3.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
    
print 'Mean squared error = %.4f for our first model' % meanSquaredError1
print 'Mean squared error = %.4f for our second model' % meanSquaredError2
print 'Mean squared error = %.4f for our third model' % meanSquaredError3

Mean squared error = 0.7388 for our first model
Mean squared error = 0.7003 for our second model
Mean squared error = 0.6691 for our third model
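
The calculation in the cell above can be written out in plain Python. This is a sketch with hypothetical values, and `mean_squared_error` is a helper name chosen for the illustration.

```python
def mean_squared_error(actual, predicted):
    """Average of the squared differences between paired values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / float(len(actual))

# Hypothetical values: every row is a real purchase (purch = 1), scored by a model.
actual = [1, 1, 1, 1]
predicted = [0.5, 0.0, 1.0, 0.5]
print("%.4f" % mean_squared_error(actual, predicted))
```

Because every row in cvDf has purch = 1, the MSE here measures how far the predictions fall from 1 on combinations that were actually purchased.
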


Once we have determined the best model (the third model in this case), we can apply it to testDf and again take the Mean Squared Error.
<br><br>
The purpose of this is to make sure we have not overfit to the cvDf data. It could be that the model matches one subset of our data well and not another.
What matters here is a close match between the MSE values on cvDf and testDf.

In [20]:
filteredTestDf = testDf.rdd.filter(lambda line: line.stockCode in stock and\
                                              line.custId in customers).toDF()
predictions4 = model3.transform(filteredTestDf)
meanSquaredError4 = predictions4.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()


print 'Mean squared error = %.4f for our third model using cvDF' % meanSquaredError3
print 'Mean squared error = %.4f for our third (and best) model using testDF' % meanSquaredError4

Mean squared error = 0.6691 for our third model using cvDF
Mean squared error = 0.6677 for our third (and best) model using testDF


##### Data Citation
Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).