In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import *
import numpy as np
from itertools import permutations

In [3]:
sc = SparkContext.getOrCreate()

In [4]:
small_ratings_raw_data = sc.textFile('/Users/gregcattell/PyProjects/data/PJ_data/ml-latest-small/ratings.csv')
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
.map(lambda line : line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2]))\
.map(lambda x: (int(x[0]), int(x[1]), float(x[2])))

In [5]:
small_movies_raw_data = sc.textFile('/Users/gregcattell/PyProjects/data/PJ_data/ml-latest-small/movies.csv')
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1]))\
.map(lambda x: (int(x[0]), x[1]))

#### ______Checking data set: 
#### small_ratings_data = (userID, movieID, rating)
#### small_movies_data = (movieID, movieName)

In [6]:
small_ratings_data.take(3)

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]

In [7]:
small_movies_data.take(3)

[(1, 'Toy Story (1995)'),
 (2, 'Jumanji (1995)'),
 (3, 'Grumpier Old Men (1995)')]

## Split data into training set and test set: 8:2

In [8]:
training_RDD, test_RDD = small_ratings_data.randomSplit([8, 2], seed=0)
test_user_unwatch = test_RDD.map(lambda x: (x[0], x[1]))

#### ___Checking data set:
#### Numbers of training data and test data
#### test samples for prediction: (userID, unWatchedID)

In [9]:
Total_train = training_RDD.count()
Total_test = test_RDD.count()
print("The total number of training dataset is", Total_train)
print("The total number of test dataset is", Total_test)
print("Rate of training and test:", Total_train/Total_test)

The total number of training dataset is 80720
The total number of test dataset is 20116
Rate of training and test: 4.012726188108968


In [10]:
test_user_unwatch.take(3)

[(1, 70), (1, 101), (1, 110)]

## Normalize the rating of movie: subtract mean rating $m_{i}$ from each movie i

In [11]:
movie_means = training_RDD.map(lambda x: (x[1], (x[2], 1))).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))\
.map(lambda x: (x[0], x[1][0]/x[1][1]))

In [12]:
normalized_ratings = training_RDD.map(lambda x: (x[1], (x[0], x[2]))).join(movie_means)\
.map(lambda x: (x[1][0][0], x[0], x[1][0][1] - x[1][1]))

#### ___Checking dataset:
#### movie_means: (movie, mean of ratings)
#### normalized_ratings: (userID, movieID, norm_rating)

In [13]:
movie_means.take(3)

[(6, 3.98125), (50, 4.2398843930635834), (260, 4.21875)]

In [14]:
normalized_ratings.take(3)

[(1, 260, 0.78125), (4, 260, 0.78125), (7, 260, 0.78125)]

## Getting cosine similarity with Pearson correlation coefficient
### $S_{xy} = $ items rated by both user x and user y
### $m_{1}$ and $m_{2}$ are normalized movie ratings

<font size = "5">

$$
sim =  \frac{\sum_{s \in S_{xy} } m_{1} \times m_{2}}{ \sqrt{\sum_{s \in S_{xy} } m_{1}^{2}} \sqrt{\sum_{s \in S_{xy} } m_{2}^{2}} }
$$

</font>

In [15]:
def item_perm(line):
    perm = list(permutations(line, 2))
    return perm

In [16]:
#(user, list((movie, ratings)))
#((m1, r1), (m2, r2))
#((m1, m2),(r1*r2, r1^2, r2^2))
#((m1, m2), (sum(r1*r2), sum(r1^2), sum(r2^2)))
#((m1,m2), sim)

cosine_sim = normalized_ratings.map(lambda x: (x[0], (x[1], x[2]))).groupByKey()\
.flatMap(lambda x: item_perm(list(x[1])))\
.map(lambda x: ((x[0][0], x[1][0]),(x[0][1]*x[1][1], x[0][1]**2, x[1][1]**2)))\
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))\
.map(lambda x: (x[0], x[1][0]/ np.sqrt(x[1][1]) / np.sqrt(x[1][2])) if (x[1][1] * x[1][2])!= 0 else \
    (x[0],0)).cache() 

#### ____Checking dataset:
#### cosine_sim: (movie_1, movie_2, similarity)

In [17]:
cosine_sim.take(3)

[((1240, 61350), 0.999314833766767),
 ((2028, 1358), -0.1775294625417523),
 ((5632, 33794), -0.9515851669023498)]

## Getting baseline estimation

In [18]:
mu = training_RDD.map(lambda x: x[2]).mean()

In [19]:
bx = training_RDD.map(lambda x: (x[0], (x[2], 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
.map(lambda x: (x[0], x[1][0]/x[1][1] - mu))

In [20]:
bi = movie_means.map(lambda x: (x[0], x[1] - mu))

In [21]:
bx_dict = {x[0]: x[1] for x in bx.collect()}
bi_dict = {x[0]: x[1] for x in bi.collect()}

def baseLine(user, movie):
    if movie not in bi_dict:
        return bx_dict[user] + mu
    return bx_dict[user] + bi_dict[movie] + mu

#### ___checking dataset:
#### mu, bx(user, bx), bi(movie, bi)
#### baseLine(user, movie)

In [22]:
mu

3.502682111000975

In [23]:
bi.take(3)

[(6, 0.478567888999025), (50, 0.7372022820626083), (260, 0.7160678889990248)]

In [24]:
bx.take(3)

[(2, 0.41398455566569137),
 (4, 0.02141427454119338),
 (6, -0.024579921219953338)]

In [25]:
baseLine(2,6)

4.3952345556656915

## Calculating the estimate rating rxi

In [26]:
test_data = test_RDD.map(lambda x: (x[1], x[0])) #(unwatched, uId)

In [27]:
movie_sim = cosine_sim.map(lambda x: (x[0][0],(x[0][1], x[1]))) #(unwatched, (watched, sim))

In [28]:
training_rating = training_RDD.map(lambda x: ((x[0], x[1]), x[2])) #((user, watched), rating)

### We choose the top four nearest neighbors for each (user, unwatched) pair

In [70]:
uu_wrs = movie_sim.join(test_data)\
.map(lambda x: ((x[1][1], x[1][0][0]), (x[0], x[1][0][1])))\
.join(training_rating).map(lambda x: ((x[0][0], x[1][0][0]),(x[0][1], x[1][1], x[1][0][1])))

In [71]:
def rxi(uu, wrs, k):
    user, unwat = uu
    wrs.sort(key = lambda x: x[2], reverse = True)
    numerator =0 
    denominator =0
    for wrs_unit in wrs[:k]:
        wat, rat, sim = wrs_unit
        numerator += sim * (rat - baseLine(user, wat))
        denominator += sim
    if denominator ==0:
        return (uu, baseLine(user,unwat))
    return (uu, baseLine(user, unwat)+numerator/denominator)

In [72]:
k=7
prediction = uu_wrs.groupByKey().map(lambda x: rxi(x[0], list(x[1]), k))

#### ___checking dataset:
#### uu_wrs: ((user, watched), (unwatched, rating, sim))
#### rxi function:
#### prediction: ((user, unwatched), rxi)

In [73]:
uu_wrs.take(3)

[((414, 91542), (420, 2.0, 0.7437188876603105)),
 ((414, 6942), (420, 2.0, 0.38952960301417106)),
 ((414, 1320), (420, 2.0, 0.3004950270825381))]

In [76]:
uu = (1,3)
wrs = [(1, 2.0, 0.45), (5, 4.0, 0.6), (12, 3.5, 0.2), (14, 2.5, 0.36)]
k = 4
rij(uu, wrs, k)

((1, 3), 2.896390178144837)

In [77]:
prediction.take(3)

[((600, 750), 4.059201090074584),
 ((600, 38886), 3.1030505952380953),
 ((600, 8910), 2.869765866873065)]

## Compare with the true value, calculating MSE & RMSE

In [78]:
test_value = test_RDD.map(lambda x: ((x[0], x[1]), x[2]))
comparison = prediction.join(test_value).cache()

In [79]:
comparison.take(3)

[((600, 1270), (4.0629975124378115, 4.5)),
 ((600, 2302), (3.265292553191489, 3.0)),
 ((182, 1644), (2.0463721804511277, 3.5))]

In [80]:
MAE = comparison.map(lambda x: abs(x[1][0] - x[1][1])).mean()
RMSE = np.sqrt(comparison.map(lambda x: (x[1][0]-x[1][1])**2).mean())

In [81]:
print("The MAE for the CF prediction is: ", MAE)
print("The RMSE for the CF prediction is: ", RMSE)

The MAE for the CF prediction is:  0.7491815388690404
The RMSE for the CF prediction is:  0.9776822718371895


### However, there are some missing data, we can check the volume of comparision and test_RDD

In [82]:
missing_data = test_RDD.count() - comparison.count()
print(missing_data)

784


### This is because some movies in test set are not in training set
### So, some movie information are not in similarity table

In [83]:
num_movie_train = training_RDD.map(lambda x: (x[1])).distinct().count()
num_movie = small_movies_data.count()
missing_movie = num_movie - num_movie_train

### Add the missing data

In [84]:
pred_uu = prediction.map(lambda x: x[0])
mis_part = test_RDD.map(lambda x: (x[0], x[1])).subtract(pred_uu)\
.map(lambda x: ((x[0],x[1]), baseLine(x[0], x[1])))
pred_Total = prediction.union(mis_part)
comp_Total = pred_Total.join(test_value).cache()

In [85]:
MAE_T = comp_Total.map(lambda x: abs(x[1][0] - x[1][1])).mean()
RMSE_T = np.sqrt(comp_Total.map(lambda x: (x[1][0]-x[1][1])**2).mean())

In [86]:
print("The MAE for the CF prediction is: ", MAE_T)
print("The RMSE for the CF prediction is: ", RMSE_T)

The MAE for the CF prediction is:  0.7506121072803289
The RMSE for the CF prediction is:  0.9796308461005092


## Using ALS for recommendation:

In [87]:
from pyspark.mllib.recommendation import ALS

In [88]:
pre_train_RDD, pre_valid_RDD, pre_test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = pre_valid_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = pre_test_RDD.map(lambda x: (x[0], x[1]))

In [91]:
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(pre_train_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = pre_valid_RDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MAE = rates_and_preds.map(lambda r: abs(r[1][0] - r[1][1])).mean()
    RMSE = np.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = RMSE
    err += 1
    print('For rank', rank,'the MAE is: ', MAE,'the RMSE is: ', RMSE) 
    if RMSE < min_error:
        min_error = RMSE
        best_rank = rank

print('The best model was trained with rank: ', best_rank)

For rank 4 the MAE is:  0.6984923736604529 the RMSE is:  0.9114026007220244
For rank 8 the MAE is:  0.7045145440395727 the RMSE is:  0.9180911451666388
For rank 12 the MAE is:  0.7087779592332236 the RMSE is:  0.9183376003842972
The best model was trained with rank:  4


In [93]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_user_unwatch).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MAE = rates_and_preds.map(lambda r: abs(r[1][0] - r[1][1])).mean()
RMSE = np.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('For testing data the MAR is', MAE)
print('For testing data the RMSE is', RMSE)

For testing data the MAR is 0.6751796857448238
For testing data the RMSE is 0.8802398422205924


## Making Top-10 Recommendation

In [95]:
#At first, we need to build a list of (user, movie(not watched before))
#we first get (user, list(watched movie))
user_watched = training_RDD.map(lambda x: (x[0],x[1])).groupByKey()\
.map(lambda x: (x[0], set(x[1])))
#get a movie set for function:
movie_set = set(small_movies_data.map(lambda x: x[0]).collect())

In [96]:
# list((int user, int unwatched)) getUnwatched(int user, set movies):
def getUnwatched(user, movies):
    unwat_mov = list(movie_set - movies)
    unwat_user = [(user, x) for x in unwat_mov]
    return unwat_user

In [97]:
user_unWatched = user_watched.flatMap(lambda x: getUnwatched(x[0], x[1]))

In [99]:
total_recommendations = model.predictAll(user_unWatched).map(lambda r: ((r[0], r[1]), r[2]))

In [100]:
total_recommendations.take(10)

[((312, 81132), 3.594017154327881),
 ((96, 81132), 3.244278589910458),
 ((600, 81132), 3.1654283745497827),
 ((324, 81132), 3.3527077471031346),
 ((180, 81132), 2.8364072517403205),
 ((156, 81132), 3.240139905189947),
 ((216, 81132), 3.2890586614329282),
 ((408, 81132), 3.599622858828204),
 ((456, 81132), 4.013056526198035),
 ((480, 81132), 2.9908068948515423)]

In [101]:
def getTop_Ten(pairs):
    pairs.sort(key =lambda x: x[1], reverse = True)
    return set([x[0] for x in pairs[:10]])

In [102]:
Top_Ten = total_recommendations.map(lambda line: (line[0][0], (line[0][1], line[1]))).groupByKey()\
.map(lambda line: (line[0], list(line[1]))).map(lambda line: (line[0], getTop_Ten(line[1])))

In [103]:
Top_Ten.take(10)

[(312, {3379, 3567, 4642, 6818, 7815, 8477, 25771, 40491, 58301, 99764}),
 (96, {720, 3404, 3566, 3925, 5607, 7842, 58303, 59018, 60943, 94070}),
 (600, {3379, 3567, 5222, 6818, 7815, 40491, 58301, 96004, 99764, 141718}),
 (324, {1194, 3567, 3846, 4634, 7025, 25947, 26258, 26326, 82378, 141718}),
 (180, {3925, 4495, 4617, 6201, 7841, 8235, 51931, 59018, 60943, 112804}),
 (156, {3379, 4495, 6201, 7564, 7815, 7841, 8235, 26326, 58301, 89904}),
 (216, {3096, 3379, 4495, 4642, 6201, 7564, 7815, 7841, 8235, 89904}),
 (408, {3567, 3925, 5867, 7815, 7842, 33649, 59018, 60943, 67618, 130518}),
 (456, {40, 3379, 3567, 3837, 5222, 5480, 33649, 86347, 98279, 130518}),
 (480, {2239, 4495, 6201, 7815, 7841, 8235, 51931, 59018, 60943, 89904})]

In [104]:
def sortTest(pairs):
    pairs.sort(key = lambda x: x[1], reverse = True)
    return [x[0] for x in pairs]

In [106]:
testlist = test_RDD.map(lambda r: (r[0], (r[1], r[2]))).groupByKey()\
.map(lambda r: (r[0], list(r[1]))).map(lambda line: (line[0], sortTest(line[1])))

In [107]:
testlist.take(1)

[(2, [60756, 68157, 3578, 86345, 71535])]

In [108]:
comparison_com = testlist.map(lambda line: (line[0], set(line[1]))).join(Top_Ten)\
.map(lambda x: (len(x[1][0] & x[1][1]), len(x[1][1]),len(x[1][0])))\
.map(lambda x: (x[0]/x[1], x[0]/x[2]))

In [109]:
precision = comparison_com.map(lambda x: x[0]).mean()
recall = comparison_com.map(lambda x: x[1]).mean()
print("precision: ", precision)
print("recall: ", recall)

precision:  0.0013157894736842107
recall:  0.00021420595911864591


In [110]:
F_measure = 2 * precision *recall/ (precision + recall)
print("F_measure: ", F_measure)

F_measure:  0.0003684324020398062


In [111]:
def nDCG(test, prediction):
    DCG =0
    IDCG = 0
    j=1
    for i in range(len(test)):
        if test[i] in prediction:
            DCG+=1/np.log2(1+i+1)
            IDCG+=1/np.log2(j+1)
            j+=1
    if IDCG!=0:
        return DCG/IDCG
    else: return 0

In [117]:
comparison_ndcg = testlist.join(Top_Ten).map(lambda x: (x[0] ,nDCG(x[1][0], x[1][1])))

In [120]:
comparison_ndcg.filter(lambda x: x[1]!=0).take(8)

[(448, 0.15773243839286438),
 (606, 0.16730017881017412),
 (610, 0.13480990596580797),
 (541, 0.43067655807339306),
 (305, 0.1781035935540111),
 (474, 0.125),
 (41, 0.1810425967800402),
 (391, 0.17542506358195453)]