# Lab 3 - Machine Learning with Spark

##### "A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P if its performance at tasks in T, as measured by P, improves with experience E"
-Tom M. Mitchell

#### Machine Learning - the science of getting computers to act without being explicitly programmed

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this example!), dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.

It divides into two packages:
- spark.mllib contains the original API built on top of RDDs.
- spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines.


Using spark.ml is recommended because with DataFrames the API is more versatile and flexible. But we will keep supporting spark.mllib along with the development of spark.ml. Users should be comfortable using spark.mllib features and expect more features coming.

http://spark.apache.org/docs/latest/mllib-guide.html

## Online Purchase Recommendations

Learn how to create a recommendation engine using the Alternating Least Squares (ALS) algorithm in Spark's machine learning library.

<img src='https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/ALS.png' width="70%" height="70%"></img>

### The data

This is a transnational data set that contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based, registered, non-store online retailer. The company mainly sells unique all-occasion gifts, and many of its customers are wholesalers.

http://archive.ics.uci.edu/ml/datasets/Online+Retail

<img src='https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/FullFile.png' width="80%" height="80%"></img>

# <span style="color:blue">Section 0: </span> <br>Obtaining the dataset and performing initial prep work

### Download the data

In [4]:
!rm 'OnlineRetail.csv.gz' -f
!wget https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz

--2016-07-14 19:32:55--  https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7483128 (7.1M) [application/octet-stream]
Saving to: 'OnlineRetail.csv.gz'


2016-07-14 19:32:56 (44.8 MB/s) - 'OnlineRetail.csv.gz' saved [7483128/7483128]



### Load the csv into an RDD (at first, each row in the RDD is a string which corresponds to a line in the csv)

In [6]:
loadRetailData = sc.textFile("./OnlineRetail.csv.gz")
print loadRetailData.take(2)

[u'InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country', u'536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850,United Kingdom']


## Prepare and shape the data: "80% of a Data Scientist's job"

#### Remove the header from the RDD and split the string in each row by comma

In [7]:
header = loadRetailData.first()
loadRetailData = loadRetailData.filter(lambda line: line != header).\
                            map(lambda l: l.split(","))

print loadRetailData.take(2)

[[u'536365', u'85123A', u'WHITE HANGING HEART T-LIGHT HOLDER', u'6', u'12/1/10 8:26', u'2.55', u'17850', u'United Kingdom'], [u'536365', u'71053', u'WHITE METAL LANTERN', u'6', u'12/1/10 8:26', u'3.39', u'17850', u'United Kingdom']]


##### NOTE:  The original file at UCI's Machine Learning Repository has commas in the product description.  Those have been removed to speed up the lab.
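The note above matters because `split(",")` also splits on commas inside a quoted field. As a minimal sketch outside Spark (the sample line is made up for illustration and is not taken from the data set), Python's standard `csv` module handles such a line correctly, while the naive split produces an extra field:

```python
import csv

# Hypothetical line with a comma inside the quoted description field.
line = ('536365,85123A,"HOLDER, WHITE HANGING HEART",6,'
        '12/1/10 8:26,2.55,17850,United Kingdom')

naive_fields = line.split(",")          # also splits inside the quotes
csv_fields = next(csv.reader([line]))   # honours the quoting

print(len(naive_fields))
print(len(csv_fields))
print(csv_fields[2])
```

With the original UCI file, a parser of this kind could be applied per line (for example inside a `map`), at the cost of slower parsing than a plain split.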
#### Only keep rows that have a purchase quantity greater than 0, a non-blank customer ID, and a non-blank stock code after removing non-numeric characters

In [8]:
import re

loadRetailData = loadRetailData.filter(lambda l: int(l[3]) > 0
                                and len(re.sub(r"\D", "", l[1])) != 0
                                and len(l[6]) != 0)

print loadRetailData.take(2)

[[u'536365', u'85123A', u'WHITE HANGING HEART T-LIGHT HOLDER', u'6', u'12/1/10 8:26', u'2.55', u'17850', u'United Kingdom'], [u'536365', u'71053', u'WHITE METAL LANTERN', u'6', u'12/1/10 8:26', u'3.39', u'17850', u'United Kingdom']]
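To make the three filter conditions easier to test in isolation, here is a small sketch of the same predicate as a named function in plain Python, without Spark. The function name `keep_row` and all rows except the first are hypothetical and only serve the illustration:

```python
import re

def keep_row(fields):
    """Return True when an already-split CSV row passes the three checks."""
    quantity_ok = int(fields[3]) > 0                            # positive quantity
    stock_code_ok = len(re.sub(r"\D", "", fields[1])) != 0      # digits remain
    customer_ok = len(fields[6]) != 0                           # customer id present
    return quantity_ok and stock_code_ok and customer_ok

rows = [
    # First row is copied from the lab output above.
    ["536365", "85123A", "WHITE HANGING HEART T-LIGHT HOLDER", "6",
     "12/1/10 8:26", "2.55", "17850", "United Kingdom"],
    # Made-up rows: stock code without digits, negative quantity, blank customer.
    ["1", "POST", "POSTAGE", "1", "12/1/10 8:45", "18.0", "12583", "France"],
    ["2", "22632", "HAND WARMER", "-1", "12/1/10 9:00", "1.85", "17850", "United Kingdom"],
    ["3", "22632", "HAND WARMER", "2", "12/1/10 9:00", "1.85", "", "United Kingdom"],
]

kept = [r for r in rows if keep_row(r)]
print(len(kept))
```

A named function like this could be passed to `filter` in place of the lambda, which makes the rules easier to unit test.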


#### Map each line to a row and create a data frame 

In [9]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

#Convert each line to a Row.
loadRetailData = loadRetailData.map(lambda l: Row(inv=int(l[0]),
                                    stockCode=int(re.sub(r"\D", "", l[1])),
                                    description=l[2],
                                    quant=int(l[3]),
                                    invDate=l[4],
                                    price=float(l[5]),
                                    custId=int(l[6]),
                                    country=l[7]))

# Infer the schema, and register the DataFrame as a table.
retailDf = sqlContext.createDataFrame(loadRetailData)
retailDf.printSchema()

retailDf.registerTempTable("retailPurchases")
print sqlContext.sql("SELECT * FROM retailPurchases limit 2").toPandas()

root
 |-- country: string (nullable = true)
 |-- custId: long (nullable = true)
 |-- description: string (nullable = true)
 |-- inv: long (nullable = true)
 |-- invDate: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quant: long (nullable = true)
 |-- stockCode: long (nullable = true)

          country  custId                         description     inv  \
0  United Kingdom   17850  WHITE HANGING HEART T-LIGHT HOLDER  536365   
1  United Kingdom   17850                 WHITE METAL LANTERN  536365   

        invDate  price  quant  stockCode  
0  12/1/10 8:26   2.55      6      85123  
1  12/1/10 8:26   3.39      6      71053  
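One side effect of stripping non-numeric characters is worth seeing explicitly: stock codes that differ only by a letter suffix end up with the same integer id. The sketch below uses plain Python and a hypothetical `Purchase` tuple; the second stock code, `85123B`, is made up for illustration:

```python
import re
from collections import namedtuple

# Hypothetical, reduced record type for this illustration only.
Purchase = namedtuple("Purchase", ["inv", "stockCode", "quant", "custId"])

def to_purchase(fields):
    """Convert a split CSV row to typed fields, as the Row mapping above does."""
    return Purchase(inv=int(fields[0]),
                    stockCode=int(re.sub(r"\D", "", fields[1])),
                    quant=int(fields[3]),
                    custId=int(fields[6]))

a = to_purchase(["536365", "85123A", "WHITE HANGING HEART T-LIGHT HOLDER", "6",
                 "12/1/10 8:26", "2.55", "17850", "United Kingdom"])
# Made-up row: same digits, different letter suffix.
b = to_purchase(["536365", "85123B", "SOME OTHER ITEM", "2",
                 "12/1/10 8:26", "1.00", "17850", "United Kingdom"])

print(a.stockCode == b.stockCode)
```

Merging codes this way keeps the ids numeric, as ALS requires, but it can make a single id stand for several distinct products.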


### DATA EXTRACTION FOR THE RECOMMENDATION ENGINE
#### Extract from the retailDf dataframe the columns needed for the recommendation engine: custId, stockCode, and the customer's preference for a product. The preference is not available explicitly, so it is approximated by the number of purchase lines recorded for that customer and product (<span style="color:blue">count(*)</span> in the query below, not the sum of the quantity column).
#### Note as well that the recommendation engine expects an integer datatype for both the customer id and the stock code, and a double value for the customer rating or preference. The columns are therefore cast to the proper datatype as part of the SQL statement.

In [10]:
query = """
SELECT 
    cast(custId as integer), cast(stockCode as integer), cast(count(*) as double) as preference
FROM 
    retailPurchases 
group 
    by custId, stockCode"""
retailDf = sqlContext.sql(query)

retailDf.show(10)

+------+---------+----------+
|custId|stockCode|preference|
+------+---------+----------+
| 12838|    22941|       1.0|
| 17968|    22731|       1.0|
| 16210|    20977|       1.0|
| 17897|    84558|       1.0|
| 16552|    85123|       1.0|
| 17905|    21662|       1.0|
| 13468|    21231|       2.0|
| 16274|    21809|       2.0|
| 13090|    22617|       9.0|
| 16186|    22865|       4.0|
+------+---------+----------+
only showing top 10 rows
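For readers less familiar with SQL, the aggregation above can be sketched in plain Python with `collections.Counter`. The purchase pairs are made up; each pair stands for one purchase line, and the resulting count plays the role of the `preference` column:

```python
from collections import Counter

# Made-up (custId, stockCode) pairs, one per purchase line.
purchases = [(17850, 85123), (17850, 85123), (17850, 71053), (13047, 85123)]

# Equivalent of: SELECT custId, stockCode, count(*) ... GROUP BY custId, stockCode
preference = Counter(purchases)

# Shape the result as (custId, stockCode, preference) with a float preference.
triples = sorted((cust, stock, float(n)) for (cust, stock), n in preference.items())
print(triples)
```

Summing the `quant` column instead of counting lines would be another reasonable proxy for preference; the lab uses the line count.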



### Randomly split the data into a testing set (20% of the data), and a training set (80% of the data)

In [11]:
testDf, trainDf = retailDf.randomSplit([.2,.8],1)

print testDf.take(2)
print trainDf.take(2)

[Row(custId=12838, stockCode=22941, preference=1.0), Row(custId=16210, stockCode=20977, preference=1.0)]
[Row(custId=17968, stockCode=22731, preference=1.0), Row(custId=17897, stockCode=84558, preference=1.0)]
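The list of weights passed to `randomSplit` determines the order of the returned dataframes, which is why the 20% testing set comes first in the cell above. The following plain-Python sketch illustrates the idea of a seeded, weighted split. It is not Spark's sampling implementation, and the helper name `random_split` is hypothetical:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                buckets[i].append(row)
                break
        else:
            # Guard against floating-point edge cases at the upper boundary.
            buckets[-1].append(row)
    return buckets

rows = list(range(1000))
test_rows, train_rows = random_split(rows, [0.2, 0.8], seed=1)
print(len(test_rows) + len(train_rows))
```

Because the assignment is random, the split sizes are only approximately 20% and 80%, and a fixed seed makes the split repeatable.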


# End of <span style="color:blue">Section 0 </span>.

### In the rest of this tutorial, we will build different recommendation engines, following an identical approach but using a few different options / alternatives offered by the Apache Spark libraries MLlib and Spark ML. 
### Examples of the alternatives which will be explored include using the original RDD based Spark MLlib library, versus the more recent (dataframe based) Spark ML, as well as comparing results when using the option which differentiates between EXPLICIT customer feedback (usually provided by rating a product), versus the more common IMPLICIT customer feedback (usually derived from customer behavior).

# <span style="color:red">Build recommendation models. We will have 4 sections:</span> 
## <span style="color:blue">Section 1-</span> Use the MLlib library assuming EXPLICIT customer feedback
## <span style="color:blue">Section 2-</span> Use the MLlib library assuming IMPLICIT customer feedback
## <span style="color:blue">Section 3-</span> Use the Spark ML library assuming EXPLICIT customer feedback
## <span style="color:blue">Section 4-</span> Use the Spark ML library assuming IMPLICIT customer feedback

#### Use the training DataFrame to train a model with Alternating Least Squares
Latent factors / rank<br>
The number of columns in the user-feature and product-feature matrices<br>
Iterations / maxIter<br>
The number of alternating least-squares iterations to run<br>
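To give some intuition for what the rank and the iterations control, here is a deliberately tiny ALS sketch in plain Python. It assumes rank 1, so each user and each product has a single latent factor and every least-squares solve reduces to a division. It is a teaching sketch under those assumptions, not Spark's distributed implementation; the function name `als_rank1`, the `reg` parameter, and the ratings are all made up:

```python
def als_rank1(ratings, num_iterations=10, reg=0.01):
    """Rank-1 ALS on a dict {(user, item): rating}.

    Returns (user_factors, item_factors, loss_history).
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}

    def loss():
        err = sum((r - user_f[u] * item_f[i]) ** 2 for (u, i), r in ratings.items())
        penalty = reg * (sum(x * x for x in user_f.values())
                         + sum(y * y for y in item_f.values()))
        return err + penalty

    history = [loss()]
    for _ in range(num_iterations):
        # Step 1: hold item factors fixed, solve for each user factor.
        for u in users:
            num = sum(r * item_f[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(item_f[i] ** 2 for (uu, i), _ in ratings.items() if uu == u)
            user_f[u] = num / den
        # Step 2: hold user factors fixed, solve for each item factor.
        for i in items:
            num = sum(r * user_f[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(user_f[u] ** 2 for (u, ii), _ in ratings.items() if ii == i)
            item_f[i] = num / den
        history.append(loss())
    return user_f, item_f, history

# Made-up ratings; the pair (3, 20) is missing and will be predicted.
ratings = {(1, 10): 2.0, (1, 20): 4.0, (2, 10): 1.0, (2, 20): 2.0, (3, 10): 3.0}
user_f, item_f, history = als_rank1(ratings, num_iterations=20)

predicted = user_f[3] * item_f[20]   # predicted rating for the unseen pair
print(history[0] > history[-1])
```

The two alternating steps are where the algorithm gets its name. With a higher rank, each division becomes a small linear system, and the prediction becomes a dot product of two factor vectors.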

# <span style="color:blue">Section 1 (Please ensure you've run Section 0 above)</span>
### Use MLlib library and (assume) EXPLICIT user feedback on product ratings.

### <span style="color:green">Section 1.1: Training the explicit MLlib model</span>

In [12]:
# Some imports
from pyspark.mllib.recommendation import ALS, Rating

# Convert the trainDf dataframe (defined above) to the underlying RDD. It should be noted that 
# these conversions have a cost... We will test shortly in a subsequent paragraph whether the 
# MLlib algorithm can ingest the dataframe directly, which would be more convenient since data 
# frames can be queried using SQL or the SQL-like Domain Specific Language.
trainRDD = trainDf.rdd

# We are casting below the elements of the RDD to "Rating" objects, corresponding to the
# input which is expected by the Spark ALS algorithm. The definition of this ALS 
# Rating class can be found fairly easily online and looks as follows:  
# Rating(user: Int, product: Int, rating: Double)
trainRDDAsRating = trainRDD.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# As it will also become apparent using the RDD below, the explicit casting of the ALS input 
# elements to Rating objects is not required, and keeping the original triplets with the 
# proper data types appears to be sufficient as well.
trainRDDNoRating = trainRDD.map(lambda l: (int(l[0]), int(l[1]), float(l[2])))

In [13]:
# Take a peek at the first elements of the RDD made of "Rating" objects. As expected, the
# Rating class is made of the three fields mentioned above: user (integer), product (integer) 
# and rating (double)
trainRDDAsRating.first()

Rating(user=17968, product=22731, rating=1.0)

In [14]:
# Also take a look at the first element of the RDD made from basic triplets.
trainRDDNoRating.first()

(17968, 22731, 1.0)

In [15]:
# Prepare to train the model, using a basic choice of hyper parameters for the ALS algorithm.
rank = 5
numIterations = 15
alpha = 0.01    # only used by ALS.trainImplicit; kept here for reference
lambda1 = 0.01  # regularization parameter (lambda_)

# Train the model using the RDD of Rating objects.
# Positional arguments: ratings, rank, iterations, lambda_, blocks, nonnegative, seed
model = ALS.train(trainRDDAsRating, rank, numIterations, lambda1, -1, False, 10)

# Train the model using the basic triplets instead of the "Rating" class. It can be verified
# that this works in the same way and produces the same model. We can replace "model" by 
# "modelNoRating" in the cells below which will not cause any difference in the results
modelNoRating = ALS.train(trainRDDNoRating, rank, numIterations, lambda1, -1, False, 10)

print "The model has been trained"

The model has been trained


### <span style="color:green">Section 1.2: Building the test RDD</span>

#### Now that the ALS model has been trained, it needs to be tested for accuracy. Remember that the original dataset was split into a training and testing set. We will use the testing set for this purpose. The testing set has three fields: user, product, rating and we need to eliminate the third field (column) in order to produce an RDD of the format (user, product), which is done in the cell below. This RDD will be used by the prediction logic to produce a new RDD which has the prediction/recommendation column added to the original two.

In [16]:
# The testing set was obtained as a subset of the overall dataset, so it contains as well
# three fields, namely: user, product, rating.
testRDD = testDf.rdd

# testRDD0 is shaped to keep the original three fields: user, product, rating where the
# user and product are grouped into a tuple acting as the key in a (key, value) pair... The
# value being the rating. This RDD is going to be needed, once we get predictions from the
# model, to compare original user ratings with the ones produced by the model, in order to
# calculate its accuracy.
testRDD0 = testRDD.map(lambda row: ((row[0], row[1]), row[2]))

# testRDD1 below corresponds to the testRDD where the original user rating is eliminated. This
# RDD will be used as input for generating predictions
testRDD1 = testRDD.map(lambda row: (row[0], row[1]))

# Verify the proper formatting of testRDD0 by taking a look at the first element(s)
testRDD0.take(2)

[((12838, 22941), 1.0), ((16210, 20977), 1.0)]

### <span style="color:green">Section 1.3: Getting predictions using the explicit MLlib model</span>

In [17]:
predict = model.predictAll(testRDD1)

In [18]:
# The RDD predict above now has the model generated ratings added to it. The closer those
# ratings are to the original user ratings we had in the testRDD, the more accurate the model
# will be.
predict.take(5)

[Rating(user=17883, product=21000, rating=0.7812946517353289),
 Rating(user=13700, product=22900, rating=1.9396991247054494),
 Rating(user=15201, product=22900, rating=-1.3326617632247206),
 Rating(user=16504, product=22900, rating=1.2932833403922677),
 Rating(user=18005, product=22900, rating=0.889831456012337)]

### <span style="color:green">Section 1.4: Comparing user entries with predicted ratings and getting a Mean Squared Error</span>

In [19]:
# The next task will consist in joining this new RDD of predictions with the original testRDD
# in order to compare original and generated ratings. We will therefore shape this new RDD
# as a (key, value) pair RDD where the key will correspond to a tuple consisting of the same
# two fields (user, product) as in the testRDD. This is achieved with the transformation below.
predict = predict.map(lambda r: ((r.user, r.product), r.rating))
predict.first()

((17883, 21000), 0.7812946517353289)

In [20]:
# We can now proceed with the joining of both RDDs, on the (user, product) field.
ratesAndPreds = predict.join(testRDD0)
ratesAndPreds.first()

((16201, 20717), (1.455879040687113, 1.0))

In [21]:
# The resulting RDD, named ratesAndPreds, has therefore the pair (user, product) as its key,
# and the generated and original ratings as the values. We can therefore calculate the average
# of the squared difference between the two values as follows. (Mean Squared Error)
MSE = ratesAndPreds.map(lambda l: (l[1][0]- l[1][1])**2).mean()
print MSE

2.67109648124
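The join-then-average logic of Section 1.4 can be checked on a handful of values in plain Python. The sketch below uses made-up predictions and ratings stored in dictionaries keyed by (user, product), mimicking the inner join on the key and the mean of the squared differences:

```python
# Made-up values for illustration, keyed by (user, product).
predictions = {(1, 10): 1.5, (1, 20): 3.0, (2, 10): 0.5}
actuals = {(1, 10): 1.0, (1, 20): 3.0, (3, 30): 2.0}

# Inner join on the key: only pairs present on both sides survive.
joined = {key: (predictions[key], actuals[key])
          for key in predictions if key in actuals}

squared_errors = [(pred - actual) ** 2 for pred, actual in joined.values()]
mse = sum(squared_errors) / len(squared_errors)
rmse = mse ** 0.5   # same units as the ratings, often easier to interpret

print(mse)
```

Note that pairs missing from either side silently drop out of the join, so the error is computed only over pairs for which the model produced a prediction.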


## An MSE value by itself is not a meaningful number as it needs to be compared with other values in an iterative process in order to identify the best hyper parameters for the ALS algorithm in this particular scenario. This can be left as an exercise...

### We can now perform the same steps performed above, but trying to manipulate dataframes whenever possible rather than RDDs, since joining dataframes can be more easily done using SQL syntax.

### <span style="color:green">Section 1.5: Training explicit MLlib model, working with dataframes</span>

In [22]:
# Prepare and test a model where we pass the training dataframe directly, containing the three
# required fields for ALS: user, product, rating

from pyspark.mllib.recommendation import ALS, Rating
rank = 5
numIterations = 15
alpha = 0.01
lambda1 = 0.01

modeldf = ALS.train(trainDf, rank, iterations=numIterations, lambda_=lambda1,
                    blocks=-1, nonnegative=False, seed=10)

print "The model has been trained"

The model has been trained


In [23]:
# As performed in previous steps above, we will now extract the (user, product) pair from the 
# testDf and drop the rating column, since it gets added by the prediction logic.
testDf2 = testDf.rdd.map(lambda l: (l[0], l[1])).toDF(["user", "product"])
testDf2.first()

Row(user=12838, product=22941)

In [24]:
# Get now the predictions for testDf2. Note that we need to pass in the underlying RDD to
# predictAll. Passing in the DataFrame returns an error indicating that the method expects
# an RDD.
# Performance enhancement: It is not necessary (except for educational purposes in a tutorial)
# to make testDf2 a dataframe since predictAll below requires an RDD. Consequently, we can 
# probably avoid converting the result of the lambda transformation back to a dataframe...
predictUsingDf = modeldf.predictAll(testDf2.rdd)

#predictUsingDf is an RDD of "Rating" objects comprised of the fields: user, product, rating

In [25]:
# Take a look at the first element of predictUsingDf.
predictUsingDf.first()

Rating(user=17883, product=21000, rating=0.7812946517353289)

In [26]:
# Now we will rebuild a data frame from the RDD predictUsingDf, so as to be able to use SQL to
# compare the original ratings with the ones generated by the recommendation engine.
predictDf = predictUsingDf.map(lambda r: (r.user, r.product, r.rating)).\
toDF(["user", "product", "rating"])

In [27]:
# Verify that the dataframe was built correctly by looking up a few elements.
predictDf.show(5)

+-----+-------+-------------------+
| user|product|             rating|
+-----+-------+-------------------+
|17883|  21000| 0.7812946517353289|
|13700|  22900| 1.9396991247054494|
|15201|  22900|-1.3326617632247206|
|16504|  22900| 1.2932833403922677|
|18005|  22900|  0.889831456012337|
+-----+-------+-------------------+
only showing top 5 rows



### We will now proceed with the same approach used above, where the ratings generated by the recommendation engine and those originally available in the dataset are brought together in the same structure for comparison. The difference with the previous method is that we are going to use an SQL join instead of the RDD join, which is more convenient and also (potentially) more efficient.

In [28]:
# This dataframe contains the original rating value. Register it as a table.
testDf.registerTempTable("testDf")

# This dataframe contains the generated ratings. Register it as a table.
predictDf.registerTempTable("predictDf")

# Join both tables.
join = """
SELECT 
    custId, stockCode, preference, user, product, rating
FROM 
    testDf, predictDf 
WHERE
    custId = user and stockCode = product"""
joinDf = sqlContext.sql(join)

#Print a few rows from the join result
joinDf.filter("preference >=3").orderBy("custId", "preference").show(10)

+------+---------+----------+-----+-------+------------------+
|custId|stockCode|preference| user|product|            rating|
+------+---------+----------+-----+-------+------------------+
| 12347|    84991|       3.0|12347|  84991|2.1611118780423353|
| 12347|    84997|       3.0|12347|  84997| 6.430063588583876|
| 12347|    47559|       3.0|12347|  47559|1.6197763913083667|
| 12347|    22376|       3.0|12347|  22376|1.7577461522484379|
| 12347|    21791|       3.0|12347|  21791|1.9615753117557606|
| 12347|    22492|       3.0|12347|  22492|2.2913006447658004|
| 12347|    20719|       4.0|12347|  20719| 2.659215206985238|
| 12347|    84558|       5.0|12347|  84558|1.7344275232546358|
| 12352|    22779|       3.0|12352|  22779|2.1602474136825043|
| 12359|    82613|       3.0|12359|  82613|1.7254139125447852|
+------+---------+----------+-----+-------+------------------+
only showing top 10 rows


### We now calculate the Mean Squared Error. Unsurprisingly, it is exactly the same value as previously calculated, since the same hyper parameters were used to train the model, and the use of data frames versus RDDs does not affect the model produced by Spark.

In [29]:
# Compute the Mean Squared Error. Note that we can refer to the columns by name instead of by
# index...
MSE = joinDf.rdd.map(lambda l: (l.preference - l.rating)**2).mean()
print MSE

2.67109648124


### <span style="color:green">Section 1.6: Manual verification of ALS recommendations for one chosen customer</span>

### We will now "manually" verify the quality of our model. To do so, we will pick one customer who bought a reasonably small, yet meaningful number of items (i.e. more than one or two, but not dozens) and check the top three recommendations for that particular customer in order to decide whether these make sense or not. To begin, we write a SQL query to find customer ids with a reasonable number of purchases, i.e. between 5 and 10. We start with a count of 7 (feel free to update the query below with a different count for other attempts).

In [30]:
# The order by clause in the query below has been added in case the having clause is changed
# to an inequality rather than an equality (i.e something like having count > 5 ...)
query = """select custId, count(*) as count from retailPurchases 
           group by custId having count = 7 order by count"""

sqlContext.sql(query).take(10)

[Row(custId=17639, count=7),
 Row(custId=18048, count=7),
 Row(custId=16248, count=7),
 Row(custId=15257, count=7),
 Row(custId=15463, count=7),
 Row(custId=18269, count=7),
 Row(custId=13876, count=7),
 Row(custId=12884, count=7),
 Row(custId=14494, count=7),
 Row(custId=14500, count=7)]

### From the results of the SQL query above, we arbitrarily pick one of the customer ids and inject it into the query below in order to find out what items this person bought.

In [31]:
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where custId in (16248)""").take(25)

[Row(stockCode=22809, description=u'SET OF 6 T-LIGHTS SANTA'),
 Row(stockCode=23157, description=u'SET OF 6 NATIVITY MAGNETS '),
 Row(stockCode=22865, description=u'HAND WARMER OWL DESIGN'),
 Row(stockCode=23419, description=u'HOME SWEET HOME BOTTLE '),
 Row(stockCode=22866, description=u'HAND WARMER SCOTTY DOG DESIGN'),
 Row(stockCode=23155, description=u'KNICKERBOCKERGLORY MAGNET ASSORTED '),
 Row(stockCode=23156, description=u'SET OF 5 MINI GROCERY MAGNETS')]

### Get the top 3 recommendations for the user chosen above

In [32]:
modeldf.recommendProducts(16248,3)

[Rating(user=16248, product=2, rating=4.293959165449962),
 Rating(user=16248, product=15056, rating=3.0547204883131953),
 Rating(user=16248, product=17012, rating=2.84352985063404)]

#### Use the SQL query below to quickly look up the text description of the recommendations above. Note that some items have multiple text descriptions for the same stockCode (<span style="color:red">possible further data cleaning opportunity here...</span>), which is why we select several rows from the resulting dataframe. 
#### <span style="color:green">There is some randomness injected in the recommendations, so it might not be possible to get the exact same results for consecutive runs...</span>

In [33]:
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where stockCode in (17012, 15056)""").take(35)

[Row(stockCode=15056, description=u'EDWARDIAN PARASOL PINK'),
 Row(stockCode=17012, description=u'ORIGAMI LAVENDER INCENSE/CANDL SET '),
 Row(stockCode=17012, description=u'ORIGAMI JASMINE INCENSE/CANDLE SET'),
 Row(stockCode=17012, description=u'ORIGAMI VANILLA INCENSE/CANDLE SET '),
 Row(stockCode=17012, description=u'ORIGAMI OPIUM INCENSE/CANDLE SET '),
 Row(stockCode=15056, description=u'EDWARDIAN PARASOL BLACK'),
 Row(stockCode=15056, description=u'EDWARDIAN PARASOL NATURAL'),
 Row(stockCode=17012, description=u'ORIGAMI ROSE INCENSE/CANDLE SET '),
 Row(stockCode=17012, description=u'ORIGAMI SANDLEWOOD INCENSE/CAND SET')]

### <span style="color:green">Section 1.7: Analyzing the ALS recommendation engine for data (customer ids) which did not exist in the training set...</span>

#### Extra-credit activity: Remember that the original data set was randomly split between training and testing set (80% , 20%). This means that some users may be present in the testing set but not in the training set. Finding those users can be achieved in several different ways. Below is an example

In [34]:
#Find users which were in the testing set but not in the training set
trainDf.registerTempTable("trainDf")
testDf.registerTempTable("testDf")
query = """
SELECT testDf.custId, trainDf.custId FROM
testDf LEFT OUTER JOIN trainDf
ON testDf.custId = trainDf.custId
where isNull(trainDf.custId)
"""
joinDf = sqlContext.sql(query)

joinDf.show(20)

+------+------+
|custId|custId|
+------+------+
| 15442|  null|
| 16462|  null|
| 12665|  null|
| 12665|  null|
| 15269|  null|
| 15269|  null|
| 17896|  null|
| 17896|  null|
| 15098|  null|
| 15100|  null|
| 14705|  null|
| 13307|  null|
| 15510|  null|
| 13120|  null|
| 13135|  null|
| 15940|  null|
| 16144|  null|
| 12346|  null|
| 16148|  null|
| 17948|  null|
+------+------+
only showing top 20 rows



### Let's pick one of the customer ids which was present in the testing set but not in the training set and get the top 3 recommendations. 
### The execution of the cell below should return an error because the model never saw any data for a user that was not present in the training set. Consequently, it makes sense to clean up the testing set by removing all users and products which were not present in the training set...

In [12]:
modeldf.recommendProducts(16462,3)




### We can verify this in a different way, by comparing the number of entries which were fed into the model and the number of entries it produced. Those were respectively the RDDs / dataframes <span style="color:red">testDf2</span> and <span style="color:red">predictUsingDf</span> higher up in Section 1.5
### It can be noticed that some entries did not produce a recommendation and those correspond to customer ids which the trained model does not know about (also known as the <span style="color:red">cold start</span> problem: what to recommend for a new customer who has never provided any ratings or preferences).
### It can be noticed as well that the remarks made for customer ids can also be made for stock ids, and a best practice would be to ensure that the randomly generated test set is similar in distributions to the training set.

In [35]:
testDf2.count()

51996

In [36]:
predictUsingDf.count()

51953
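The two counts above differ by 43 pairs (51996 versus 51953). One way to make such a gap explicit is to filter the test pairs down to users and products that were seen during training. The plain-Python sketch below uses made-up data and hypothetical variable names to illustrate that filter:

```python
# Made-up training triples (user, product, rating) and test pairs (user, product).
train = [(1, 10, 2.0), (1, 20, 1.0), (2, 10, 3.0)]
test = [(1, 10), (2, 20), (3, 10), (2, 30)]

known_users = {user for user, _, _ in train}
known_products = {product for _, product, _ in train}

# Pairs the model can score: both the user and the product were seen in training.
scorable = [(u, p) for u, p in test if u in known_users and p in known_products]
# Pairs affected by the cold start problem.
cold = [(u, p) for u, p in test if u not in known_users or p not in known_products]

print(len(test) - len(scorable))
```

In Spark, a similar filter could be expressed as joins between the test dataframe and the distinct users and products of the training dataframe.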

### MLlib also provides a recommendUsers API, which returns the top users for a given product. Let's try it with product 22865, <span style="color:red">Hand Warmer Owl Design</span>.

In [37]:
modeldf.recommendUsers(22865,3)

[Rating(user=16422, product=22865, rating=12.731184377356648),
 Rating(user=17841, product=22865, rating=10.958900034487785),
 Rating(user=13777, product=22865, rating=8.051914800487737)]

### This is an SQL query in which we can inject the customer ids to identify the products which were bought. A further analysis of the results is left as an exercise...

In [42]:
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
                  where custId in (17841)""").take(35)

[Row(stockCode=22654, description=u'DELUXE SEWING KIT '),
 Row(stockCode=22942, description=u'CHRISTMAS LIGHTS 10 SANTAS '),
 Row(stockCode=23190, description=u'BUNDLE OF 3 SCHOOL EXERCISE BOOKS  '),
 Row(stockCode=21390, description=u'FILIGRIS HEART WITH BUTTERFLY'),
 Row(stockCode=22118, description=u'JOY WOODEN BLOCK LETTERS'),
 Row(stockCode=22970, description=u'LONDON BUS COFFEE MUG'),
 Row(stockCode=72780, description=u'BLACK SILOUETTE CANDLE PLATE'),
 Row(stockCode=21462, description=u'NURSERY ABC PAINTED LETTERS'),
 Row(stockCode=21269, description=u'ANTIQUE CREAM CUTLERY SHELF '),
 Row(stockCode=22695, description=u'WICKER WREATH SMALL'),
 Row(stockCode=22339, description=u'CHRISTMAS TREE PAINTED ZINC '),
 Row(stockCode=85064, description=u'CREAM SWEETHEART LETTER RACK'),
 Row(stockCode=22996, description=u'TRAVEL CARD WALLET VINTAGE TICKET'),
 Row(stockCode=84596, description=u'SMALL DOLLY MIX DESIGN ORANGE BOWL'),
 Row(stockCode=21902, description=u'KEY FOB  FRONT  DOOR '),


### <span style="color:green">Section 1.8: A Quick look under the hood..</span>

### MLlib gives access to the user and product feature matrices mentioned at the beginning of this lab. The rating for each user and product pair is obtained as the dot product of the user vector and the product vector from these matrices.

### Here is a row from the userFeatures matrix. We have selected the features for the user we have been working with so far: <span style="color:red">16248</span>. Notice that the number of features, 5, corresponds to the <span style="color:red">rank</span> parameter that was used to train the algorithm.

In [64]:
model.userFeatures().lookup(16248)

[array('d', [0.282953679561615, -0.9585980772972107, 0.1316331923007965, 0.20625106990337372, -0.09942080825567245])]

### Here is another view, taking the first two elements of the <span style="color:red">productFeatures</span> matrix. We see the features associated with two products.

In [65]:
model.productFeatures().take(2)

[(20700,
  array('d', [0.6529210805892944, -0.6889617443084717, 0.08246684074401855, 0.1900530457496643, -0.14447906613349915])),
 (21000,
  array('d', [1.2690746784210205, -0.13172782957553864, 0.2237655222415924, 0.6966758370399475, -0.593940258026123]))]
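As a quick check of the dot-product statement, the sketch below multiplies the feature vector of user 16248 with the feature vector of product 21000, both copied from the outputs above. Pairing this user with this product is only an illustration; plain Python is used so the arithmetic is visible:

```python
# Feature vectors copied from the userFeatures and productFeatures outputs above.
user_16248 = [0.282953679561615, -0.9585980772972107, 0.1316331923007965,
              0.20625106990337372, -0.09942080825567245]
product_21000 = [1.2690746784210205, -0.13172782957553864, 0.2237655222415924,
                 0.6966758370399475, -0.593940258026123]

def dot(a, b):
    """Dot product of two equal-length sequences of floats."""
    return sum(x * y for x, y in zip(a, b))

# Predicted rating of product 21000 for user 16248 under the explicit model.
score = dot(user_16248, product_21000)
print(score)
```

The same value should be what the model computes internally for this pair, up to floating-point precision, since both vectors have length 5, the rank used for training.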

## <span style="color:green">Additional work will be suggested at the end of this lab in manipulating these user and product features in more detail...</span>

# <span style="color:blue">Section 2 (Please ensure you've run Section 0 above)</span>
### Use MLlib library and (assume) IMPLICIT user feedback on product ratings.
### We will continue using data frames for convenience.

### With implicit feedback, the ALS algorithm treats the provided values as observations of user behavior rather than as ratings given directly by the user. This is closer to reality in our example, since the number of purchases of an item was used as a proxy for the user's preference for that item.<br> <br>The general flow of the approach will however remain identical to the one in Section 1 above.
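The Spark documentation attributes its implicit-feedback variant to the paper "Collaborative Filtering for Implicit Feedback Datasets" by Hu, Koren and Volinsky. In that formulation, each observed count is turned into a binary preference and a confidence weight that grows with the count. The sketch below shows that transformation in plain Python; the function name `implicit_targets` is hypothetical, and this is an illustration of the formulation rather than Spark's internal code:

```python
def implicit_targets(count, alpha):
    """Map an observed count to (preference, confidence).

    preference: 1.0 if the item was bought at least once, else 0.0
    confidence: 1 + alpha * count, i.e. more purchases mean more confidence
    """
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence

# With alpha = 0.01 as in this lab, nine purchases give only slightly more
# confidence than one purchase; larger alpha values amplify the difference.
pref_nine, conf_nine = implicit_targets(9.0, alpha=0.01)
pref_none, conf_none = implicit_targets(0.0, alpha=0.01)
print(pref_nine)
```

The model then fits the binary preferences, weighting each squared error by its confidence, which is why the `alpha` parameter only matters for `trainImplicit`.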

### <span style="color:green">Section 2.1: Training the implicit MLlib model.</span>

In [8]:
# Working with Implicit Feedback (We will treat the number of times a customer bought an item
# as implicit feedback by using trainImplicit)
from pyspark.mllib.recommendation import ALS, Rating
rank = 5
numIterations = 10
alpha = 0.01
lambda1 = 0.01
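# Keyword arguments avoid mixing up lambda_ and alpha, which sit in different positions.
modelImplicit1 = ALS.trainImplicit(trainDf, rank, iterations=numIterations, lambda_=lambda1,
                                   blocks=-1, alpha=alpha, nonnegative=False, seed=10)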
print "modelImplicit1 has been trained"

modelImplicit1 has been trained


### A second model with different hyper parameter values is trained for comparison purposes.

In [9]:
rank = 15
numIterations = 20
alpha = 0.01
lambda1 = 0.01
modelImplicit2 = ALS.trainImplicit(trainDf, rank, iterations=numIterations, lambda_=lambda1,
                                   blocks=-1, alpha=alpha, nonnegative=False, seed=10)
print "modelImplicit2 has been trained"

modelImplicit2 has been trained


### <span style="color:green">Section 2.2: In a similar way to what was done higher in Section 1, we build a test dataframe which only has two columns: user and product. The rating column is added by the recommendation engine.</span>

In [10]:
#Extract the (user, product) pair from the testDf and drop the rating
testDf2 = testDf.rdd.map(lambda l: (l[0], l[1])).toDF(["user", "product"])
testDf2.first()

Row(user=12838, product=22941)

### <span style="color:green">Section 2.3: Getting predictions using the implicit MLlib model</span>

In [11]:
# Get now the predictions for testDf2. Note that we need to pass in the underlying RDD to
# predictAll. Passing in the DataFrame returns an error indicating that the method expects
# an RDD.
predictImplicit = modelImplicit1.predictAll(testDf2.rdd)

# predictImplicit is an RDD of "Rating" objects with the fields: user, product, rating

In [12]:
predictImplicit.first()

Rating(user=17883, product=21000, rating=0.017978272291746304)

### Note as well that with implicit feedback, the values returned by the recommendation engine typically fall between 0 and 1, as they correspond to a "confidence" level rather than an explicit rating value.

In [13]:
# Now we will rebuild a data frame from the RDD predictImplicit, so as to be able to use SQL.
# toDF accepts an RDD of plain tuples, so no intermediate Row objects are needed.
predictImplicitDf = predictImplicit.map(lambda r: (r.user, r.product, r.rating)).\
toDF(["user", "product", "rating"])

In [14]:
predictImplicitDf.show(5)

+-----+-------+--------------------+
| user|product|              rating|
+-----+-------+--------------------+
|17883|  21000|0.017978272291746304|
|13700|  22900| 0.10150778559611573|
|15201|  22900|0.018166055084698145|
|16504|  22900|  0.0383290330738306|
|18005|  22900| 0.03260654905695093|
+-----+-------+--------------------+
only showing top 5 rows



### <span style="color:green">Section 2.4: Comparing user entries with predicted ratings and getting a Mean Squared Error</span>

### In the cell below, we will join the test table with the recommendation engine predictions into one single table / dataframe, which will allow us to calculate the accuracy of the algorithm by computing the deltas between original and computed user ratings.

In [15]:
testDf.registerTempTable("testTable")
predictImplicitDf.registerTempTable("predictImplicitTable")
join = """
SELECT 
    custId, stockCode, preference, user, product, rating
FROM 
    testTable, predictImplicitTable 
WHERE
    custId = user and stockCode = product"""
joinDf = sqlContext.sql(join)

print joinDf.filter("preference >=3").orderBy("custId", "preference").show(10)

+------+---------+----------+-----+-------+--------------------+
|custId|stockCode|preference| user|product|              rating|
+------+---------+----------+-----+-------+--------------------+
| 12347|    21791|       3.0|12347|  21791| 0.11184186142936409|
| 12347|    84997|       3.0|12347|  84997| 0.12314848691489971|
| 12347|    84991|       3.0|12347|  84991|  0.1027434199056403|
| 12347|    22376|       3.0|12347|  22376|0.023383039643338432|
| 12347|    47559|       3.0|12347|  47559| 0.04998915932911552|
| 12347|    22492|       3.0|12347|  22492| 0.12110006311334459|
| 12347|    20719|       4.0|12347|  20719| 0.04370908494907677|
| 12347|    84558|       5.0|12347|  84558|0.011325413537812721|
| 12352|    22779|       3.0|12352|  22779| 0.01556474292436821|
| 12359|    82613|       3.0|12359|  82613|0.011095459215197279|
+------+---------+----------+-----+-------+--------------------+
only showing top 10 rows

None


In [16]:
#Calculate the Mean Squared Error for modelImplicit1.
MSE = joinDf.rdd.map(lambda l: (l.preference - l.rating)**2).mean()
print MSE


4.17997474654
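The join-then-average logic of the two cells above can be sketched in plain Python. The sample below copies three (custId, stockCode, preference) rows from the joined table, with ratings rounded to four decimals, and adds one hypothetical pair that has no prediction in order to show the effect of the inner join. It is an illustration of the computation only, not a replacement for the Spark cells.

```python
# (custId, stockCode) -> preference, taken from the joined table above,
# plus one hypothetical pair that has no matching prediction.
actual = {
    (12347, 21791): 3.0,
    (12347, 20719): 4.0,
    (12352, 22779): 3.0,
    (99999, 11111): 2.0,  # hypothetical, absent from the predictions
}
# (user, product) -> rating, rounded from the same table.
predicted = {
    (12347, 21791): 0.1118,
    (12347, 20719): 0.0437,
    (12352, 22779): 0.0156,
}

# Inner join on the (user, product) key: keep only pairs present on both sides.
joined = [(actual[key], predicted[key]) for key in actual if key in predicted]

# Mean of the squared differences between observed and predicted values.
mse = sum((a - p) ** 2 for a, p in joined) / len(joined)
print("pairs joined: %d, MSE: %.4f" % (len(joined), mse))
```

In this small sample the predictions stay close to 0 while the preferences are counts of 3 or more, so the squared error mostly reflects the difference in scale between the two columns.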


### To calculate the MSE for the second model, rerun the cells from Section 2.3 onward with modelImplicit2 in place of modelImplicit1.

In [48]:
#Calculate the Mean Squared Error for modelImplicit2.
MSE = joinDf.rdd.map(lambda l: (l.preference - l.rating)**2).mean()
print MSE

4.08793275415


### Based on the results above, modelImplicit2 has a slightly lower MSE (4.09 versus 4.18) and therefore appears to be more accurate. Consequently, we use modelImplicit2 in the remainder of this section.
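Rather than rerunning cells by hand for each model, the evaluation can be wrapped in a function and applied to every candidate in a loop. The sketch below shows that pattern in plain Python. The two "models" are hypothetical stand-in functions that return a constant score, not the trained ALS models, and the rows are sample (custId, stockCode, preference) tuples.

```python
def mean_squared_error(predict, rows):
    """Average squared gap between each observed preference and predict(user, item)."""
    errors = [(pref - predict(user, item)) ** 2 for user, item, pref in rows]
    return sum(errors) / len(errors)

# Sample (custId, stockCode, preference) rows.
test_rows = [(12347, 21791, 3.0), (12347, 20719, 4.0), (12352, 22779, 3.0)]

# Hypothetical stand-ins for trained models: each maps (user, item) to a score.
candidates = {
    "candidateA": lambda user, item: 0.1,
    "candidateB": lambda user, item: 0.5,
}

# Evaluate every candidate with the same function, then keep the lowest MSE.
scores = dict((name, mean_squared_error(fn, test_rows)) for name, fn in candidates.items())
best = min(scores, key=scores.get)
print("best candidate: %s" % best)
```

With real models, each stand-in would be replaced by a call that returns the model's prediction for the given user and product.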

### <span style="color:green">Section 2.5: Manual verification of ALS recommendations for one chosen customer...</span>

In [18]:
# The example below keeps the same customer as the one picked in Section 1 above.
modelImplicit2.recommendProducts(16248,3)

[Rating(user=16248, product=22111, rating=0.0942212742424042),
 Rating(user=16248, product=23355, rating=0.08892665546439966),
 Rating(user=16248, product=22865, rating=0.07648471083984797)]

In [30]:
#Reminder of what this customer bought...
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where custId in (16248)""").take(25)

[Row(stockCode=22809, description=u'SET OF 6 T-LIGHTS SANTA'),
 Row(stockCode=23157, description=u'SET OF 6 NATIVITY MAGNETS '),
 Row(stockCode=22865, description=u'HAND WARMER OWL DESIGN'),
 Row(stockCode=23419, description=u'HOME SWEET HOME BOTTLE '),
 Row(stockCode=22866, description=u'HAND WARMER SCOTTY DOG DESIGN'),
 Row(stockCode=23155, description=u'KNICKERBOCKERGLORY MAGNET ASSORTED '),
 Row(stockCode=23156, description=u'SET OF 5 MINI GROCERY MAGNETS')]

In [31]:
#Here is what we are recommending for them
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where stockCode in (22111, 23355, 22865)""").take(35)

[Row(stockCode=23355, description=u'HOT WATER BOTTLE KEEP CALM'),
 Row(stockCode=22865, description=u'HAND WARMER OWL DESIGN'),
 Row(stockCode=22111, description=u'SCOTTIE DOG HOT WATER BOTTLE')]

## The recommendations look like reasonable suggestions... 

# In the following section, we will turn our attention to the more recent Spark ML library which is built on top of data frames.

# <span style="color:blue">Section 3 (Please ensure you've run Section 0 above)</span>
### Use SPARK ML library and (assume) EXPLICIT user feedback on product ratings.
### SPARK ML uses data frames.

### <span style="color:green">Section 3.1: Training the explicit Spark ML models</span>

### The syntax of Spark ML is slightly different than that of Spark MLlib. The model is trained here by "fitting" the algorithm to the provided input data frame, using the now familiar hyper parameters: rank and maxIter. Also note that the columns of interest (to ALS logic) in the input dataframe are selected by name using the 'userCol', 'itemCol' and 'ratingCol' keywords.
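To give a feel for what "fitting" does and for the role of maxIter, here is a minimal pure-Python sketch of alternating least squares restricted to rank 1. It is an illustration only, not Spark's implementation: the function name als_rank1, its reg argument, and the toy ratings are all assumptions made for this example.

```python
def als_rank1(ratings, n_users, n_items, max_iter=10, reg=0.01):
    """Fit one factor per user and per item to the observed ratings.

    ratings maps (user_index, item_index) -> observed value.
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(max_iter):
        # Hold item factors fixed; each user factor has a closed-form ridge solution.
        for i in range(n_users):
            seen = [(j, r) for (a, j), r in ratings.items() if a == i]
            u[i] = sum(r * v[j] for j, r in seen) / (reg + sum(v[j] ** 2 for j, r in seen))
        # Hold user factors fixed; solve for each item factor the same way.
        for j in range(n_items):
            seen = [(i, r) for (i, b), r in ratings.items() if b == j]
            v[j] = sum(r * u[i] for i, r in seen) / (reg + sum(u[i] ** 2 for i, r in seen))
    return u, v

# Toy data generated from user factors [1, 2, 3] and item factors [1, 2];
# the (user 2, item 1) entry, whose true value is 6, is held out.
observed = {(0, 0): 1.0, (0, 1): 2.0, (1, 0): 2.0, (1, 1): 4.0, (2, 0): 3.0}
u, v = als_rank1(observed, n_users=3, n_items=2, max_iter=50, reg=1e-6)
held_out_prediction = u[2] * v[1]
print("prediction for the held-out pair: %.3f" % held_out_prediction)
```

Each pass of the outer loop corresponds to one iteration in the sense of maxIter. A higher rank would replace the single number per user and item with a short vector, and the scalar division with a small linear solve.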

In [10]:
# We train two models using different combinations for the rank and the number of iterations.
from pyspark.ml.recommendation import ALS

als1 = ALS(rank=15, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="preference")
model1 = als1.fit(trainDf)

als2 = ALS(rank=2, maxIter=10,userCol="custId",itemCol="stockCode",ratingCol="preference")
model2 = als2.fit(trainDf)

print "The models have been trained"

The models have been trained


### <span style="color:green">Section 3.2: Preparing the testing set</span>

### As seen in previous sections, it is a best practice to remove from the test set any customers or stock items that were not present in the training set. We covered a way to do this using SQL in previous sections. Below is a different way of getting the same result using Python/Spark transformations.

In [11]:
# A Python set is an unordered collection of unique elements. Below we build two sets, one of
# customer ids and one of stock ids. The lambda function extracts the relevant column from the
# trainDf dataframe; the values are collected to the driver and then turned into a set.
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())

print testDf.count()
testDf = testDf.rdd.filter(lambda line: line.stockCode in stock and\
                                            line.custId in customers).toDF()

print testDf.count()

51996
51953


### The two numbers printed above match the values obtained earlier at the end of Section 1.7. This confirms that entries whose customer id or stock id was not present in the training set do not get a prediction...
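The filtering step matters because ALS only learns factor vectors for the ids it saw during training, so it has nothing with which to score an unseen customer or item. Here is a minimal plain-Python sketch of the same set-membership filter, using made-up rows in (custId, stockCode, preference) order:

```python
# Hypothetical training and test rows: (custId, stockCode, preference).
train = [(17637, 20831, 1.0), (14286, 20831, 1.0), (12347, 21791, 3.0)]
test = [
    (17637, 21791, 2.0),  # customer and item both seen in training: kept
    (99999, 20831, 1.0),  # unseen customer: dropped
    (14286, 11111, 1.0),  # unseen item: dropped
]

# Ids known to the model are exactly those present in the training rows.
customers = set(row[0] for row in train)
stock = set(row[1] for row in train)

# Keep a test row only if both of its ids were seen during training.
kept = [row for row in test if row[0] in customers and row[1] in stock]
print("%d of %d test rows kept" % (len(kept), len(test)))
```

Set membership tests are constant time on average, which is why the ids are collected into sets rather than lists before filtering.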

### <span style="color:green">Section 3.3: Getting predictions using the ML model with EXPLICIT feedback</span>

### Below is an excerpt from the Spark documentation regarding the "transform" method used in the cell below:
A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:

- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.

<span style="color:red">Note:</span> The original text from which this paragraph was copied can be found in the Spark documentation at the following link: http://spark.apache.org/docs/latest/ml-guide.html#transformers
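As a rough illustration of the second bullet, the sketch below mimics the transform() contract in plain Python: it takes rows in and returns new rows with one extra column, leaving the input untouched. The class MeanPreferenceModel and its constant prediction are hypothetical and are not part of Spark.

```python
class MeanPreferenceModel(object):
    """Toy stand-in for a learned model: predicts one stored value for every row."""

    def __init__(self, mean, prediction_col="prediction"):
        self.mean = mean
        self.prediction_col = prediction_col

    def transform(self, rows):
        # Build new rows with the extra column; the input rows are not modified.
        return [dict(row, **{self.prediction_col: self.mean}) for row in rows]

# Rows shaped like the test dataframe used in this section.
test_rows = [
    {"custId": 17637, "stockCode": 20831, "preference": 1.0},
    {"custId": 14286, "stockCode": 20831, "preference": 1.0},
]
model = MeanPreferenceModel(mean=1.5)  # 1.5 is an arbitrary illustrative value
scored = model.transform(test_rows)
print(sorted(scored[0].keys()))
```

The original columns survive alongside the new one, which is what later allows the error to be computed directly on the returned rows.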

In [13]:
# The transform method used below is therefore the one described in the second bullet above.
# It takes the test set as input, applies the model to it, and returns a new dataframe with
# a 'prediction' column appended.
predictions1 = model1.transform(testDf)
predictions2 = model2.transform(testDf)

# As a quick verification, we will print the first couple of rows from both returned dataframes.
# We notice that the 'prediction' column was appended as expected.
print predictions1.take(2)
print predictions2.take(2)

[Row(custId=17637, stockCode=20831, preference=1.0, prediction=0.5975714325904846), Row(custId=14286, stockCode=20831, preference=1.0, prediction=0.8286399245262146)]
[Row(custId=17637, stockCode=20831, preference=1.0, prediction=0.5547059774398804), Row(custId=14286, stockCode=20831, preference=1.0, prediction=0.8869171142578125)]


### <span style="color:red">Remark:</span> In this case, the resulting predictions dataframes already contain both the original customer rating, under the column 'preference', and the model's generated rating, under the column 'prediction'. There is therefore no need to build a separate table or dataframe containing both.

### <span style="color:green">Section 3.4: Comparing user entries with predicted ratings and getting a Mean Squared Error</span>

### The Mean Squared Error is evaluated in the same way as in previous sections.

In [12]:
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.preference - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.preference - line.prediction)**2).mean()
    
print 'Mean squared error = %.4f for our first model' % meanSquaredError1
print 'Mean squared error = %.4f for our second model' % meanSquaredError2

Mean squared error = 1.5996 for our first model
Mean squared error = 1.9945 for our second model


### The first model seems to present better accuracy than the second, which is expected given the chosen values for rank and maxIter...

### <span style="color:green">Section 3.5: Manual verification of ALS recommendations for one chosen customer...</span>

### We will now "manually" verify the effectiveness of our model. To do so, we will pick one customer who bought a reasonably small, yet meaningful number of items (i.e. more than one or two, but not dozens) and check the top three recommendations for that customer in order to decide whether they make sense. To begin, we write a SQL query to find the ids of customers who bought a reasonable number of items, i.e. between 5 and 10. We start with a count of 7 items (feel free to update the query below with a different count for other attempts).
#### <span style="color:red">Note:</span> We will continue using the same customer id as in previous section so as to compare outputs from different engines using the same baseline.

In [20]:
query = """select custId, count(*) as count from retailPurchases 
           group by custId having count = 7 order by count"""

sqlContext.sql(query).take(10)

[Row(custId=17639, count=7),
 Row(custId=14641, count=7),
 Row(custId=18042, count=7),
 Row(custId=17647, count=7),
 Row(custId=18048, count=7),
 Row(custId=16248, count=7),
 Row(custId=15257, count=7),
 Row(custId=15458, count=7),
 Row(custId=15463, count=7),
 Row(custId=18064, count=7)]
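The GROUP BY ... HAVING query above can be mirrored in plain Python with collections.Counter, which may help when reading the SQL. The purchase list is hypothetical, and the example uses a target count of 2 instead of 7 to keep it short.

```python
from collections import Counter

# Hypothetical (custId, stockCode) purchase records.
purchases = [
    (16248, 22809), (16248, 23157),
    (17639, 22111),
    (14641, 22865), (14641, 22866),
    (18042, 23355), (18042, 23156), (18042, 23419),
]

# GROUP BY custId with COUNT(*): number of purchase rows per customer.
counts = Counter(cust_id for cust_id, _ in purchases)

# HAVING count = target: keep only customers with exactly that many rows.
target = 2
matching = sorted(cust_id for cust_id, n in counts.items() if n == target)
print(matching)
```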

### Select customer id 16248...and check which items this customer bought...

In [27]:
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where custId in (16248)""").toPandas()

   stockCode                         description
0      22809             SET OF 6 T-LIGHTS SANTA
1      23157           SET OF 6 NATIVITY MAGNETS
2      22865              HAND WARMER OWL DESIGN
3      23419              HOME SWEET HOME BOTTLE
4      22866       HAND WARMER SCOTTY DOG DESIGN
5      23155  KNICKERBOCKERGLORY MAGNET ASSORTED
6      23156       SET OF 5 MINI GROCERY MAGNETS


### We will now obtain some recommendations for the chosen customer through our selected model.
### Spark ML does not offer a 'recommendProducts' method as was used above with MLlib. This is not a problem: we will obtain our recommendations using the same 'transform' method that was used a few cells above with the test dataset. More specifically, we will:
- Build a dataframe named userItems, where our selected user id is associated with every product id in the training data
- Pass this userItems dataframe through the 'transform' method, which, as seen previously, appends a 'prediction' column indicating the strength of the recommendation for our chosen user id and each product
- Sort the resulting dataframe by that column and select the top N rows
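The three steps above can be sketched in plain Python. The sketch assumes that a pair is scored by the dot product of its user and item factor vectors, which is the relationship Section 4.6 checks for the Spark ML model. The rank-2 factor values are hypothetical and the function name recommend is made up for this example.

```python
# Hypothetical rank-2 factors; a trained model would supply these.
user_factors = {16248: [0.5, 1.0]}
item_factors = {
    22111: [0.2, 0.9],
    23355: [0.1, 0.8],
    22809: [0.9, 0.0],
    84029: [0.4, 0.4],
}

def recommend(user_id, n):
    """Return the n highest-scoring (item, score) pairs for one user."""
    u = user_factors[user_id]
    # Steps 1 and 2: pair the user with every item and score each pair.
    scored = [(item, sum(a * b for a, b in zip(u, f)))
              for item, f in item_factors.items()]
    # Step 3: strongest predictions first, then keep the top n.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:n]

top3 = recommend(16248, 3)
print([item for item, score in top3])
```

Scoring every item for a single user is affordable here because the catalogue is small; the same brute-force approach applied to all users at once would be far more expensive.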

In [30]:
# Build a dataframe named userItems, by selecting distinct values of all available stock codes
# and appending our chosen customer id.
from pyspark.sql.functions import lit

userItems = trainDf.select("stockCode").distinct().\
            withColumn('custId', lit(16248))

# Print a few rows from our dataframe to verify that it was built as expected.
print userItems.show(5)

+---------+------+
|stockCode|custId|
+---------+------+
|    21231| 16248|
|    85231| 16248|
|    22431| 16248|
|    23231| 16248|
|    22631| 16248|
+---------+------+
only showing top 5 rows

None


### Now use 'transform' to rate each item. We also sort the returned predictions in descending order of strength, so as to see the highest recommendations first.

In [23]:
userItems = model1.transform(userItems)

#print userItems.take(5)
print userItems.sort("prediction",ascending=False).show(5)

+---------+------+----------+
|stockCode|custId|prediction|
+---------+------+----------+
|        2| 16248| 3.4413526|
|    90214| 16248| 3.0965843|
|    84596| 16248| 2.5026555|
|    84997| 16248|  2.111412|
|    37489| 16248| 2.1043334|
+---------+------+----------+
only showing top 5 rows

None


### We will use a now familiar SQL query to obtain the text description of the recommended items.

In [29]:
query = """
SELECT 
    distinct stockCode, description 
FROM 
    retailPurchases 
WHERE 
    stockCode in (90214, 84596, 84997, 37489)
    ORDER BY stockCode limit 20
"""
items = sqlContext.sql(query)
print items.toPandas()

    stockCode                         description
0       37489   YELLOW/PINK FLOWER DESIGN BIG MUG
1       37489    GREEN/BLUE FLOWER DESIGN BIG MUG
2       37489   BLUE/YELLOW FLOWER DESIGN BIG MUG
3       37489    PINK/GREEN FLOWER DESIGN BIG MUG
4       84596          SMALL CHOCOLATES PINK BOWL
5       84596        SMALL MARSHMALLOWS PINK BOWL
6       84596      BISCUITS SMALL BOWL LIGHT BLUE
7       84596  SMALL DOLLY MIX DESIGN ORANGE BOWL
8       84596        SMALL LICORICE DES PINK BOWL
9       84596         MIXED NUTS LIGHT GREEN BOWL
10      84997   BLUE 3 PIECE POLKADOT CUTLERY SET
11      84997   PINK 3 PIECE POLKADOT CUTLERY SET
12      84997  GREEN 3 PIECE POLKADOT CUTLERY SET
13      84997    CHILDRENS CUTLERY RETROSPOT RED 
14      84997   RED 3 PIECE RETROSPOT CUTLERY SET
15      84997     CHILDRENS CUTLERY POLKADOT PINK
16      84997     CHILDRENS CUTLERY POLKADOT BLUE
17      84997   CHILDRENS CUTLERY POLKADOT GREEN 
18      90214           LETTER "G" BLING KEY RING


# <span style="color:blue">Section 4 (Please ensure you've run Section 0 above)</span>
### Use SPARK ML library and (assume) IMPLICIT user feedback on product ratings.
### SPARK ML uses data frames.

### <span style="color:green">Section 4.1: Training the implicit Spark ML models</span>

In [31]:
from pyspark.ml.recommendation import ALS

als1 = ALS(rank=15, maxIter=15, implicitPrefs=True,userCol="custId",itemCol="stockCode",ratingCol="preference")
modelImplicit1ml = als1.fit(trainDf)

als2 = ALS(rank=2, maxIter=10,implicitPrefs=True, userCol="custId",itemCol="stockCode",ratingCol="preference")
modelImplicit2ml = als2.fit(trainDf)

print "The models have been trained"

The models have been trained


### <span style="color:green">Section 4.2: Preparing the testing set</span>

### As seen in previous sections, it is a best practice to remove from the test set any customers or stock items that were not present in the training set. We covered a way to do this using SQL in previous sections. Below is a different way of getting the same result using Python/Spark transformations.

In [34]:
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())

print testDf.count()
testDf = testDf.rdd.filter(lambda line: line.stockCode in stock and\
                                            line.custId in customers).toDF()

print testDf.count()

51996
51953


### <span style="color:green">Section 4.3: Getting predictions using the ML model with IMPLICIT feedback</span>

### Below is an excerpt from the Spark documentation regarding the "transform" method used in the cell below:
A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:

- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.

<span style="color:red">Note:</span> The original text from which this paragraph was copied can be found in the Spark documentation at the following link: http://spark.apache.org/docs/latest/ml-guide.html#transformers

In [35]:
predictions1 = modelImplicit1ml.transform(testDf)
predictions2 = modelImplicit2ml.transform(testDf)

print predictions1.take(2)
print predictions2.take(2)

[Row(custId=17637, stockCode=20831, preference=1.0, prediction=0.02002069726586342), Row(custId=14286, stockCode=20831, preference=1.0, prediction=0.06646830588579178)]
[Row(custId=17637, stockCode=20831, preference=1.0, prediction=0.008741205558180809), Row(custId=14286, stockCode=20831, preference=1.0, prediction=0.03968136012554169)]


### <span style="color:green">Section 4.4: Comparing user entries with predicted ratings and getting a Mean Squared Error</span>

In [36]:
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.preference - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.preference - line.prediction)**2).mean()
    
print 'Mean squared error = %.4f for our first model' % meanSquaredError1
print 'Mean squared error = %.4f for our second model' % meanSquaredError2

Mean squared error = 3.7671 for our first model
Mean squared error = 3.9502 for our second model


### The first model seems to present better accuracy than the second, which is expected given the chosen values for rank and maxIter...

### <span style="color:green">Section 4.5: Manual verification of ALS recommendations for one chosen customer...</span>

### We will now "manually" verify the effectiveness of our model. To do so, we will pick one customer who bought a reasonably small, yet meaningful number of items (i.e. more than one or two, but not dozens) and check the top three recommendations for that customer in order to decide whether they make sense. To begin, we write a SQL query to find the ids of customers who bought a reasonable number of items, i.e. between 5 and 10. We start with a count of 7 items (feel free to update the query below with a different count for other attempts).
#### <span style="color:red">Note:</span> We will continue using the same customer id as in previous section so as to compare outputs from different engines using the same baseline.

In [37]:
query = """select custId, count(*) as count from retailPurchases 
           group by custId having count = 7 order by count"""

sqlContext.sql(query).take(10)

[Row(custId=17639, count=7),
 Row(custId=14641, count=7),
 Row(custId=18042, count=7),
 Row(custId=17647, count=7),
 Row(custId=18048, count=7),
 Row(custId=16248, count=7),
 Row(custId=15257, count=7),
 Row(custId=15458, count=7),
 Row(custId=15463, count=7),
 Row(custId=18064, count=7)]

### Select customer id 16248...and check which items this customer bought...

In [38]:
sqlContext.sql("""SELECT distinct stockCode, description from retailPurchases 
               where custId in (16248)""").toPandas()

   stockCode                         description
0      22809             SET OF 6 T-LIGHTS SANTA
1      23157           SET OF 6 NATIVITY MAGNETS
2      22865              HAND WARMER OWL DESIGN
3      23419              HOME SWEET HOME BOTTLE
4      22866       HAND WARMER SCOTTY DOG DESIGN
5      23155  KNICKERBOCKERGLORY MAGNET ASSORTED
6      23156       SET OF 5 MINI GROCERY MAGNETS


### We will now obtain some recommendations for the chosen customer through our selected model.
### Spark ML does not offer a 'recommendProducts' method as was used above with MLlib. This is not a problem: we will obtain our recommendations using the same 'transform' method that was used a few cells above with the test dataset. More specifically, we will:
- Build a dataframe named userItems, where our selected user id is associated with every product id in the training data
- Pass this userItems dataframe through the 'transform' method, which, as seen previously, appends a 'prediction' column indicating the strength of the recommendation for our chosen user id and each product
- Sort the resulting dataframe by that column and select the top N rows

In [39]:
# Get recommendations for one particular user
from pyspark.sql.functions import lit

userItems = trainDf.select("stockCode").distinct().\
            withColumn('custId', lit(16248))

print userItems.show(5)

+---------+------+
|stockCode|custId|
+---------+------+
|    21231| 16248|
|    85231| 16248|
|    22431| 16248|
|    23231| 16248|
|    22631| 16248|
+---------+------+
only showing top 5 rows

None


### Now use 'transform' to rate each item. We also sort the returned predictions in descending order of strength, so as to see the highest recommendations first.

In [40]:
userItems = modelImplicit1ml.transform(userItems)

#print userItems.take(5)
userItems.sort("prediction",ascending=False).show(5)

+---------+------+-----------+
|stockCode|custId| prediction|
+---------+------+-----------+
|    22111| 16248|0.114170134|
|    23355| 16248| 0.10992701|
|    22866| 16248| 0.10629847|
|    22865| 16248| 0.10552523|
|    84029| 16248|0.103145994|
+---------+------+-----------+
only showing top 5 rows



### We will use a now familiar SQL query to obtain the text description of the recommended items.

In [41]:
query = """
SELECT 
    distinct stockCode, description 
FROM 
    retailPurchases 
WHERE 
    stockCode in (22111, 23355, 22866, 22865, 84029)
    ORDER BY stockCode
"""
items = sqlContext.sql(query)
print items.toPandas()

   stockCode                          description
0      22111         SCOTTIE DOG HOT WATER BOTTLE
1      22865               HAND WARMER OWL DESIGN
2      22866        HAND WARMER SCOTTY DOG DESIGN
3      23355           HOT WATER BOTTLE KEEP CALM
4      84029       RED WOOLLY HOTTIE WHITE HEART.
5      84029  KNITTED UNION FLAG HOT WATER BOTTLE


### <span style="color:green">Section 4.6: Taking a look at the ALS internals...</span>

### The ALS model exposes the matrices resulting from the factorization as userFactors and itemFactors.

### Let's take a look at the first few rows from the userFactors matrix obtained from the second model.

In [70]:
modelImplicit2ml.userFactors.take(5)

[Row(id=12350, features=[-0.013113889843225479, 0.05081048607826233]),
 Row(id=12360, features=[-0.2741982042789459, 0.4235096573829651]),
 Row(id=12370, features=[0.11715137958526611, 0.23450566828250885]),
 Row(id=12380, features=[-0.03724483400583267, 0.29389825463294983]),
 Row(id=12390, features=[-0.15921206772327423, 0.16143059730529785])]

### Looking at the representation of the itemFactors matrix: it is a DataFrame with an integer id identifying the item and an array of float values representing the item's internal encoding, whose length is the chosen rank.

In [158]:
modelImplicit2ml.itemFactors

DataFrame[id: int, features: array<float>]

### The second model was generated with a rank value of 2, so the features array has two entries as expected.

In [159]:
modelImplicit2ml.itemFactors.take(5)

[Row(id=10080, features=[0.007301546633243561, 0.04111911728978157]),
 Row(id=10120, features=[-0.033222559839487076, 0.13802224397659302]),
 Row(id=15030, features=[0.00714927027001977, 0.020543742924928665]),
 Row(id=15060, features=[0.0944393128156662, 0.2238655388355255]),
 Row(id=16010, features=[-0.0018164529465138912, 0.02436750754714012])]

### A few cells higher, we ran predictions for customer id <span style="color:red">16248</span> using modelImplicit1ml and obtained a rating of <span style="color:red">0.114170134</span> for product <span style="color:red">22111</span>. Let's reconstruct this result by directly manipulating the factor matrices.

### We will first extract the Vector representing user id 16248 from the userFactors matrix and print it. The vector has 15 components, since modelImplicit1ml was built with a rank of 15.

In [153]:
from pyspark.mllib.linalg import Vectors, DenseVector
# lookup() returns a list of all values stored under the given key, so v1 wraps a one-row
# array and v1[0] is the factor vector itself.
v1=Vectors.dense(modelImplicit1ml.userFactors.rdd.lookup(16248))
print v1[0]

[-0.02880237 -0.03255107 -0.00068029  0.03202556 -0.04235569  0.00507769
 -0.01419941  0.02616715  0.01925078 -0.04078222 -0.06489316  0.0342073
  0.03402569  0.00997772  0.00766637]


### Second, we will extract the Vector representing product 22111 from the itemFactors matrix. This vector has 15 components as well.

In [155]:
v2=DenseVector(modelImplicit1ml.itemFactors.rdd.lookup(22111))
print v2[0]

[-0.01636196 -0.60772979  0.15177801  0.33173507 -0.27724168  0.39341718
 -0.02558193  0.10382893  0.00441691 -0.40955025 -0.53257501  0.16183987
  0.24687007 -0.13434839  0.34330273]


### Compute the dot product of both vectors.

In [156]:
v1[0].dot(v2[0])

0.11417013119362149
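The same check can be reproduced without Spark from the values printed above. Because the printed components are rounded to eight decimals, the result should agree with the model's prediction of 0.114170134 only approximately. The variable names below are chosen for this sketch.

```python
# User factors for customer 16248, copied from the printed output above.
user_vec = [-0.02880237, -0.03255107, -0.00068029, 0.03202556, -0.04235569,
            0.00507769, -0.01419941, 0.02616715, 0.01925078, -0.04078222,
            -0.06489316, 0.0342073, 0.03402569, 0.00997772, 0.00766637]

# Item factors for product 22111, copied from the printed output above.
item_vec = [-0.01636196, -0.60772979, 0.15177801, 0.33173507, -0.27724168,
            0.39341718, -0.02558193, 0.10382893, 0.00441691, -0.40955025,
            -0.53257501, 0.16183987, 0.24687007, -0.13434839, 0.34330273]

# The predicted score is the sum of the component-wise products (the dot product).
score = sum(u * v for u, v in zip(user_vec, item_vec))
print("reconstructed score: %.6f" % score)
```

This confirms that, for this model, a prediction is simply the dot product of the customer's row of userFactors with the product's row of itemFactors.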

##### Data Citation
Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).