In [1]:
import os
import re

from pyspark.mllib.recommendation import ALS, Rating

In [3]:
# Input directory, read line by line by sc.textFile. Set the PYSPARK_WORKSHOP_DATA
# environment variable to point at your copy instead of editing a machine-specific path.
DATA_DIR = os.environ.get("PYSPARK_WORKSHOP_DATA", "/Users/bencook/Desktop/pyspark-workshop-data/")
rdd = sc.textFile(DATA_DIR)

In [4]:
def user_highlight_pair(log):
    """Extract the user token and the highlight-reel token from one input line.

    Args:
        log: a single line of text.

    Returns:
        A ``(user, reel)`` tuple of the matched substrings themselves, e.g.
        ``("AuthUser=42", "HighlightReel=7")``. The ``key=`` prefix is kept
        (not just the digits). When a token occurs more than once, the first
        occurrence is used.

    Raises:
        ValueError: if the line lacks an ``AuthUser=<digits>`` token or a
            ``HighlightReel=<digits>`` token.
    """
    # Raw strings: "\d" in a normal string literal is an invalid escape sequence.
    user_match = re.search(r"AuthUser=\d+", log)
    reel_match = re.search(r"HighlightReel=\d+", log)

    # Fail with a message that names the offending line, rather than an
    # AttributeError on None deep inside a Spark task.
    if user_match is None or reel_match is None:
        raise ValueError("log line is missing AuthUser or HighlightReel: %r" % (log,))

    return (user_match.group(0), reel_match.group(0))

In [5]:
# (user, reel) token pair per input line. Many later cells run actions on this
# RDD (distinct users/highlights, joins, counts). Cache it so the input is not
# re-read and re-parsed by every one of those actions.
views = rdd.map(user_highlight_pair).cache()

## If all distinct users/highlights fit in driver memory: index them with broadcast lookup tables

In [6]:
# Distinct user tokens (first element of each pair), gathered into a driver-side list.
all_users = views.keys().distinct().collect()
# Distinct highlight-reel tokens (second element of each pair), gathered the same way.
all_highlights = views.values().distinct().collect()

In [7]:
# Lookup tables: token -> its 0-based position in the collected list.
user_dict = {user: idx for idx, user in enumerate(all_users)}
highlight_dict = {reel: idx for idx, reel in enumerate(all_highlights)}

In [8]:
# Broadcast the lookup tables so executors get one read-only copy each,
# instead of a copy shipped inside every task closure.
user_dict_b, highlight_dict_b = sc.broadcast(user_dict), sc.broadcast(highlight_dict)

In [9]:
def _pair_to_indices(pair):
    # Translate a (user, reel) token pair into (user_idx, reel_idx) via the broadcast tables.
    user_idx = user_dict_b.value[pair[0]]
    reel_idx = highlight_dict_b.value[pair[1]]
    return (user_idx, reel_idx)

indexed_views = views.map(_pair_to_indices)

## If the distinct users/highlights don't fit in driver memory: index them with `zipWithIndex` and joins

In [10]:
# Distributed alternative: RDDs of (token, 0-based index), built without collecting to the driver.
all_users = views.keys().distinct().zipWithIndex()
all_highlights = views.values().distinct().zipWithIndex()

In [11]:
# Data flow (leftOuterJoin yields (key, (left, right_or_None))):
#   (user, reel) -> (user, (reel, user_idx)) -> (reel, user_idx)
#                -> (reel, (user_idx, reel_idx)) -> (user_idx, reel_idx)
indexed_views = (
    views
    .leftOuterJoin(all_users)
    .values()
    .leftOuterJoin(all_highlights)
    .values()
)

### Sanity check

In [12]:
# Number of distinct users.
views.keys().distinct().count()

39283

In [13]:
# Largest user index; expected to be one less than the distinct-user count.
indexed_views.keys().max()

39282

In [14]:
# Number of distinct highlight reels.
views.values().distinct().count()

62152

In [15]:
# Largest highlight index; expected to be one less than the distinct-highlight count.
indexed_views.values().max()

62151

## Collaborative Filtering

In [16]:
# ALS hyper-parameters: number of latent factors (rank) and number of ALS iterations.
rank, num_iterations = 10, 20

In [17]:
# One Rating(user_idx, reel_idx, n_views) per distinct (user, reel) pair, where
# n_views is how often that pair occurs. reduceByKey counts per partition before
# shuffling, unlike groupBy, which ships every duplicate pair across the network.
# It also avoids reading the private ResultIterable.data attribute.
ratings = (
    indexed_views
    .map(lambda pair: (pair, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda kv: Rating(kv[0][0], kv[0][1], kv[1]))
)

In [18]:
# Build the recommendation model using Alternating Least Squares based on implicit ratings.
# The "ratings" are view counts (implicit feedback), so use trainImplicit. ALS.train
# would fit the counts as explicit star-style ratings, which is not what the data is.
model = ALS.trainImplicit(ratings, rank, num_iterations)