In [3]:
# importing some libraries
import findspark
findspark.init()
import os

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SQLContext

conf = (pyspark.SparkConf().setAppName('test').set("spark.executor.memory", "2g").setMaster("local[2]"))
sc = pyspark.SparkContext(conf=conf)


sqlContext = SQLContext(sc)

ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).

#### MovieLens data: [https://grouplens.org/datasets/movielens/](https://grouplens.org/datasets/movielens/)

In [33]:
#Get the data here http://grouplens.org/datasets/movielens/
movielens = sc.textFile("ml-100k/u.data")

movielens.first() #u'196\t242\t3\t881250949'
movielens.count() #100000


100000

In [34]:
movielens

ml-100k/u.data MapPartitionsRDD[297] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
#Clean up the data by splitting it
#Movielens readme says the data is split by tabs and
#is user product rating timestamp
clean_data = movielens.map(lambda x:x.split('\t'))

In [5]:
#As an example, extract just the ratings to its own RDD
#rate.first() is 3
rate = clean_data.map(lambda y: int(y[2]))
rate.mean() #Avg rating is 3.52986


3.5298600000000024

In [6]:
#Extract just the users
users = clean_data.map(lambda y: int(y[0]))
users.distinct().count() #943 users


943

In [7]:
#You don't have to extract data to its own RDD
#This command counts the distinct movies
#There are 1,682 movies
clean_data.map(lambda y: int(y[1])).distinct().count() 


1682

In [11]:
#Need to import three functions / objects from the MLlib
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

#We'll need to map the movielens data to a Ratings object 
#A Ratings object is made up of (user, item, rating)
mls = movielens.map(lambda l: l.split('\t'))
ratings = mls.map(lambda x: Rating(int(x[0]),int(x[1]), float(x[2])))

#Need a training and test set
train, test = ratings.randomSplit([0.7,0.3],7856)

In [12]:
train.count()

69939

In [13]:
test.count()

30061

In [14]:
#Need to cache the data to speed up training
train.cache()
test.cache()


PythonRDD[22] at RDD at PythonRDD.scala:53

In [15]:
#Setting up the parameters for ALS
rank = 5 # Latent Factors to be made
numIterations = 10 # Times to repeat process
#Create the model on the training data
model = ALS.train(train, rank, numIterations)


In [17]:
#Examine the latent features for one product
model.productFeatures().first()

(2,
 array('d', [-0.47366243600845337, 0.2726004719734192, 1.2456307411193848, 0.8580869436264038, -0.8408119082450867]))

In [18]:
#Examine the latent features for one user
model.userFeatures().first()

(2,
 array('d', [-1.2704147100448608, -0.6064961552619934, 0.8071704506874084, 1.206157922744751, -0.766410768032074]))

In [19]:
# For Product X, Find N Users to Sell To
model.recommendUsers(242,100)

[Rating(user=219, product=242, rating=7.280133160104819),
 Rating(user=50, product=242, rating=7.265627019721686),
 Rating(user=362, product=242, rating=6.35277359356703),
 Rating(user=731, product=242, rating=6.148359019935889),
 Rating(user=928, product=242, rating=6.009240899167956),
 Rating(user=895, product=242, rating=5.9621013910294165),
 Rating(user=240, product=242, rating=5.6891964934033545),
 Rating(user=174, product=242, rating=5.678547172772404),
 Rating(user=316, product=242, rating=5.644106911289798),
 Rating(user=691, product=242, rating=5.633779339496025),
 Rating(user=310, product=242, rating=5.601809726899805),
 Rating(user=4, product=242, rating=5.568101938395715),
 Rating(user=519, product=242, rating=5.5676255738371765),
 Rating(user=147, product=242, rating=5.543100756546043),
 Rating(user=153, product=242, rating=5.540357870400628),
 Rating(user=414, product=242, rating=5.465356673650458),
 Rating(user=142, product=242, rating=5.4345974383432285),
 Rating(user=7

In [20]:
# For User Y Find N Products to Promote
model.recommendProducts(196,10)

[Rating(user=196, product=793, rating=5.907701973624219),
 Rating(user=196, product=1495, rating=5.611960089810969),
 Rating(user=196, product=1164, rating=5.589479243117108),
 Rating(user=196, product=593, rating=5.518624752032652),
 Rating(user=196, product=1463, rating=5.516050313620706),
 Rating(user=196, product=867, rating=5.399117007753461),
 Rating(user=196, product=1251, rating=5.293851877548062),
 Rating(user=196, product=634, rating=5.232539427653386),
 Rating(user=196, product=814, rating=5.166743212217266),
 Rating(user=196, product=1605, rating=5.1490608700447815)]

In [21]:
#Predict Single Product for Single User
model.predict(196, 242)

3.579402990709257

In [22]:
# Predict Multi Users and Multi Products
# Pre-Processing
pred_input = train.map(lambda x:(x[0],x[1]))   

In [23]:
# Lots of Predictions
#Returns Ratings(user, item, prediction)
pred = model.predictAll(pred_input) 

#Get Performance Estimate
#Organize the data to make (user, product) the key)
true_reorg = train.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred.map(lambda x:((x[0],x[1]), x[2]))

#Do the actual join
true_pred = true_reorg.join(pred_reorg)

In [24]:

#Need to be able to square root the Mean-Squared Error
from math import sqrt

MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = sqrt(MSE)#Results in 0.7629908117414474

#Test Set Evaluation
#More dense, but nothing we haven't done before
test_input = test.map(lambda x:(x[0],x[1])) 
pred_test = model.predictAll(test_input)
test_reorg = test.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred_test.map(lambda x:((x[0],x[1]), x[2]))
test_pred = test_reorg.join(pred_reorg)
test_MSE = test_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
test_RMSE = sqrt(test_MSE)

print(test_MSE, test_RMSE)

1.0423980853253052 1.020978983782382


In [35]:
#If you're happy, save your model!

#model.save(sc,"ml-model")

## Resources

- (http://www.learnbymarketing.com/644/recsys-pyspark-als/)[http://www.learnbymarketing.com/644/recsys-pyspark-als/]

Source of example code with more resources

- (http://jmcauley.ucsd.edu/data/amazon/)[http://jmcauley.ucsd.edu/data/amazon/]

cache of amazon data you can use to build recommendation systems