In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

spark = (
    SparkSession.builder.config("spark.sql.debug.maxToStringFields", 100)
    .appName("reviews")
    .getOrCreate()
)

# data_filepath = "../data/cleaned_steam_reviews/game_id={578080,271590,359550,105600,4000,252490,252950,218620,945360,292030}"
# data_filepath = "../data/cleaned_steam_reviews/game_id={294100,304390,812140,306130,391220,221380,262060,1289310,646570,552520}"
# data_filepath = "../data/cleaned_steam_reviews/game_id={294100,304390,812140}"
data_filepath = "../data/cleaned_steam_reviews"
steam_games_filepath = "../data/cleaned_steam_games"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/28 19:38:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/28 19:38:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
steam_reviews = spark.read.parquet(data_filepath)
steam_games = spark.read.parquet(steam_games_filepath)

                                                                                

In [3]:
playtime = steam_reviews.select('app_id', 'author_steamid', 'author_playtime_forever') \
                            .where((col("author_steamid") != F.lit("null")) & (F.length(col("author_steamid")) == 17) & (col("author_steamid").rlike("^[0-9]+$"))) \
                            .where(steam_reviews["author_playtime_forever"].isNotNull() \
                                   & steam_reviews["app_id"].isNotNull())


In [4]:
playtime = playtime.groupBy('app_id', 'author_steamid') \
                    .agg(F.avg('author_playtime_forever').alias('author_playtime_forever'))

In [5]:
d = {}
# Fill in the entries one by one

d["author_playtime_forever"] = playtime.approxQuantile("author_playtime_forever",[0.01,0.99],0.25)

# # looping through the columns, doing log(x+1) transformations
# for col in df.columns:
playtime_quantile = playtime.withColumn("author_playtime_forever", \
                F.log(F.when(playtime["author_playtime_forever"] < d["author_playtime_forever"][0],d["author_playtime_forever"][0])\
                .when(playtime["author_playtime_forever"] > d["author_playtime_forever"][1], d["author_playtime_forever"][1])\
                .otherwise(playtime["author_playtime_forever"] ) +1).alias("author_playtime_forever"))

mean = playtime_quantile.select(F.mean(playtime_quantile.author_playtime_forever)).collect()

playtime_capped = playtime_quantile.withColumn("author_playtime_forever", F.when(playtime_quantile.author_playtime_forever > mean[0][0]*2, mean[0][0]*2).otherwise(playtime_quantile.author_playtime_forever))
playtime_scaled = playtime_capped.withColumn("author_playtime_forever", playtime_capped.author_playtime_forever / (mean[0][0]*4)+0.5)


23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:03 WARN RowBasedKeyValueBatch: Calling spill() on

In [6]:
# import pandas as pd

# # generate historgram with pyspark
# histo = playtime_scaled.select('author_playtime_forever').rdd.flatMap(lambda x: x).histogram(10)
# # plot histogram with matplotlib
# pd.DataFrame(
#     list(zip(*histo)), 
#     columns=['bin', 'frequency']
# ).set_index(
#     'bin'
# ).plot(kind='bar');

In [7]:
playtime_pd = playtime_scaled.sample(0.001).toPandas()

23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 19:39:58 WARN RowBasedKeyValueBatch: Calling spill() on

In [8]:
from scipy.sparse import csr_matrix
# pivot ratings into movie features
df_movie_features = playtime_pd.pivot(
    index='app_id',
    columns='author_steamid',
    values='author_playtime_forever'
).fillna(0)
# convert dataframe of movie features to scipy sparse matrix
mat_movie_features = csr_matrix(df_movie_features.values)

In [20]:
# save df_movie_features into csv
df_movie_features.to_csv('game_features.csv')

In [10]:
# loop over the pivot and count the number of 0s
num_zeros = 0
for row in df_movie_features.values:
    num_zeros += len(row[row==0])
# print percentage of 0s
print(num_zeros)
print('sparsity: {:.2%}'.format(num_zeros / (df_movie_features.shape[0] * df_movie_features.shape[1])))

4813209
sparsity: 99.66%


In [15]:
from sklearn.neighbors import NearestNeighbors
model_knn = NearestNeighbors(metric='euclidean', algorithm='brute', n_neighbors=20, n_jobs=-1)

In [16]:
#get the index of the pivot pd df_movie_features
hashmap = [ index for index, row in df_movie_features.iterrows() ]
hashmap = { hashmap[i]:i for i in range(0,len(hashmap) ) }

In [57]:
# save hashmap to csv
import csv
with open('itemBasedRecommendation_hashmap.csv', 'w') as f:
    for key in hashmap.keys():
        f.write("%s,%s\n"%(key,hashmap[key]))

In [17]:
n_recommendations = 10

model_knn.fit(mat_movie_features)
# get input movie index
# inference

game_id = 1015500


print('Recommendation system start to make inference')
print('......\n')
distances, indices = model_knn.kneighbors(
    mat_movie_features[hashmap["294100"]],
    n_neighbors=n_recommendations+1)
# get list of raw idx of recommendations
raw_recommends = \
    sorted(
        list(
            zip(
                indices.squeeze().tolist(),
                distances.squeeze().tolist()
            )
        ),
        key=lambda x: x[1]
    )[:0:-1]
# print('It took my system {:.2f}s to make inference \n\
#         '.format(time.time() - t0))

reverse_hashmap = {v: k for k, v in hashmap.items()}
# print('Recommendations for {}:'.format(fav_movie))
for i, (idx, dist) in enumerate(raw_recommends):
    print('{0}: {1}, with distance '
            'of {2}'.format(i+1, reverse_hashmap[idx], dist))

Recommendation system start to make inference
......

1: 631510, with distance of 6.0534430496774005
2: 817130, with distance of 6.053424127254796
3: 247240, with distance of 6.050935146433168
4: 400940, with distance of 6.050094134275542
5: 357190, with distance of 6.049897279159437
6: 543460, with distance of 6.04835290364297
7: 569860, with distance of 6.047607609421978
8: 543900, with distance of 6.046475407627503
9: 732810, with distance of 6.040006458882563
10: 334040, with distance of 6.039004552802286


In [19]:
import pickle

filename = "collabRecommendation.pickle"

# save model
pickle.dump(model_knn, open(filename, "wb"))