In [16]:
import json
import math
import os

import pandas as pd
import pymysql
import sqlalchemy
from pyspark.mllib.recommendation import ALS
from sqlalchemy_utils import database_exists, create_database

In [2]:
def parse_raw_string(raw_string):
    """Parse one line of the inventory dump into a (key, value) pair.

    raw_string is a JSON object; the first entry of the decoded dict is
    returned as a tuple. Presumably each line holds exactly one
    {user_id: inventory} entry -- TODO confirm against the input file.

    Raises IndexError if the decoded object has no entries.
    """
    user_inventory = json.loads(raw_string)
    # list(...) keeps the result subscriptable on Python 3, where
    # dict.items() returns a view instead of a list.
    return list(user_inventory.items())[0]

# Read the dump line by line, parse each line into a (key, value) pair and
# attach a running index to every record: ((key, value), index).
user_inventory_rdd = (
    sc.textFile('../input/user_inventory.txt')
    .map(parse_raw_string)
    .zipWithIndex()
)

In [3]:
# Label encoder for user_id
def id_index(x):
    """Turn an indexed record into an (index, user_id) pair.

    x is ((user_id, lst_inventory), index); the inventory is discarded.
    """
    record, position = x
    user_id, _inventory = record
    return (position, user_id)

# Driver-side lookup table: record index -> original user id.
dic_id_index = (
    user_inventory_rdd
    .map(id_index)
    .collectAsMap()
)

In [4]:
def create_rating(x):
    """Turn an indexed inventory record into (index, [(appid, playtime), ...]).

    x is ((user_id, lst_inventory), index). Only games whose
    'playtime_forever' is greater than zero are kept. A None inventory
    yields an empty list, so the index key is still emitted.
    """
    (user_id, lst_inventory), index = x
    if lst_inventory is None:
        return (index, [])
    # `or 0` treats a missing/None playtime as unplayed: comparing None > 0
    # is False on Python 2 but raises TypeError on Python 3.
    return (index, [(game.get('appid'), game.get('playtime_forever'))
                    for game in lst_inventory
                    if (game.get('playtime_forever') or 0) > 0])

# ratings holds one (index, appid, playtime) triple per played game, so new
# users (no rated games) do not appear in it. Per the original note,
# create_rating must still return (index, []) for them, otherwise
# ratings.count() fails.
ratings = (
    user_inventory_rdd
    .map(create_rating)
    .flatMapValues(lambda x: x)
    # Index into the (index, (appid, playtime)) pair instead of using a
    # tuple-unpacking lambda parameter, which is a syntax error on Python 3.
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
)

In [5]:
# Sanity check: number of rating triples and the first one.
# A single pre-formatted argument prints the same text with the Python 2
# print statement and the Python 3 print function.
print('%s %s' % (ratings.count(), ratings.first()))

293942 (0, 4000, 370)


In [25]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
# Driver-side (pandas) build of the (user_id, app_id, playtime) table,
# read straight from the inventory dump.
with open('../input/user_inventory.txt', 'rb') as f:
    user_details = f.read().splitlines()

lst_playtime = []
for user_detail in user_details:
    user_detail = json.loads(user_detail)
    # First entry of the decoded object. list(...) keeps this working on
    # Python 3, where keys()/values()/items() are views and cannot be indexed.
    user_id, lst_game = list(user_detail.items())[0]
    # Users whose inventory is None contribute no rows.
    if lst_game is None:
        continue
    for game in lst_game:
        # `or 0`: a missing/None playtime counts as unplayed instead of
        # raising TypeError on Python 3.
        if (game.get('playtime_forever') or 0) > 0:
            lst_playtime.append((user_id, game.get('appid'), game.get('playtime_forever')))

df = pd.DataFrame(lst_playtime, columns = ['user_id','app_id','playtime'])
# Replace the raw user ids with integer labels. user_encoder keeps the
# mapping back: its 'index' column is the integer label and 'label' is the
# original user id.
le = LabelEncoder()
df['user_id'] = le.fit_transform(df['user_id'])
user_encoder = pd.DataFrame(le.classes_, columns = ['label'])
user_encoder = user_encoder.reset_index()
# Optional exports, left disabled as in the original:
#df.to_csv('../input/user_playtime.csv', header = False, index = False)
#user_encoder.to_csv('../input/user_id_encoder.csv', index = False)

In [None]:
# TODO: normalize playtime (used as the rating) to the 0-10 range


In [28]:
# Split the ratings into training, validation and test sets (weights 6:2:2).
# Plain int seed: the Python 2 long literal `0L` is a syntax error on Python 3.
training_RDD, validation_RDD, test_RDD = ratings.randomSplit([6, 2, 2], seed=0)
# Drop the rating so only (user index, appid) pairs are passed to predictAll.
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [36]:
# Tuning ALS parameters: train one model per candidate rank on the training
# split and report the RMSE on the validation split.
seed = 5  # plain int; the Python 2 long literal `5L` does not parse on Python 3
iterations = 10
regularization_parameter = 0.1 # 0.01
ranks = [22, 25, 28, 30]
blocks = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank = rank, blocks=blocks, seed=seed, lambda_=regularization_parameter, iterations=iterations)
    # Key both sides by (user index, appid) so actual and predicted values can be joined.
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), r[2])).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    # Single pre-formatted argument: same output on Python 2 and Python 3.
    print('For rank %s the RMSE is %s' % (rank, error))
    
#For rank 3 the RMSE is 94294.6311486
#For rank 5 the RMSE is 110531.737013
#For rank 10 the RMSE is 67947.9357512

# 0.1
#For rank 3 the RMSE is 76298.8287313
#For rank 5 the RMSE is 94251.2743932
#For rank 10 the RMSE is 56212.5401911
#For rank 12 the RMSE is 51998.6940992
#For rank 15 the RMSE is 40676.5975445
#For rank 18 the RMSE is 33840.9232464
#For rank 20 the RMSE is 31912.7587304

For rank 22 the RMSE is 29588.6723634
For rank 25 the RMSE is 26658.8688455
For rank 28 the RMSE is 21954.4785868
For rank 30 the RMSE is 22039.0644171


In [37]:
# Inspect a few ((user index, appid), (actual, predicted)) pairs.
# Open question from the original note: predictions may come out negative (?)
# and their scale needs attention.
rates_and_preds.take(3)

[((342, 17460), (31, 2941.30149713941)),
 ((3066, 298260), (7, 264.36673026580075)),
 ((4678, 225540), (644, 1091.9505839981243))]

In [38]:
# Test the selected model: retrain on the training split with the chosen rank
# and report the RMSE on the held-out test split.
# Reuses seed, iterations and regularization_parameter from the tuning cell.
best_rank = 28
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
# Key both sides by (user index, appid) so actual and predicted values can be joined.
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# Single pre-formatted argument: same output on Python 2 and Python 3.
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 23037.286254


In [6]:
# Build the recommendation model using Alternating Least Squares (Final).
# Trained on the full ratings RDD rather than the training split.
seed = 5  # plain int; the Python 2 long literal `5L` does not parse on Python 3
iterations = 10
regularization_parameter = 0.1
rank = 28
blocks = -1

# blocks was defined here but never passed; it is now passed explicitly,
# matching the tuning cell. -1 is also the ALS.train default.
model = ALS.train(ratings, rank, blocks=blocks, seed=seed, iterations=iterations, lambda_=regularization_parameter)

In [9]:
# Recommendation: ask the model for the top 10 products of every user and
# store them column-wise as dic_recommend['g<k>'][user_id] = k-th product.
n_recommend = 10
dic_recommend = dict(('g' + str(k), {}) for k in range(n_recommend))
lst_skipped = []
for index, user_id in dic_id_index.items():
    try:
        lst_recommend = [i.product for i in model.recommendProducts(index, n_recommend)]
    except Exception:
        # Still best-effort, but no longer a bare `except: pass`:
        # KeyboardInterrupt now propagates and skipped users are recorded.
        # Presumably these are users absent from the ratings the model was
        # trained on -- TODO confirm.
        lst_skipped.append(user_id)
        continue
    # Fill whatever was returned. A list shorter than n_recommend leaves the
    # remaining columns unset for this user, as the original did.
    for k, app_id in enumerate(lst_recommend[:n_recommend]):
        dic_recommend['g' + str(k)][user_id] = app_id

print('%d users skipped (no recommendation available)' % len(lst_skipped))

In [15]:
# One row per user: the index (user ids) becomes a 'user_id' column,
# followed by the g0..g9 recommendation columns.
df = (
    pd.DataFrame(dic_recommend)
    .reset_index()
    .rename(columns={'index': 'user_id'})
)
df.to_csv('../output/game_recommended.csv', index=False)
df.head()

Unnamed: 0,user_id,g0,g1,g2,g3,g4,g5,g6,g7,g8,g9
0,76561197960323774,39210,252490,268850,9900,216150,238960,20590,269470,203770,389430
1,76561197960355015,304030,107410,268850,429200,322170,359550,500,236850,269470,47410
2,76561197960385706,39210,236850,304030,238960,372000,4000,212160,8930,20590,346110
3,76561197960422789,730,236850,295270,216150,251810,394360,207890,230410,269470,228180
4,76561197960464402,730,359550,231670,202990,295270,240,207890,269470,313740,429200


In [18]:
# Database credentials are read from the environment so that no password is
# stored in the notebook.
# NOTE(review): the password that used to be hardcoded here should be rotated.
sql_user = os.environ.get('SQL_USER', 'root')
sql_pwd = os.environ.get('SQL_PWD')
if sql_pwd is None:
    raise RuntimeError('Set the SQL_PWD environment variable before running this cell')

# NOTE(review): the URL is built by concatenation, so a password containing
# characters such as '@' or '/' would need URL-quoting first.
engine = sqlalchemy.create_engine('mysql+pymysql://'+sql_user+':'+sql_pwd+'@127.0.0.1/game_recommendation?charset=utf8mb4')
if not database_exists(engine.url):
    create_database(engine.url)
print(database_exists(engine.url))
# flavor='mysql' is dropped: pandas ignores it when given a SQLAlchemy engine
# and newer pandas releases reject the argument.
df.to_sql(name='tbl_recommended_games', con=engine, if_exists = 'replace', index=False)

True
