In [1]:
import shutil
from pathlib import Path

import findspark
findspark.init()  # must run before pyspark is imported
import pyspark
from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

## Init Spark Session

In [2]:
# Local Spark session using 8 threads; getOrCreate() hands back the already
# running session if this cell is executed again.
spark = (
    SparkSession.builder
    .appName('movie-recs')
    .master('local[8]')
    .getOrCreate()
)

## Load Data

In [3]:
%%time
df = spark.read \
    .option("header", "true") \
    .format("csv")\
    .load('../data/movielens/small_rating.csv')

CPU times: user 1.5 ms, sys: 1.23 ms, total: 2.73 ms
Wall time: 3.35 s


## Convert Data to `Rating` Objects

In [4]:
# Columns are read positionally: 0 = userId, 1 = movieId, 2 = rating.
# The CSV was loaded untyped, so each field is cast explicitly.
ratings = df.rdd.map(
    lambda row: Rating(int(row[0]), int(row[1]), float(row[2]))
)

## Create Train / Test Splits

In [5]:
# Fixed seed so the 80/20 split is reproducible across Restart & Run All.
SPLIT_SEED = 42
# Both splits are reused by ALS training and by several evaluation actions;
# caching avoids re-reading and re-parsing the CSV for every Spark action.
train, test = [
    split.cache()
    for split in ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
]

## Train Model

In [6]:
%%time
K = 10
epochs = 10
model = ALS.train(train, K, epochs)

CPU times: user 31.3 ms, sys: 9.6 ms, total: 40.9 ms
Wall time: 14.1 s


## View and Evaluate Training Results

In [15]:
%%time

# get training set predictions
x = train.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(x)\
    .map(lambda r: ((r[0], r[1]), r[2]))

CPU times: user 16.8 ms, sys: 3.81 ms, total: 20.6 ms
Wall time: 7.36 s


In [16]:
%%time

# join ratings and predictions
ratingsAndPredictions = train\
    .map(lambda r: ((r[0], r[1]), r[2]))\
    .join(predictions)

CPU times: user 23.8 ms, sys: 4.7 ms, total: 28.5 ms
Wall time: 47.3 ms


In [47]:
print('Training Set Results')
print('(userId, movieId), (rating, predicted rating)')
print(ratingsAndPredictions.take(5))

Training Set Results
(userId, movieId), (rating, predicted rating)
[((3863, 587), (4.0, 4.093021125404596)), ((3863, 1179), (2.0, 3.211821122362567)), ((3863, 1963), (4.0, 3.2659414993180413)), ((4473, 165), (5.0, 3.771323971371701)), ((4473, 1221), (5.0, 5.183301033514477))]


In [18]:
%%time

# compute MSE
mse = ratingsAndPredictions\
    .map(lambda r: (r[1][0] - r[1][1])**2)\
    .mean()

print(f'Train MSE: {mse}')

Train MSE: 0.5135299416479703
CPU times: user 14.9 ms, sys: 3.77 ms, total: 18.7 ms
Wall time: 7.63 s


## View and Evaluate Test Results

In [21]:
%%time

# get test set predictions
x = test.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(x)\
    .map(lambda r: ((r[0], r[1]), r[2]))

CPU times: user 18 ms, sys: 3.91 ms, total: 21.9 ms
Wall time: 6.69 s


In [24]:
%%time

# join ratings and predictions
ratingsAndPredictions = test\
    .map(lambda r: ((r[0], r[1]), r[2]))\
    .join(predictions)

CPU times: user 55.6 ms, sys: 7.35 ms, total: 62.9 ms
Wall time: 82.2 ms


In [25]:
print('Test Set Results')
print('(userId, movieId), (rating, predicted rating)')
ratingsAndPredictions.take(5)

Test Set Results
(userId, movieId), (rating, predicted rating)


[((3863, 587), (4.0, 4.093021125404596)),
 ((3863, 1179), (2.0, 3.211821122362567)),
 ((3863, 1963), (4.0, 3.2659414993180413)),
 ((4473, 165), (5.0, 3.771323971371701)),
 ((4473, 1221), (5.0, 5.183301033514477))]

In [26]:
%%time

# compute MSE
mse = ratingsAndPredictions\
    .map(lambda r: (r[1][0] - r[1][1])**2)\
    .mean()

print(f'Test MSE: {mse}')

Test MSE: 0.5435640108472513
CPU times: user 17.6 ms, sys: 4.04 ms, total: 21.6 ms
Wall time: 1.83 s


## Viewing Matrix Factors

These latent factor vectors can be used as features, e.g. to compute user–user or product–product similarity scores.

In [31]:
print('Viewing user features')
# userFeatures() yields (userId, latent-factor array) pairs, one value per
# factor (K in total); pull back two entries as a sample.
user_factors = model.userFeatures()
user_factors.take(2)

Viewing user features


[(0,
  array('d', [-2.4913628101348877, -1.0777480602264404, -0.06278830766677856, 2.0980241298675537, -0.38235652446746826, -2.6224629878997803, -0.6706669926643372, -2.9868948459625244, -0.9448479413986206, 0.812681257724762])),
 (8,
  array('d', [-2.2169198989868164, -0.927251935005188, -0.18451672792434692, 1.1308958530426025, -0.17926691472530365, -2.0334668159484863, -0.8369787931442261, -2.349142074584961, -0.6834691762924194, 0.7411922812461853]))]

In [33]:
print('Viewing product features')
# productFeatures() yields (movieId, latent-factor array) pairs, one value per
# factor (K in total); pull back two entries as a sample.
product_factors = model.productFeatures()
product_factors.take(2)

Viewing product features


[(16,
  array('d', [-0.2612162232398987, 0.12512777745723724, -0.34570401906967163, 0.5759508013725281, 0.1746082305908203, -0.3501777648925781, -0.42493462562561035, -0.13120533525943756, -0.06286771595478058, 0.5467106103897095])),
 (24,
  array('d', [-0.941043496131897, 0.10615763813257217, 0.023559004068374634, -0.1536172777414322, 0.366079181432724, -0.403524786233902, -0.21307744085788727, -0.14883272349834442, -0.10264371335506439, -0.49082833528518677]))]

## Providing Recommendations

We can recommend the top products for each user, or the top users for each product.

In [36]:
%%time

print('Get predictions for a user')
model.recommendProductsForUsers(5).take(2)

Get predictions for a user
CPU times: user 16.4 ms, sys: 5.27 ms, total: 21.7 ms
Wall time: 892 ms


[(1216,
  (Rating(user=1216, product=858, rating=4.901110195490002),
   Rating(user=1216, product=923, rating=4.8403610191065685),
   Rating(user=1216, product=1221, rating=4.8033667997240705),
   Rating(user=1216, product=912, rating=4.785329111734935),
   Rating(user=1216, product=3435, rating=4.688967017411553))),
 (1792,
  (Rating(user=1792, product=527, rating=4.848774796557304),
   Rating(user=1792, product=2028, rating=4.704430416639492),
   Rating(user=1792, product=110, rating=4.687931326691416),
   Rating(user=1792, product=318, rating=4.673595787945411),
   Rating(user=1792, product=590, rating=4.639922203396457)))]

In [38]:
%%time

print('Find users for each product')
model.recommendUsersForProducts(5).take(2)

Find users for each product
CPU times: user 18 ms, sys: 5.4 ms, total: 23.4 ms
Wall time: 668 ms


[(1792,
  (Rating(user=1961, product=1792, rating=4.872198813556538),
   Rating(user=2075, product=1792, rating=4.816083509739068),
   Rating(user=4175, product=1792, rating=4.705379786300854),
   Rating(user=4813, product=1792, rating=4.638693434038025),
   Rating(user=2725, product=1792, rating=4.517412254094975))),
 (2688,
  (Rating(user=4175, product=2688, rating=4.64147204543725),
   Rating(user=1961, product=2688, rating=4.618057090882778),
   Rating(user=2075, product=2688, rating=4.527792554720106),
   Rating(user=2725, product=2688, rating=4.5079718659408545),
   Rating(user=3573, product=2688, rating=4.472822290011315)))]

## Save Model For Later

In [46]:
# MLlib's save() fails when the target directory already exists, so a second
# Restart & Run All would crash here; remove the previous copy first.
# NOTE(review): assumes Spark's default filesystem is local (the usual case for
# this local[8] session unless fs.defaultFS is configured) -- confirm.
MODEL_DIR = Path('als-mf-model1')
if MODEL_DIR.is_dir():
    shutil.rmtree(MODEL_DIR)
model.save(spark.sparkContext, path=str(MODEL_DIR))  # save() requires a str path