In [1]:
from operator import itemgetter
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors
import shutil
import glob
import sys

In [2]:
def load_data(sc, filepath, header=True):
    """Load data from a specified filepath. 

    If header is set to False,
    the header will be removed from the first row of the RDD before being 
    returned.
    """ 
    data = sc.textFile(filepath).map(lambda x: x.split(","))
    if not header:
        hdr = data.first()
        data = data.filter(lambda row: row != hdr)  # Remove header.

    return data

In [3]:
def factorize(rdd):
    """Factorize RDD column and return a mapping of {string: integer index}."""
    return rdd.distinct().sortBy(lambda x: x).zipWithIndex().collectAsMap()

In [4]:
def bin_fn(value):
    """Bin predictions into one of five buckets."""
    if 0 <= value < 1:
        return ">=0 and <1"
    elif 1 <= value < 2:
        return ">=1 and <2"
    elif 2 <= value < 3:
        return ">=2 and <3"
    elif 3 <= value < 4:
        return ">=3 and <4"
    else:
        return ">=4"

In [5]:
def compute_mean_rating(rdd):
    return (
        rdd.aggregateByKey(tup, lambda a, b: (a[0] + b,    a[1] + 1),
                             lambda a, b: (a[0] + b[0], a[1] + b[1]))
        .mapValues(lambda v: v[0] / v[1])
        .collectAsMap())

In [6]:
conf = SparkContext.getOrCreate()
sc.setCheckpointDir("checkpoint_dir/")

In [7]:
trainFile = "train_review.csv"
testFile = "test_review.csv"
train = (load_data(sc, trainFile, header=False)
            .map(lambda x: (x[0], x[1], int(x[2]))))
test = (load_data(sc, testFile, header=False)
            .map(lambda x: (x[0], x[1], int(x[2]))))

In [8]:
# Union train and test to get a combined feature mapping for all user and business IDs.
combined = train.union(test)
users = factorize(combined.map(itemgetter(0)))
users_inv = {users[k]: k for k in users} 
businesses = factorize(combined.map(itemgetter(1)))
businesses_inv = {businesses[k]: k for k in businesses}

In [9]:
# Compute average user ratings as lookup/dictionary.
tup = (0,0)
avg_ratings = compute_mean_rating(train.map(lambda x: (x[0], float(x[-1]))))
avg_ratings_item = compute_mean_rating(train.map(lambda x: (x[1], float(x[-1]))))

In [10]:
# Normalize ratings.
train = train.map(lambda x: (x[0], x[1], x[2] - avg_ratings[x[0]]))

# Get mapping of normalized ratings for each (user_id, business_id) pair.
norm_ratings = train.map(lambda x: ((x[0], x[1]), x[2])).collectAsMap()

In [11]:
# Compute sparse vectors for each user_id.
sparse_vecs = (
    train
        # Normalize ratings be mean user ratings.
        .map(lambda x: (users[x[0]], [(businesses[x[1]], x[-1] - avg_ratings[x[0]])]))
        # "Pivot" data.
        .reduceByKey(lambda x, y: x + y)
        # Assemble sparse vectors for pearson correlation.
        .map(lambda x: (x[0], Vectors.sparse(len(businesses), x[1])))
        .collectAsMap())

In [12]:
# Generate index of items mapping to users who rated that item. 
inv_index = (combined.map(lambda x: (x[1], [x[0]]))
                .reduceByKey(lambda x, y: x + y)
                .collectAsMap())

In [13]:
# Make predictions.
weights = {}
predictions = []
for idx, (u, i, r) in enumerate(test.collect(), 1):
    if idx % 1000 == 0:
        print(idx)
    sum_ = sum_w = 0
    for u_ in inv_index[i]:
        if (u, u_) in weights:
            w = weights[(u, u_)]
        else:
            try:
                x = sparse_vecs[users[u]]
                y = sparse_vecs[users[u_]]

                w = x.dot(y) / (x.norm(2) * y.norm(2))
            except KeyError:
                w = 0
            weights[(u, u_)] = weights[(u_, u)] = w
        sum_ += norm_ratings.get((u_, i), avg_ratings_item.get(i, 0) - avg_ratings.get(u, 0)) * w
        sum_w += abs(w)

    p = float(sum_) / (sum_w if sum_w else 1) + avg_ratings.get(u, 0)
    p = p if p else avg_ratings_item.get(i, 2.5)
    predictions.append(((u, i), p, r))

1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000


In [14]:
predictions = sc.parallelize(predictions)
print(predictions.take(5))
fname = 'Output'
try:
    shutil.rmtree(fname)
except Exception:
    pass

(predictions
    .map(lambda x: (x[0][0], x[0][1], x[1]))
    .sortBy(lambda x: (x[0], x[1]))
    .map(lambda x: ','.join(str(y) for y in x))
    .coalesce(1)
    .saveAsTextFile(fname))

[(('YHWsLBS8jzZiPjKHMFOaAA', 'iKMLsX1Je7P3wAOEc9scDg'), 3.3499205756695196, 4), (('YHWsLBS8jzZiPjKHMFOaAA', 'amsvLzfEvCzLwP0MnXAJ1w'), 3.0827430052176625, 4), (('YHWsLBS8jzZiPjKHMFOaAA', '43PeF0ERpSIiEbXM6f9N2g'), 2.6579117451040983, 3), (('YHWsLBS8jzZiPjKHMFOaAA', 'y-sRypoTK2L6EuozhEMQzA'), 2.8174581229555473, 2), (('YHWsLBS8jzZiPjKHMFOaAA', 'VeVjZ8aR_zFEM9jKJuvraw'), 3.3414413782462304, 4)]


In [15]:
# Cleanup output directory.
shutil.move(next(glob.iglob('{}/part-00000'.format(fname))), fname + '.txt')
shutil.rmtree(fname)

In [16]:
# Join predictions with actual values and compute differences.
diffs = predictions.map(lambda x: abs(x[1] - x[2]))

# Partition diff values into 5 bins. 
bins = diffs.map(bin_fn).countByValue()

# Compute Mean Squared Error.
rmse = diffs.map(lambda x: x ** 2).mean() ** .5
print("Bins", dict(bins))
print("MSE", rmse)

shutil.rmtree("checkpoint_dir/")

Bins {'>=0 and <1': 31283, '>=1 and <2': 11163, '>=3 and <4': 381, '>=2 and <3': 2366, '>=4': 43}
MSE 1.0426011246517954
