# Collaborative Filtering Recommender System

## Import Modules, create Spark session and load in the Dataset

In [285]:
import pandas as pd
import pyspark.sql.functions as func
from pyspark.sql.types import DateType, IntegerType,NumericType
from pyspark.sql.functions import min, max, col
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,CrossValidatorModel

# Create our Spark Session
spark = SparkSession.builder.appName('recnn').getOrCreate()

Once the spark session was created, it was time to read in the dataset from the HDFS as below

In [286]:
df = spark.read.csv("hdfs://localhost:9000/john/hadoop/input/steam-200k.csv")

## Quick look at the Data and tidying

Now I wanted to take a quick look at the dataset and what the rows looked like

In [287]:
df.head(15)

[Row(_c0='151603712', _c1='The Elder Scrolls V Skyrim', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='The Elder Scrolls V Skyrim', _c2='play', _c3='273.0', _c4='0'),
 Row(_c0='151603712', _c1='Fallout 4', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='Fallout 4', _c2='play', _c3='87.0', _c4='0'),
 Row(_c0='151603712', _c1='Spore', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='Spore', _c2='play', _c3='14.9', _c4='0'),
 Row(_c0='151603712', _c1='Fallout New Vegas', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='Fallout New Vegas', _c2='play', _c3='12.1', _c4='0'),
 Row(_c0='151603712', _c1='Left 4 Dead 2', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='Left 4 Dead 2', _c2='play', _c3='8.9', _c4='0'),
 Row(_c0='151603712', _c1='HuniePop', _c2='purchase', _c3='1.0', _c4='0'),
 Row(_c0='151603712', _c1='HuniePop', _c2='play', _c3='8.5', _c4='0'),
 Row(_c0='151603712', _c1='Path of Exile', _c2='purchase', 

And now a look at some basic stats, including the count of how many entries we have in the dataset (200,000) the maximum play time of any game by any user (999 hours) and the minimum play time of any game by any user (0.1 hours)

In [288]:
df.describe().show()

+-------+-------------------+----------------+--------+------------------+------+
|summary|                _c0|             _c1|     _c2|               _c3|   _c4|
+-------+-------------------+----------------+--------+------------------+------+
|  count|             200000|          200000|  200000|            200000|200000|
|   mean|  1.0365586594664E8|           140.0|    null|17.874383999999942|   0.0|
| stddev|7.208073512913969E7|             0.0|    null|138.05695165086743|   0.0|
|    min|          100012061|     007 Legends|    play|               0.1|     0|
|    max|           99992274|theHunter Primal|purchase|             999.0|     0|
+-------+-------------------+----------------+--------+------------------+------+



I now wanted to drop column 'c4' as it is not clear what it actually represents, and I also don't need whatever it may be for collaborative filtering

In [290]:
df = df[('_c0', '_c1', '_c2', '_c3')]

It was necessary to remove the rows that are for purchases instead of play times. These rows also weren't needed for my collaborative filtering, I just needed to know how much time each user spent playing each game.

In [292]:
df = df[df['_c2'] != 'purchase']

So I was now left with just our user ID, game title and how many hours they have played the game for

In [293]:
df.head()

Row(_c0='151603712', _c1='The Elder Scrolls V Skyrim', _c2='play', _c3='273.0')

I now created a 'rating' column that was initially just a duplicate of the play duration column, and also a 'rating_double' column which contained a double version of this instead of string

In [297]:
df = df.withColumn('rating', df._c3)
changedTypedf = df.withColumn("rating_double", df["rating"].cast(DoubleType()))

Looking at the Schema at this point, I now had a column containing the play times as doubles

In [298]:
changedTypedf.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_double: double (nullable = true)



## Create a derived Ratings column

The next six cells created a dataframe that gave me the total hours played by any single gamer (gamer_total_hours_played), and what percentage of that total is made up by each game they played (perc_of_gamer_total_hours).

In [300]:
hours_by_game = changedTypedf.groupBy("_c0", "_c1").sum("rating_double")

In [301]:
hours_by_gamer = changedTypedf.groupBy("_c0").sum("rating_double")

In [302]:
hours_by_gamer = hours_by_gamer.withColumnRenamed("sum(rating_double)", "gamer_total_hours_played")

In [303]:
hours_by_gamer = hours_by_gamer.withColumnRenamed("_c0", "gamer")

In [304]:
hours_by_gamer_by_game = hours_by_game.alias("a").join(hours_by_gamer\
                     .alias("b"),hours_by_game['_c0'] == hours_by_gamer['gamer'],how='left')

In [305]:
hours_by_gamer_by_game = hours_by_gamer_by_game.withColumn("perc_of_gamer_total_hours", 
                                                         hours_by_gamer_by_game["sum(rating_double)"]/ hours_by_gamer_by_game["gamer_total_hours_played"] )


In [306]:
hours_by_gamer_by_game.show(10)

+---------+--------------------+------------------+---------+------------------------+-------------------------+
|      _c0|                 _c1|sum(rating_double)|    gamer|gamer_total_hours_played|perc_of_gamer_total_hours|
+---------+--------------------+------------------+---------+------------------------+-------------------------+
| 26122540|Call of Duty Blac...|              22.0| 26122540|                   182.3|      0.12068019747668678|
|126340495|     Team Fortress 2|              16.1|126340495|      2770.8999999999996|     0.005810386517016133|
| 97298878|            Mafia II|              15.9| 97298878|                   220.6|      0.07207615593834996|
| 11373749|   Fate of the World|               2.3| 11373749|      3943.2000000000016|     5.832826131061064E-4|
| 46759859|       Day of Defeat|               5.8| 46759859|                   812.8|     0.007135826771653544|
| 32592631|  Duke Nukem Forever|               8.8| 32592631|                   286.7|     0.030

Because I now had this information, I was then able to add a 'rank' column which will order, from 1 to n, the games played by each gamer in terms of how much time they spent playing them. 

In [307]:
ranked =  hours_by_gamer_by_game.withColumn("rank", dense_rank().over(Window.partitionBy("gamer").orderBy(desc("perc_of_gamer_total_hours"))))

In [308]:
ranked = ranked.withColumn("gamer_total_hours_played", func.round(ranked["gamer_total_hours_played"], 2))
ranked = ranked.withColumn("perc_of_gamer_total_hours", func.round(ranked["perc_of_gamer_total_hours"], 2))

In [309]:
ranked.show(10)

+---------+--------------------+------------------+---------+------------------------+-------------------------+----+
|      _c0|                 _c1|sum(rating_double)|    gamer|gamer_total_hours_played|perc_of_gamer_total_hours|rank|
+---------+--------------------+------------------+---------+------------------------+-------------------------+----+
|108750879|       Borderlands 2|             758.0|108750879|                  1807.5|                     0.42|   1|
|108750879|The Elder Scrolls...|             511.0|108750879|                  1807.5|                     0.28|   2|
|108750879|Call of Duty Mode...|             149.0|108750879|                  1807.5|                     0.08|   3|
|108750879|       Saints Row IV|             117.0|108750879|                  1807.5|                     0.06|   4|
|108750879|           Fallout 4|              77.0|108750879|                  1807.5|                     0.04|   5|
|108750879|   Fallout New Vegas|              71.0|10875

Above we can now the top 10 played games for gamer '108750879'

In [310]:
new_ranked = ranked.drop("sum(rating_double)")

In [311]:
new_ranked.show(20)

+---------+--------------------+---------+------------------------+-------------------------+----+
|      _c0|                 _c1|    gamer|gamer_total_hours_played|perc_of_gamer_total_hours|rank|
+---------+--------------------+---------+------------------------+-------------------------+----+
|108750879|       Borderlands 2|108750879|                  1807.5|                     0.42|   1|
|108750879|The Elder Scrolls...|108750879|                  1807.5|                     0.28|   2|
|108750879|Call of Duty Mode...|108750879|                  1807.5|                     0.08|   3|
|108750879|       Saints Row IV|108750879|                  1807.5|                     0.06|   4|
|108750879|           Fallout 4|108750879|                  1807.5|                     0.04|   5|
|108750879|   Fallout New Vegas|108750879|                  1807.5|                     0.04|   6|
|108750879|Saints Row The Third|108750879|                  1807.5|                     0.02|   7|
|108750879

The Spark ML function, 'QuantileDiscretizer' allowed me to assign a rating (0-9) from each gamer for each game, by separating the games they played into buckets based on the time spent playing. 

The game that they spent the most time playing would receive a 9 for example.

In [312]:
game_rated = QuantileDiscretizer(numBuckets=10, inputCol="perc_of_gamer_total_hours",outputCol="rating")\
.fit(hours_by_gamer_by_game).transform(hours_by_gamer_by_game)

In [313]:
ratings = game_rated.select("gamer","_c1", "rating","perc_of_gamer_total_hours")

In [314]:
ratings = ratings.withColumnRenamed("_c1", "game")

In [315]:
ratings.show(20)

+---------+--------------------+------+-------------------------+
|    gamer|                game|rating|perc_of_gamer_total_hours|
+---------+--------------------+------+-------------------------+
| 26122540|Call of Duty Blac...|   7.0|      0.12068019747668678|
|126340495|     Team Fortress 2|   4.0|     0.005810386517016133|
| 97298878|            Mafia II|   7.0|      0.07207615593834996|
| 11373749|   Fate of the World|   1.0|     5.832826131061064E-4|
| 46759859|       Day of Defeat|   4.0|     0.007135826771653544|
| 32592631|  Duke Nukem Forever|   6.0|     0.030694105336588774|
| 57103808|   Company of Heroes|   7.0|      0.06971097360382232|
| 94088853|       ORION Prelude|   5.0|      0.01466753585397653|
|  9823354|Europa Universali...|   1.0|     7.667311694475893E-4|
|249165768|Battlefield Bad C...|   4.0|     0.005747952291995977|
|130280718|              Dota 2|   7.0|      0.05423353624792473|
| 25096601|              Dota 2|   8.0|      0.25513254933227025|
| 25096601

In [316]:
ratings = ratings.withColumn("gamer", ratings["gamer"].cast(IntegerType()))

In [317]:
ratings.show(10)

+---------+--------------------+------+-------------------------+
|    gamer|                game|rating|perc_of_gamer_total_hours|
+---------+--------------------+------+-------------------------+
| 26122540|Call of Duty Blac...|   7.0|      0.12068019747668678|
|126340495|     Team Fortress 2|   4.0|     0.005810386517016133|
| 97298878|            Mafia II|   7.0|      0.07207615593834996|
| 11373749|   Fate of the World|   1.0|     5.832826131061064E-4|
| 46759859|       Day of Defeat|   4.0|     0.007135826771653544|
| 32592631|  Duke Nukem Forever|   6.0|     0.030694105336588774|
| 57103808|   Company of Heroes|   7.0|      0.06971097360382232|
| 94088853|       ORION Prelude|   5.0|      0.01466753585397653|
|  9823354|Europa Universali...|   1.0|     7.667311694475893E-4|
|249165768|Battlefield Bad C...|   4.0|     0.005747952291995977|
+---------+--------------------+------+-------------------------+
only showing top 10 rows



It can be seen above that I now had the rating from each gamer for each game. I needed a game_id column for the model as it doesn't take in strings and so this next cell achieved this. It gave a unique game_id from 1 to n (n being the total number of games) to each game in the list

In [318]:
ratings = ratings.withColumn("game_id", func.dense_rank().over(Window.orderBy(ratings.game)))

## Build and train the Model

I was now able to split these ratings into training and test sets

In [319]:
training, test = ratings.randomSplit([0.8,0.2], seed = 36)

Next I defined the alternating least squares model for collaborative filtering

In [320]:
from pyspark.ml.recommendation import ALS


als = ALS(maxIter=10, regParam=0.01, userCol="gamer", itemCol="game_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative = True)

Fit the model to the training set

In [321]:
model = als.fit(training)

Use the model to make predictions on the test set

In [322]:
predictions = model.transform(test)

In [323]:
predictions.show(20)

+---------+--------------------+------+-------------------------+-------+----------+
|    gamer|                game|rating|perc_of_gamer_total_hours|game_id|prediction|
+---------+--------------------+------+-------------------------+-------+----------+
|298890193|                  C9|   5.0|     0.008174386920980927|    463| 10.399544|
|141773234|                  C9|   0.0|     5.068166844052507E-5|    463| 2.2671351|
| 65229865|Europa Universali...|   8.0|       0.5202312138728324|   1088|  9.895632|
| 58217836|Europa Universali...|   4.0|      0.00728108833109791|   1088| 3.2841067|
| 75368808|Europa Universali...|   8.0|      0.22816053345276305|   1088|   6.60545|
| 65958466|Europa Universali...|   4.0|     0.007658922644881285|   1088| 2.6655507|
| 42695514|Football Manager ...|   5.0|     0.008433107756376887|   1238|  6.052161|
| 40080833|Football Manager ...|   6.0|     0.037704231252618355|   1238| 5.8518333|
|113260911|Football Manager ...|   7.0|      0.06747638326585695|

## Evaluate the Model

In [325]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 3.1969283493060523


Having evaluated the performance using root mean square error, I could see it was around 3. This isn't amazing for a range of 0 to 9 but it would still likely produce useful recommendations

## Recommendations

It was now time to get some actual recommendations based on the model. First I wanted to get the top 20 gamers in terms of hours played which I did in the next cell

In [333]:
top_gamers = changedTypedf.groupBy(changedTypedf['_c0']).agg({'_c3':"sum"}).sort("sum(_c3)", ascending=False).dropna().limit(20)
top_gamers.show(20)

+---------+------------------+
|      _c0|          sum(_c3)|
+---------+------------------+
| 73017395|           11754.0|
| 10599862|11651.699999999999|
|100630947|           10853.2|
| 26762388|10470.100000000004|
|153382649|            9640.0|
| 43684632| 9546.299999999988|
| 48798067| 9427.199999999997|
| 52731290|            9417.6|
| 42935819|            8172.9|
| 14544587|8137.0999999999985|
| 52567955|            7836.7|
|130882834|            7801.9|
| 57433226| 7741.499999999999|
| 63615483| 7190.400000000001|
| 47063596| 7161.000000000001|
| 67694595| 7152.100000000001|
| 17017968|            7108.5|
| 50818751| 6929.899999999999|
| 49893565|            6891.9|
| 24721232|            6887.0|
+---------+------------------+



I placed these gamers into a list that I could then use

In [335]:
top20_gamer_list = [row._c0 for row in top_gamers.select("_c0").collect()]
top20_gamer_list

['73017395',
 '10599862',
 '100630947',
 '26762388',
 '153382649',
 '43684632',
 '48798067',
 '52731290',
 '42935819',
 '14544587',
 '52567955',
 '130882834',
 '57433226',
 '63615483',
 '47063596',
 '67694595',
 '17017968',
 '50818751',
 '49893565',
 '24721232']

Using this list and the model, I created a dictionary containing all 20 top gamers and the top 10 game recommendations for each

In [337]:
gamer_recs = model.recommendForAllUsers(10)

rec_game_top20_gamers = {}

for gamer in top20_gamer_list:
    rec_game = gamer_recs.where(gamer_recs.gamer == gamer).select("recommendations").collect()
    rec_game_top20_gamers[gamer] = [i.game_id for i in rec_game[0]["recommendations"]]

rec_game_top20_gamers

{'73017395': [2521, 2074, 3007, 1184, 1127, 944, 871, 2151, 84, 2513],
 '10599862': [1999, 3404, 2797, 2301, 3406, 944, 916, 871, 3448, 650],
 '100630947': [966, 1440, 2797, 2513, 1705, 1788, 1092, 3118, 108, 2699],
 '26762388': [84, 2521, 108, 3404, 779, 1815, 1940, 1617, 2547, 3524],
 '153382649': [2074, 3404, 2634, 1999, 1113, 2521, 1184, 1790, 2301, 3406],
 '43684632': [2301, 3406, 84, 3404, 1292, 1127, 1113, 3054, 650, 2634],
 '48798067': [2151, 944, 650, 3406, 1127, 916, 2301, 1184, 871, 2634],
 '52731290': [2301, 3406, 650, 1292, 1127, 1113, 1999, 916, 2634, 608],
 '42935819': [2797, 3448, 2707, 2852, 2221, 966, 1440, 1492, 754, 2513],
 '14544587': [2797, 3448, 1999, 966, 2221, 2707, 2852, 1440, 3406, 3373],
 '52567955': [2699, 966, 668, 1857, 2377, 2797, 1440, 3391, 2828, 2353],
 '130882834': [1999, 2797, 3404, 2301, 3448, 3406, 966, 108, 84, 2513],
 '57433226': [779, 2828, 84, 3404, 2521, 1673, 108, 668, 2547, 1940],
 '63615483': [3007, 1999, 2074, 1609, 2134, 1806, 1087, 1869

I wanted a list of game titles and their game ids so that I could see what the actual game recommendations were

In [331]:
game_list = ratings.select('game_id', 'game').dropDuplicates()
game_list.show(20)

+-------+--------------------+
|game_id|                game|
+-------+--------------------+
|      1|         007 Legends|
|      2|           0RBITALIS|
|      3|1... 2... 3... KI...|
|      4|     10 Second Ninja|
|      5|          10,000,000|
|      6|   100% Orange Juice|
|      7|           1000 Amps|
|      8|12 Labours of Her...|
|      9|12 Labours of Her...|
|     10|12 Labours of Her...|
|     11|                 140|
|     12|             15 Days|
|     13|        16bit Trader|
|     14|1701 A.D. Sunken ...|
|     15|18 Wheels of Stee...|
|     16|1953 NATO vs Wars...|
|     17|              1Quest|
|     18|  3 Stars of Destiny|
|     19|3089 -- Futuristi...|
|     20|        3D Mini Golf|
+-------+--------------------+
only showing top 20 rows



I took one of the gamers as user1, (gamer id '10599862') and had a look at the recommendations and also the actual games they played the most of

In [339]:
user1_recommend = game_list.filter(game_list["game_id"].isin(rec_game_top20_gamers['10599862']))
user1_recommend

DataFrame[game_id: int, game: string]

In [340]:
user1_recommend.show()

+-------+--------------------+
|game_id|                game|
+-------+--------------------+
|    650|Construction-Simu...|
|    871|         Diaper Dash|
|    916|Don Bradman Crick...|
|    944|Dragon's Prophet ...|
|   1999|             NBA 2K9|
|   2301|Professional Farm...|
|   2797|Speedball 2 Tourn...|
|   3404|               WRC 5|
|   3406|WTFast Gamers Pri...|
|   3448|Warrior Kings Bat...|
+-------+--------------------+



In [352]:
ratings.filter(ratings.gamer=="10599862").sort('rating', ascending=False).limit(10).show()

+--------+--------------------+------+-------------------------+-------+
|   gamer|                game|rating|perc_of_gamer_total_hours|game_id|
+--------+--------------------+------+-------------------------+-------+
|10599862|   Crusader Kings II|   7.0|      0.05767398748680451|    701|
|10599862|     Elite Dangerous|   7.0|      0.06402499206124429|   1033|
|10599862|Football Manager ...|   7.0|      0.04986396834796639|   1236|
|10599862|Mount & Blade War...|   7.0|      0.07552545980414876|   1969|
|10599862|       Path of Exile|   7.0|      0.06737214312074634|   2198|
|10599862|Total War ROME II...|   7.0|      0.09105967369568391|   3266|
|10599862|       DARK SOULS II|   6.0|     0.017593999158921018|    727|
|10599862|The Elder Scrolls...|   6.0|     0.018108945475767486|   3068|
|10599862|    Total War ATTILA|   6.0|      0.03587459340697066|   3263|
|10599862|The Witcher 3 Wil...|   6.0|     0.017336526000497784|   3180|
+--------+--------------------+------+-------------

I could see that the recommendations passed a basic sanity test at least, Sports games and Fantasy games were being recommended to user1 and I could see that similar games appear in their top played games

In [342]:
user2_recommend = game_list.filter(game_list["game_id"].isin(rec_game_top20_gamers['100630947']))
user2_recommend.show()

+-------+--------------------+
|game_id|                game|
+-------+--------------------+
|    108|Air Conflicts Pac...|
|    966| Duke Nukem Forever |
|   1092|        EverQuest II|
|   1440|Hacker Evolution ...|
|   1705|LEGO Batman The V...|
|   1788|Lost Planet Extre...|
|   2513|Rugby League Team...|
|   2699|Silent Hill Homec...|
|   2797|Speedball 2 Tourn...|
|   3118|             The Maw|
+-------+--------------------+



In [353]:
ratings.filter(ratings.gamer=="100630947").sort('rating', ascending=False).limit(10).show()

+---------+--------------------+------+-------------------------+-------+
|    gamer|                game|rating|perc_of_gamer_total_hours|game_id|
+---------+--------------------+------+-------------------------+-------+
|100630947|              Dota 2|   8.0|       0.9621125566653153|    923|
|100630947|Counter-Strike Gl...|   6.0|      0.03731618324549441|    674|
|100630947|        Aura Kingdom|   0.0|     1.842774481258983...|    247|
|100630947|FreeStyle2 Street...|   0.0|     2.303468101573729...|   1264|
|100630947|       Left 4 Dead 2|   0.0|     9.213872406294918E-6|   1734|
|100630947|                Rust|   0.0|     3.132716618140272E-4|   2524|
+---------+--------------------+------+-------------------------+-------+



In [349]:
user3_recommend = game_list.filter(game_list["game_id"].isin(rec_game_top20_gamers['26762388']))
user3_recommend.show()

+-------+--------------------+
|game_id|                game|
+-------+--------------------+
|     84|Afterfall InSanit...|
|    108|Air Conflicts Pac...|
|    779|            Daylight|
|   1617|Jagged Alliance F...|
|   1815|            MLB 2K12|
|   1940| Missing Translation|
|   2521|    Runestone Keeper|
|   2547|STORM Frontline N...|
|   3404|               WRC 5|
|   3524|X-Plane 10 Global...|
+-------+--------------------+



In [351]:
ratings.filter(ratings.gamer=="26762388").sort('rating', ascending=False).limit(10).show()

+--------+--------------------+------+-------------------------+-------+
|   gamer|                game|rating|perc_of_gamer_total_hours|game_id|
+--------+--------------------+------+-------------------------+-------+
|26762388|Mount & Blade War...|   7.0|      0.07879580901806092|   1969|
|26762388|The Elder Scrolls...|   7.0|      0.10410597797537746|   3068|
|26762388|Chivalry Medieval...|   6.0|     0.032473424322594806|    560|
|26762388|           Far Cry 3|   6.0|      0.02234935673966819|   1171|
|26762388|Counter-Strike So...|   6.0|      0.03018118260570576|    676|
|26762388|Age of Empires II...|   6.0|     0.021680786238908886|     93|
|26762388| Assassin's Creed II|   6.0|     0.017382833019741924|    226|
|26762388|Assassin's Creed ...|   6.0|     0.017669363234353055|    228|
|26762388|              Dota 2|   6.0|     0.022444866811205232|    923|
|26762388|  Dragon Age Origins|   6.0|     0.038681578972502635|    934|
+--------+--------------------+------+-------------

I did the same for two more users and again I was happy enough with the reccomendations. At this point I was fairly confident that the system was doing its job and likely making some useful recommendations