In [None]:
from pyspark.sql.functions import *

# Bounds of the rating scale, and the width of the interval between them
RATING_MIN, RATING_MAX = 0.5, 5.0
RATING_RANGE = RATING_MAX - RATING_MIN

# Switch on Spark SQL's cross-join setting for this session
print("Configuring Spark...")
spark.conf.set("spark.sql.crossJoin.enabled", True)

# Define some helper functions
def readCSV(fname, removeHeader=False, separator=','):
    """Load a delimited text file as an RDD of field lists.

    Every line is split with ``str.split(separator)``. Quoting is NOT
    interpreted, so a quoted field that contains the separator is split
    like any other text.

    Args:
        fname: Path handed to ``sc.textFile``.
        removeHeader: When true, the first line is treated as a header and
            every line equal to it is dropped. An empty file is left as is.
        separator: Delimiter string passed to ``str.split``.

    Returns:
        The result of mapping ``sc.textFile(fname)`` to one list of strings
        per line.
    """
    print("Loading file ", fname, "...")
    rdd = sc.textFile(fname)
    if removeHeader:
        # take(1) yields an empty list for an empty file, whereas first()
        # raises ValueError; with no lines there is no header to strip.
        head = rdd.take(1)
        if head:
            firstline = head[0]
            rdd = rdd.filter(lambda x: x != firstline)
    return rdd.map(lambda x: x.split(separator))

# Read both input files, dropping their header lines
ratings = readCSV("./ratings_train.csv", removeHeader=True)
movies = readCSV("./movies.csv", removeHeader=True)

# The first comma-separated field of a ratings line is itself '::'-delimited:
# [user_id, movie_id, rating, timestamp]
print("Parsing ratings...")
ratings = ratings.map(lambda fields: fields[0].split('::'))

# Keep the first two fields and break the third one up on '|':
# [id, name, genres[]]
print("Parsing movies...")
movies = movies.map(lambda fields: fields[:2] + [fields[2].split('|')])

# Turn the parsed rating rows into a DataFrame.
# Every field comes out of str.split, so all four columns hold strings.
print("Create ratings dataframe...")
ratings_df = spark.createDataFrame(
    ratings,
    ['user_id', 'movie_id', 'rating', 'timestamp'],
)

# Mark the ratings for caching; they feed several joins further down
print("Caching ratings dataframe...")
ratings_df = ratings_df.cache()

# Turn the parsed movie rows into a DataFrame and keep only id and name
print("Create movies dataframe...")
movies_df = spark.createDataFrame(movies, ['id', 'name', 'genres'])
movies_df = movies_df.select("id", "name")
movies_df = movies_df.alias("movies")

# Mark the movie list for caching as well
print("Caching movies dataframe...")
movies_df = movies_df.cache()

# Self-join the ratings: pair each rating with every rating that a
# different user gave to the same movie
print("Creating ratings relation map...")
client_df = ratings_df.alias("client")
other_df = ratings_df.alias("other")
same_movie = col("client.movie_id") == col("other.movie_id")
different_user = col("client.user_id") != col("other.user_id")
ratings_map = client_df.join(
    other_df,
    on=[same_movie, different_user],
    how="inner",
)

# Per (client, other, movie) row: absolute rating difference shifted down by
# half the rating range. The value is negative when the two ratings are
# closer than half the range and positive when they are further apart.
# NOTE: `abs` is pyspark.sql.functions.abs (star import), not the builtin.
print("Filtering ratings map, and calculating ratings distance...")
rating_gap = abs(col("client.rating") - col("other.rating"))
ratings_map = ratings_map.select(
    col("client.user_id").alias("sim_client_user_id"),
    col("other.user_id").alias("sim_other_user_id"),
    "client.movie_id",
    (rating_gap - RATING_RANGE / 2).alias("rating_dist_norm"),
)

# Cache the ratings map
#print("Caching ratings map...")
#ratings_map = ratings_map.cache()

# Collect the distinct user ids that appear in the ratings
print("Creating list of users...")
user_list = ratings_df.select(col("user_id").alias("list_user_id"))
user_list = user_list.distinct()

# Mark the user list for caching
print("Caching list of users...")
user_list = user_list.cache()

# Sum the per-movie distance values for every (client, other) user pair.
# NOTE: `sum` is pyspark.sql.functions.sum (star import), not the builtin.
print("Calculating user relation similarity...")
pair_keys = ["sim_client_user_id", "sim_other_user_id"]
pair_total = sum("rating_dist_norm").alias("similarity")
users_similarity = ratings_map.groupBy(*pair_keys).agg(pair_total)
users_similarity = users_similarity.alias("similarity")
    
# Cache the user similarities
#print("Caching user similarities...")
#user_similarity = user_similarity.cache()

# Pair every user with every movie (cartesian product).
# An explicit crossJoin is used instead of a join on a dummy predicate:
# comparing the string movie id against a boolean goes through Spark's
# implicit type coercion, which may evaluate to NULL or false and silently
# drop (user, movie) pairs from an inner join.
print("Relating all users to all movies to map suggestions...")
suggestion_map = user_list.crossJoin(
    movies_df.select(col("id").alias("sug_movie_id"))
)

# For each (user, movie) pair, attach every rating of that movie that was
# made by a different user
print("Mapping all ratings to movies...")
movie_matches = col("sug_movie_id") == col("movie_id")
rater_is_other_user = col("list_user_id") != col("user_id")
suggestion_map = suggestion_map.join(
    ratings_df,
    on=[movie_matches, rater_is_other_user],
    how="inner",
)

# Attach the similarity value between the listed user and the rater
print("Add user similarity values to ratings...")
listed_user_matches = col("list_user_id") == col("sim_client_user_id")
rater_matches = col("user_id") == col("sim_other_user_id")
suggestion_map = suggestion_map.join(
    users_similarity,
    on=[listed_user_matches, rater_matches],
    how="inner",
)

# Weight each rating by the similarity value of the user who gave it
print("Expanding ratings based on user similarity...")
weighted_rating = (col("rating") * col("similarity")).alias("rating_mul")
suggestion_map = suggestion_map.select("*", weighted_rating)

# Per (user, movie): similarity-weighted mean of the ratings, rounded up to
# the next multiple of 0.5 by ceil(x * 2) / 2.
# NOTE: `sum` and `ceil` come from pyspark.sql.functions (star import).
print("Normalizing expanded ratings...")
weighted_mean = sum("rating_mul") / sum("similarity")
half_step_rating = (ceil(weighted_mean * 2) / 2).alias("rating_norm")
suggestion_map = suggestion_map.groupBy("list_user_id", "sug_movie_id")
suggestion_map = suggestion_map.agg(half_step_rating)

# Mark the normalized ratings for caching
suggestion_map = suggestion_map.cache()

# Print up to 10000 rows, highest normalized rating first
print("Creating list of ratings to print...")
ranked_suggestions = suggestion_map.sort(desc("rating_norm"))
ranked_suggestions.show(10000)

print("DONE")

Configuring Spark...
Loading file  ./ratings_train.csv ...
Loading file  ./movies.csv ...
Parsing ratings...
Parsing movies...
Caching ratings...
Create ratings dataframe...
Create movies dataframe...
Creating ratings relation map...
Filtering ratings map, and calculating ratings distance...
Calculating user relation similarity...
Creating list of users...
Caching list of users...
Relating all users to all movies to map suggestions...
Mapping all ratings to movies...
Add user similarity values to ratings...
Expanding ratings based on user similarity...
Normalizing expanded ratings...
Creating list of ratings to print...
