In [1]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pandas as pd
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Start (or reuse) a SparkSession whose master is 'local[4]', i.e. Spark runs
# inside this process with four worker threads.
spark = SparkSession.builder.master('local[4]').getOrCreate()

In [3]:
# Load the wide ratings table. The first unnamed CSV column becomes the
# 'Username' index, the second unnamed column is discarded, and every remaining
# column is treated as one board game.
ratings_df = pd.read_csv('data/wa_ratings_data.csv')
ratings_df = ratings_df.rename(columns={'Unnamed: 0': 'Username'})
ratings_df = ratings_df.set_index('Username')
ratings_df = ratings_df.drop('Unnamed: 1', axis=1)

# Remove rows that carry no rating at all. A boolean mask is used instead of
# a per-label loop so that duplicated index labels are handled row by row.
empty_rows = ratings_df.isnull().all(axis=1)
ind = list(ratings_df.index[empty_rows])  # labels of the rows that were removed
ratings_df = ratings_df.loc[~empty_rows]

# Wide -> long: one row per (user, board_game, rating) triple.
als_data = (
    ratings_df.stack()
    .reset_index()
    .rename(columns={'Username': 'user', 'level_1': 'board_game', 0: 'rating'})
)

# ALS needs integer ids. Keep both directions of each lookup:
#   board_games / users             : integer id -> original label
#   board_game_index / user_index   : original label -> integer id
# `.items()` works on Python 2 and 3 (`.iteritems()` exists only on Python 2).
board_games = dict(enumerate(ratings_df.columns))
board_game_index = {label: idx for idx, label in board_games.items()}
users = dict(enumerate(ratings_df.index))
user_index = {label: idx for idx, label in users.items()}

# Translate labels to ids in a single vectorised pass per column. This replaces
# one `Series.replace(..., inplace=True)` call per label, which was quadratic
# and mutated a column selection rather than the frame itself.
als_data['board_game'] = als_data['board_game'].map(board_game_index)
als_data['user'] = als_data['user'].map(user_index)

In [6]:
# Sanity-check the long-format frame: column names, non-null counts and dtypes.
als_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 128417 entries, 0 to 128416
Data columns (total 3 columns):
user          128417 non-null int64
board_game    128417 non-null int64
rating        128417 non-null float64
dtypes: float64(1), int64(2)
memory usage: 2.9 MB


In [7]:
# Configure the ALS matrix-factorisation recommender.
#   userCol / itemCol : DataFrame columns holding the user ids and the item
#                       (board game) ids respectively.
#   ratingCol         : column holding the observed ratings.
#   nonnegative       : constrain the learned factors to be non-negative.
#   regParam          : regularisation strength.
#   rank              : number of latent factors per user / item.
#   maxIter           : number of alternating iterations.
#   seed              : fixes the random factor initialisation so a re-run
#                       reproduces the same model.
als_model = ALS(
    userCol='user',
    itemCol='board_game',
    ratingCol='rating',
    nonnegative=True,
    regParam=0.1,
    rank=100,
    maxIter=10,
    seed=42,
)

In [8]:
# Hand the pandas ratings frame to Spark so it can be fed to the ALS estimator.
als_spark_df = spark.createDataFrame(als_data)

In [9]:
# Fit the recommender on every available rating (no hold-out split is made here).
als_fit_model = als_model.fit(als_spark_df)

In [10]:
# Score the same rows the model was fitted on (in-sample predictions).
preds_train_data = als_fit_model.transform(als_spark_df)

In [12]:
# Collect the Spark result into a pandas DataFrame for local inspection.
preds_train_df = preds_train_data.toPandas()

In [20]:
# Pair user id 0 with every distinct board-game id so the model can score all
# of them for that user; the two columns are positional (0 and 1) at this point.
user_0_pairs = [(0, game_id) for game_id in als_data['board_game'].unique()]
user_0_df = pd.DataFrame(user_0_pairs)

In [23]:
# Give the positional columns 0 and 1 the names 'user' and 'board_game'.
user_0_df = user_0_df.rename(columns={0:'user', 1:'board_game'})

In [24]:
# Convert the (user, board_game) candidate pairs to a Spark DataFrame.
user_0 = spark.createDataFrame(user_0_df)

In [25]:
# Score every candidate (user 0, board game) pair with the fitted model.
preds_user_0 = als_fit_model.transform(user_0)

In [26]:
# Bring user 0's predictions back into pandas.
preds_user_0_df = preds_user_0.toPandas()

In [32]:
# Order by predicted rating using the default ascending order, so the
# highest-predicted games end up at the bottom of the frame.
sorted_preds_user_0 = preds_user_0_df.sort_values('prediction')

In [36]:
# Recommend the highest-predicted board games that user 0 has not rated yet.
n_recommendations = 5
top_games = []
rated_games_user_0 = als_data[als_data['user'] == 0]['board_game'].values
rated_lookup = set(rated_games_user_0)  # constant-time membership tests
# The predictions are sorted ascending, so popping from the end of the list
# yields the game with the highest remaining predicted rating.
games = list(sorted_preds_user_0['board_game'].values)
# Stop when enough titles are collected or when the candidates run out, so a
# user with fewer than `n_recommendations` unrated games gets a shorter list
# instead of an IndexError from popping an empty list.
while games and len(top_games) < n_recommendations:
    game = games.pop()
    if game not in rated_lookup:
        top_games.append(board_games[game])

In [37]:
# Show the recommended titles for user 0.
top_games

['Age of Steam (Warfrog Second English Edition)',
 'Combat Commander: Europe (English first edition, second printing)',
 'Hansa Teutonica (English-only second edition)',
 'Brass (Second Edition Second Printing)',
 'Le Havre (Lookout Games Australian edition)']

In [96]:
# Order the in-sample predictions by user and then by board game so that each
# user's ratings sit next to the model's predictions. `preds_train_df` is the
# pandas frame of training-set predictions built earlier in this notebook; the
# name previously used here was never defined, so the cell failed on a fresh
# kernel.
sorted_df = preds_train_df.sort_values(['user', 'board_game'])

In [97]:
# Preview only the first rows instead of rendering the whole predictions frame.
sorted_df.head(10)

Unnamed: 0,user,board_game,rating,prediction
122734,0,279,6.00,6.458409
122705,0,291,5.00,5.532967
122823,0,541,6.00,6.206935
122712,0,792,8.90,7.688457
122803,0,911,6.00,6.380839
122778,0,1346,6.00,6.601476
122728,0,1435,9.00,8.235113
122738,0,1475,7.00,6.340346
122772,0,1673,5.00,5.620183
122688,0,1777,6.00,5.441391
