In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
# Configure the Spark application and obtain its context.
# getOrCreate() reuses a SparkContext that is already active in this kernel,
# so re-executing the cell (or running in a kernel that pre-creates a context)
# does not fail on a second SparkContext. Note that `conf` is only applied
# when a new context is actually created.
conf = SparkConf().setAppName("recommender")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
def load_data(path, header, token_fun):
    """Load a CSV text file into a cached RDD of parsed records.

    Lines equal to ``header`` are dropped, every remaining line is split into
    fields with the standard ``csv`` reader (so quoted fields that contain
    commas, e.g. ``"American President, The (1995)"``, stay in one piece), and
    ``token_fun`` converts the list of fields into the final record.

    Args:
        path: Location of the CSV file, passed to ``sc.textFile``.
        header: Exact text of the header line to skip.
        token_fun: Callable taking the list of string fields of one line and
            returning the record to keep.

    Returns:
        The cached RDD produced by mapping ``token_fun`` over the parsed lines.

    Relies on the module-level SparkContext ``sc``.
    """
    import csv

    # NOTE(review): on Python 2 the csv module cannot parse non-ASCII unicode
    # lines -- confirm this notebook runs on a Python 3 kernel.
    def parse_line(line):
        # csv.reader expects an iterable of lines; wrap the single line.
        return next(csv.reader([line]))

    return (sc.textFile(path)
              .filter(lambda line: line != header)
              .map(parse_line)
              .map(token_fun)
              .cache())

# Load the four MovieLens CSV files. Each call skips the file's header line
# and keeps only the typed columns selected by its token_fun.

# links.csv -> (movieId, imdbId)
links = load_data(
    path='ml-latest-small/links.csv',
    header='movieId,imdbId,tmdbId',
    token_fun=lambda fields: (int(fields[0]), int(fields[1])),
)

# movies.csv -> (movieId, title)
movies = load_data(
    path='ml-latest-small/movies.csv',
    header='movieId,title,genres',
    token_fun=lambda fields: (int(fields[0]), fields[1]),
)

# ratings.csv -> (userId, movieId, rating)
ratings = load_data(
    path='ml-latest-small/ratings.csv',
    header='userId,movieId,rating,timestamp',
    token_fun=lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])),
)

# tags.csv -> (userId, movieId, tag)
tags = load_data(
    path='ml-latest-small/tags.csv',
    header='userId,movieId,tag,timestamp',
    token_fun=lambda fields: (int(fields[0]), int(fields[1]), fields[2]),
)

In [None]:
# Sanity check: preview the first five parsed movie records.
movies.take(5)

In [None]:
def split_sets(ratings, proportions, seed=None):
    """Randomly partition ``ratings`` into training, validation and test RDDs.

    Args:
        ratings: RDD to split.
        proportions: Exactly three weights, in the order training,
            validation, test; forwarded to ``RDD.randomSplit``.
        seed: Optional random seed forwarded to ``randomSplit`` so the same
            split can be reproduced on a later run. ``None`` keeps the
            previous unseeded behaviour.

    Returns:
        dict with the keys ``'training'``, ``'validation'`` and ``'test'``,
        each mapped to the corresponding RDD.

    Raises:
        ValueError: if ``proportions`` does not contain exactly three weights.
    """
    weights = list(proportions)
    if len(weights) != 3:
        # Fewer weights would fail on the lookup below; more would silently
        # drop part of the data.
        raise ValueError(
            "proportions must contain exactly 3 weights "
            "(training, validation, test), got %d" % len(weights))
    training, validation, test = ratings.randomSplit(weights, seed=seed)
    return {'training': training, 'validation': validation, 'test': test}

# Weights are approximately 1 - 1/e for training and 1/(2e) each for
# validation and test. The fixed seed makes the split repeatable across runs.
sets = split_sets(ratings, [0.63212056, 0.1839397, 0.1839397], seed=42)

In [None]:
from pyspark.mllib.recommendation import ALS
# Training settings for ALS; the fixed seed keeps the trained model repeatable.
rank, iterations, seed = 10, 10, 42

# Fit the recommender on the training split only.
model = ALS.train(
    sets['training'],
    rank,
    iterations=iterations,
    seed=seed,
)

In [None]:
# Keep only the first two fields of every validation record (the pair that
# the model is later asked to score), then preview a few of them.
validation = (
    sets['validation']
    .map(lambda record: (record[0], record[1]))
)
validation.take(5)

In [None]:
# Ask the trained model for a prediction for every pair in `validation`
# and preview the first ten results.
predictions = model.predictAll(validation)
predictions.take(10)