
# BDTT Task 2: Recommender System

## Introduction

In this task, I will train a collaborative filtering recommender system using a dataset from Steam, an online videogame distribution service. The dataset contains details of which games members have purchased and how long they played their purchased games, in hours. I will use the play duration data to train an Alternating Least Squares (ALS) model to generate recommendations. I will evaluate the performance of the model using the Root Mean Square Error (RMSE) metric and improve the model using a grid search to tune the hyperparameters.

## Data Import and Preprocessing

I imported the dataset from a csv file into a dataframe. There were 200,000 rows in the dataframe and no null values. There were 707 duplicate rows, all of which concerned the purchase data, not the play duration data. I added a column to my dataframe containing unique integer IDs for the videogames. This is because the ALS model requires integer inputs and the videogame names provided are strings, not integers. I then split the dataframe into two: one for purchase data and one for play duration data.

In [0]:
# check csv file successfully uploaded into DBFS
dbutils.fs.ls("/FileStore/tables/steam_200k.csv")

[FileInfo(path='dbfs:/FileStore/tables/steam_200k.csv', name='steam_200k.csv', size=8059447, modificationTime=1743156953000)]

In [0]:
# inspect structure of csv file
for line in dbutils.fs.head("/FileStore/tables/steam_200k.csv").splitlines():
    print(line.find(","))
    print(line)

[Truncated to first 65536 bytes]
9
151603712,The Elder Scrolls V Skyrim,purchase,1
9
151603712,The Elder Scrolls V Skyrim,play,273
9
151603712,Fallout 4,purchase,1
9
151603712,Fallout 4,play,87
9
151603712,Spore,purchase,1
9
151603712,Spore,play,14.9
9
151603712,Fallout New Vegas,purchase,1
9
151603712,Fallout New Vegas,play,12.1
9
151603712,Left 4 Dead 2,purchase,1
9
151603712,Left 4 Dead 2,play,8.9
9
151603712,HuniePop,purchase,1
9
151603712,HuniePop,play,8.5
9
151603712,Path of Exile,purchase,1
9
151603712,Path of Exile,play,8.1
9
151603712,Poly Bridge,purchase,1
9
151603712,Poly Bridge,play,7.5
9
151603712,Left 4 Dead,purchase,1
9
151603712,Left 4 Dead,play,3.3
9
151603712,Team Fortress 2,purchase,1
9
151603712,Team Fortress 2,play,2.8
9
151603712,Tomb Raider,purchase,1
9
151603712,Tomb Raider,play,2.5
9
151603712,The Banner Saga,purchase,1
9
151603712,The Banner Saga,play,2
9
151603712,Dead Island Epidemic,purchase,1
9
151603712,Dead Island Epidemic,play,1.4
9
151603712,BioShock I

In [0]:
# create dataframe from csv file. Include quote='"' to ensure game names with commas in them are not split across multiple columns 

df = spark.read.csv("/FileStore/tables/steam_200k.csv", header=False, inferSchema=True, quote='"').toDF("memberID", "game", "purchplay", "hours")
steamDF = df
steamDF.show(10, truncate = False)

+---------+--------------------------+---------+-----+
|memberID |game                      |purchplay|hours|
+---------+--------------------------+---------+-----+
|151603712|The Elder Scrolls V Skyrim|purchase |1.0  |
|151603712|The Elder Scrolls V Skyrim|play     |273.0|
|151603712|Fallout 4                 |purchase |1.0  |
|151603712|Fallout 4                 |play     |87.0 |
|151603712|Spore                     |purchase |1.0  |
|151603712|Spore                     |play     |14.9 |
|151603712|Fallout New Vegas         |purchase |1.0  |
|151603712|Fallout New Vegas         |play     |12.1 |
|151603712|Left 4 Dead 2             |purchase |1.0  |
|151603712|Left 4 Dead 2             |play     |8.9  |
+---------+--------------------------+---------+-----+
only showing top 10 rows



In [0]:
steamDF.count()

200000

From the code above, I can see there are 200,000 rows in my dataframe (steamDF). Next I will check for null values. 

In [0]:
# Checking how many null values in each column

from pyspark.sql import functions as F

# use F.sum rather than importing sum directly, which would shadow Python's built-in sum()
steamDF.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in steamDF.columns]).show()

+--------+----+---------+-----+
|memberID|game|purchplay|hours|
+--------+----+---------+-----+
|       0|   0|        0|    0|
+--------+----+---------+-----+



There are no null values in any of my columns. Next I will check for duplicate rows.

In [0]:
# Checking for duplicate rows

import pyspark.sql.functions as F

# group the DataFrame by all columns, count the number of occurrences, and filter where the count is greater than 1
duplicates = steamDF.groupBy(steamDF.columns)\
    .count()\
    .where(F.col('count') > 1)

# count number of rows that have been duplicated 
duplicates.count()

707

The code above tells me there are 707 distinct rows that each appear more than once in the dataframe. I will inspect these more closely.
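
The group-and-count pattern used above can be illustrated without Spark. The sketch below is a plain-Python analogue, not the PySpark code used in this notebook; the sample rows are made up and `find_duplicates` is a hypothetical helper name.

```python
from collections import Counter

def find_duplicates(rows):
    """Return {row: count} for every row that occurs more than once."""
    counts = Counter(rows)
    return {row: n for row, n in counts.items() if n > 1}

# Made-up sample rows in the same layout: (memberID, game, purchplay, hours)
sample_rows = [
    (1, "Game A", "purchase", 1.0),
    (1, "Game A", "purchase", 1.0),   # exact repeat of the row above
    (1, "Game A", "play", 12.5),
    (2, "Game B", "purchase", 1.0),
]

sample_duplicates = find_duplicates(sample_rows)
print(len(sample_duplicates))  # number of distinct rows that are repeated
```

As in the Spark version, the result counts each repeated row once, however many copies of it exist.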

In [0]:
duplicates.show()

+---------+--------------------+---------+-----+-----+
| memberID|                game|purchplay|hours|count|
+---------+--------------------+---------+-----+-----+
|  9823354|Sid Meier's Civil...| purchase|  1.0|    2|
|  2259650|Sid Meier's Civil...| purchase|  1.0|    2|
| 56038151|Grand Theft Auto ...| purchase|  1.0|    2|
|142001340|Grand Theft Auto ...| purchase|  1.0|    2|
|152959594|Grand Theft Auto III| purchase|  1.0|    2|
| 50769696|Grand Theft Auto ...| purchase|  1.0|    2|
| 11813637|Sid Meier's Civil...| purchase|  1.0|    2|
| 65398650|Sid Meier's Civil...| purchase|  1.0|    2|
| 36502549|Sid Meier's Civil...| purchase|  1.0|    2|
| 80779496|Sid Meier's Civil...| purchase|  1.0|    2|
| 51557405|Grand Theft Auto ...| purchase|  1.0|    2|
|100351493|Sid Meier's Civil...| purchase|  1.0|    2|
|100351493|Sid Meier's Civil...| purchase|  1.0|    2|
|100351493|Sid Meier's Civil...| purchase|  1.0|    2|
|189858084|Grand Theft Auto ...| purchase|  1.0|    2|
| 69009454

From looking at 20 of the duplicate rows above, it would appear they are all of the 'purchase' type, not 'play' type. I will check this.

In [0]:
# check how many duplicates are of 'play' records rather than 'purchase' records 
play_duplicates = duplicates.where(duplicates.purchplay=='play')
play_duplicates.count()


0

The code above confirms there are no duplicate rows which contain 'play' in the 'purchplay' column. Since I will be using the 'play' rows to train my recommender system, I will ignore the duplicates in the 'purchase' rows for now. Next I will add a column containing an integer ID for each game (this will be necessary for training the recommender system, as the inputs need to be integers, not strings).
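
As a rough illustration of what this indexing step does, the sketch below assigns integer IDs to labels in descending order of frequency, which is my understanding of `StringIndexer`'s default ordering. It is plain Python rather than Spark code; `index_by_frequency` is a hypothetical helper and the game list is made up.

```python
from collections import Counter

def index_by_frequency(labels):
    """Map each distinct label to an integer, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically here, which is an
    # assumption of this sketch rather than a statement about Spark's behaviour.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

# Made-up game column
games = ["Dota 2", "Spore", "Dota 2", "Fallout 4", "Dota 2", "Spore"]
game_ids = index_by_frequency(games)
print(game_ids)
```

This is consistent with the outputs later in the notebook, where Dota 2 (the most purchased game) has gameID 0.0 and Team Fortress 2 has gameID 1.0.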

In [0]:
# generate a unique integer ID for each game (gameID) and add this into the dataframe (steamDF)

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="game", outputCol="gameID")
steamDF_indexed = indexer.fit(steamDF).transform(steamDF)

steamDF_indexed.show(10, truncate = False)


🏃 View run monumental-hare-861 at: https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/77fbf61941974a9cbfd686b33aff0e50
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/2194890496938728
+---------+--------------------------+---------+-----+------+
|memberID |game                      |purchplay|hours|gameID|
+---------+--------------------------+---------+-----+------+
|151603712|The Elder Scrolls V Skyrim|purchase |1.0  |8.0   |
|151603712|The Elder Scrolls V Skyrim|play     |273.0|8.0   |
|151603712|Fallout 4                 |purchase |1.0  |100.0 |
|151603712|Fallout 4                 |play     |87.0 |100.0 |
|151603712|Spore                     |purchase |1.0  |332.0 |
|151603712|Spore                     |play     |14.9 |332.0 |
|151603712|Fallout New Vegas         |purchase |1.0  |29.0  |
|151603712|Fallout New Vegas         |play     |12.1 |29.0  |
|151603712|Left 4 Dead 2             |purchase |1.0  |4.0   |
|151603712|Left 4 D

Next I will split my original dataframe steamDF into a 'purchase' dataframe and a 'play' dataframe.

In [0]:
# split dataframe into 2 dataframes: one for 'purchase' rows and one for 'play' rows 

# Import required modules 
from pyspark.sql import SparkSession 

# Create a SparkSession 
spark = SparkSession.builder.appName("Split DataFrame").getOrCreate() 

# Split the DataFrame into two 
purchase_df = steamDF_indexed.filter(steamDF_indexed['purchplay'] == 'purchase') 
play_df = steamDF_indexed.filter(steamDF_indexed['purchplay'] == 'play') 

# Print the new dataframes 
purchase_df.show(5, truncate=False) 
play_df.show(5, truncate=False)


+---------+--------------------------+---------+-----+------+
|memberID |game                      |purchplay|hours|gameID|
+---------+--------------------------+---------+-----+------+
|151603712|The Elder Scrolls V Skyrim|purchase |1.0  |8.0   |
|151603712|Fallout 4                 |purchase |1.0  |100.0 |
|151603712|Spore                     |purchase |1.0  |332.0 |
|151603712|Fallout New Vegas         |purchase |1.0  |29.0  |
|151603712|Left 4 Dead 2             |purchase |1.0  |4.0   |
+---------+--------------------------+---------+-----+------+
only showing top 5 rows

+---------+--------------------------+---------+-----+------+
|memberID |game                      |purchplay|hours|gameID|
+---------+--------------------------+---------+-----+------+
|151603712|The Elder Scrolls V Skyrim|play     |273.0|8.0   |
|151603712|Fallout 4                 |play     |87.0 |100.0 |
|151603712|Spore                     |play     |14.9 |332.0 |
|151603712|Fallout New Vegas         |play   

In [0]:
# check the number of rows in the two new dataframes equals number of rows in original dataframe

steamDF_rows = steamDF.count()
purchase_df_rows = purchase_df.count()
play_df_rows = play_df.count()

if purchase_df_rows + play_df_rows == steamDF_rows:
    print("it worked")
else: 
    print("it didn't work")

it worked


In [0]:
# check all 'hours' values in purchase_df are equal to 1 (to check that none of the 'play duration' rows were inaccurately labelled as 'purchase')

purchase_df.groupBy("hours").count().display()

hours,count
1.0,129511


Next I drop the 'purchplay' column from both new dataframes and the 'hours' column from the purchase dataframe, as they are no longer needed (and might slow down processing if kept in).

In [0]:
dropped_purchase_df = purchase_df.drop("purchplay").drop("hours")
dropped_play_df = play_df.drop("purchplay") 

dropped_purchase_df.show(5, truncate=False) 
dropped_play_df.show(5, truncate=False)

+---------+--------------------------+------+
|memberID |game                      |gameID|
+---------+--------------------------+------+
|151603712|The Elder Scrolls V Skyrim|8.0   |
|151603712|Fallout 4                 |100.0 |
|151603712|Spore                     |332.0 |
|151603712|Fallout New Vegas         |29.0  |
|151603712|Left 4 Dead 2             |4.0   |
+---------+--------------------------+------+
only showing top 5 rows

+---------+--------------------------+-----+------+
|memberID |game                      |hours|gameID|
+---------+--------------------------+-----+------+
|151603712|The Elder Scrolls V Skyrim|273.0|8.0   |
|151603712|Fallout 4                 |87.0 |100.0 |
|151603712|Spore                     |14.9 |332.0 |
|151603712|Fallout New Vegas         |12.1 |29.0  |
|151603712|Left 4 Dead 2             |8.9  |4.0   |
+---------+--------------------------+-----+------+
only showing top 5 rows



In [0]:
dropped_purchase_df.count()

129511

In [0]:

dropped_play_df.count()

70489

From the last 2 cells, I can see that there are fewer rows in the 'play' dataframe than the 'purchase' dataframe. This is as I'd expect, as not all games that are purchased will necessarily be played. However, every game that is played must first have been purchased (so the 'play' set is a subset of the 'purchase' set). Lastly, I will check if there are any rows in the 'play' dataframe that contain 0 hours of play.

In [0]:
# Check if any rows in the play duration dataframe contain 0.0 hours of play 

zero_hours_count = dropped_play_df.filter(dropped_play_df.hours == 0.0).count()
print("Number of rows with 0.0 hours:", zero_hours_count)

Number of rows with 0.0 hours: 0


## Exploratory Data Analysis (EDA)

In this EDA section, I explored the ranges and trends of my dataset. For example, I investigated which members purchased the most games, which games were most and least purchased, and which members and games had the longest and shortest play durations. The range in play duration will be particularly relevant for informing how I train my recommender system, since ALS models are most accurate when their numerical inputs are evenly distributed. If the play duration is highly skewed, then the long play duration values could disproportionately influence the model, leading it to overemphasise heavy players and therefore to make less accurate predictions and recommendations.

### EDA of purchase data

In [0]:
# which members purchased the most games

member_purchase_df = dropped_purchase_df.groupBy("memberID").count().sort("count", ascending=False)

display(member_purchase_df)


memberID,count
62990992,1075
33865373,783
30246419,766
58345543,667
76892907,597
20772968,595
11403772,592
64787956,591
22301321,568
47457723,557


Databricks visualization. Run in Databricks to view.

In [0]:
# check highest 3 and lowest 3 values for games purchased by members. 
member_purchase_df.show(3)

member_purchase_df.tail(3)

+--------+-----+
|memberID|count|
+--------+-----+
|62990992| 1075|
|33865373|  783|
|30246419|  766|
+--------+-----+
only showing top 3 rows



[Row(memberID=251093449, count=1),
 Row(memberID=24817862, count=1),
 Row(memberID=127724511, count=1)]

From the histogram above, I can see there is quite a wide range in the number of games purchased by each member. From the table, I can see the highest number of games purchased by an individual member is 1,075, followed by 783 and 766. The 3 lowest purchase counts were all 1 game per member. To note: I haven't eliminated duplicate rows from this dataframe, so some of these counts will include the same member buying the same game more than once.

In [0]:
# find the 10 most purchased games

game_purchase_df = dropped_purchase_df.groupBy("game").count().sort("count", ascending=False).limit(10)

display(game_purchase_df)

game,count
Dota 2,4841
Team Fortress 2,2323
Unturned,1563
Counter-Strike Global Offensive,1412
Half-Life 2 Lost Coast,981
Counter-Strike Source,978
Left 4 Dead 2,951
Counter-Strike,856
Warframe,847
Half-Life 2 Deathmatch,823


Databricks visualization. Run in Databricks to view.

In [0]:
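# check the 3 most purchased games. game_purchase_df is limited to the top 10, so tail(3) returns the 8th to 10th most purchased games, not the 3 least purchased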
game_purchase_df.show(3)

game_purchase_df.tail(3)

+---------------+-----+
|           game|count|
+---------------+-----+
|         Dota 2| 4841|
|Team Fortress 2| 2323|
|       Unturned| 1563|
+---------------+-----+
only showing top 3 rows



[Row(game='Counter-Strike', count=856),
 Row(game='Warframe', count=847),
 Row(game='Half-Life 2 Deathmatch', count=823)]

From the bar chart above, we can see the most purchased game (Dota 2) has a far higher number of purchases (4,841) than the next 9 most-purchased games. To note: I haven't eliminated duplicate rows, so some of these counts will include the same member buying a game more than once.

### EDA of play data

In [0]:
# Check which members and games have the longest play times

dropped_play_df.orderBy("hours", ascending=False).display()

memberID,game,hours,gameID
73017395,Sid Meier's Civilization V,11754.0,11.0
100630947,Dota 2,10442.0,0.0
153382649,Team Fortress 2,9640.0,1.0
130882834,Dota 2,7765.0,0.0
52567955,Dota 2,6964.0,0.0
121199670,Dota 2,6753.0,0.0
86256882,Dota 2,6015.0,0.0
70487610,Sid Meier's Civilization V,6013.0,11.0
101414179,Dota 2,5982.0,0.0
12660489,Dota 2,5970.0,0.0


In [0]:
# HOW TO VISUALISE ABOVE? 

In [0]:
# see which member played the most hours across all games
member_play_df = dropped_play_df.drop("game").drop("gameID")

from pyspark.sql import functions as F

# Group by 'memberID', sum the 'hours', and sort in descending order
aggregated_df = member_play_df.groupBy("memberID") \
                  .agg(F.round(F.sum("hours"),3).alias("total_hours")) \
                  .orderBy(F.desc("total_hours"))

# Show the result
display(aggregated_df)


memberID,total_hours
73017395,11754.0
10599862,11651.7
100630947,10853.2
26762388,10470.1
153382649,9640.0
43684632,9546.3
48798067,9427.2
52731290,9417.6
42935819,8172.9
14544587,8137.1


Databricks visualization. Run in Databricks to view.

In [0]:
# Check memberIDs with 3 highest and 3 lowest play durations (across all games)

aggregated_df.show(3)
aggregated_df.tail(3)

+---------+-----------+
| memberID|total_hours|
+---------+-----------+
| 73017395|    11754.0|
| 10599862|    11651.7|
|100630947|    10853.2|
+---------+-----------+
only showing top 3 rows



[Row(memberID=85395339, total_hours=0.1),
 Row(memberID=289193172, total_hours=0.1),
 Row(memberID=74859759, total_hours=0.1)]

The output above shows a very wide range in total play duration per member (0.1 hours to 11,754 hours). This might cause problems for the recommender system I will build, since ALS models work best when numerical values are more evenly distributed and not so highly skewed. The ALS model might overfit to users who have extremely high play durations, which would mean the predictions do not generalise to users with lower play durations.
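
One common way to reduce this kind of skew is a log transform. The sketch below is plain Python, not part of the model pipeline, and shows how `log1p` compresses the two extremes reported above and how `expm1` maps a transformed value back to hours. The helper names `compress` and `restore` are hypothetical.

```python
import math

def compress(hours):
    """log(1 + hours): shrinks large values far more than small ones."""
    return math.log1p(hours)

def restore(value):
    """Inverse of compress, mapping a transformed value back to hours."""
    return math.expm1(value)

lowest, highest = 0.1, 11754.0  # extremes reported above

print(highest / lowest)                      # ratio on the raw scale
print(compress(highest) / compress(lowest))  # ratio after the transform
print(restore(compress(highest)))            # round trip back to hours
```

If a model were trained on transformed values, its predictions would need to be passed through the inverse transform before being compared with raw hours.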

In [0]:
# see which game was played the most hours across all members 

game_play_df = dropped_play_df.drop("memberID").drop("gameID")

# Group by 'game', sum the 'hours', and sort in descending order
aggregated_df2 = game_play_df.groupBy("game") \
                  .agg(F.round(F.sum("hours"),3).alias("total_hours")) \
                  .orderBy(F.desc("total_hours"))

# Show the result
aggregated_df2.display()

game,total_hours
Dota 2,981684.6
Counter-Strike Global Offensive,322771.6
Team Fortress 2,173673.3
Counter-Strike,134261.1
Sid Meier's Civilization V,99821.3
Counter-Strike Source,96075.5
The Elder Scrolls V Skyrim,70889.3
Garry's Mod,49725.3
Call of Duty Modern Warfare 2 - Multiplayer,42009.9
Left 4 Dead 2,33596.7


In [0]:
# HOW TO VISUALISE ABOVE? 

In [0]:
# Check games with 3 highest and 3 lowest play durations (across all members)

aggregated_df2.show(3)
aggregated_df2.tail(3)

+--------------------+-----------+
|                game|total_hours|
+--------------------+-----------+
|              Dota 2|   981684.6|
|Counter-Strike Gl...|   322771.6|
|     Team Fortress 2|   173673.3|
+--------------------+-----------+
only showing top 3 rows



[Row(game='D.U.S.T.', total_hours=0.1),
 Row(game='Habitat', total_hours=0.1),
 Row(game='Sandmason', total_hours=0.1)]

In [0]:
# calculate mean playing time 

from pyspark.sql import functions as F
dropped_play_df.agg(F.mean('hours')).collect()[0][0]

48.87806324391008

In [0]:
# HOW TO VISUALISE ABOVE? 

The mean duration of play is 48.88 hours. 

## Model Training

In this section, I split the play duration dataframe I created at the end of the 'Data Import and Preprocessing' section above into training and test sets. I then train an Alternating Least Squares (ALS) model on the training dataset using arbitrary initial hyperparameters.
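
To make the idea of a seeded random split concrete, here is a minimal plain-Python sketch. It assumes each row is assigned independently to one side with a fixed probability, which is my understanding of how Spark's `randomSplit` behaves, so the resulting sizes are only approximately 80/20. `random_split` is a hypothetical helper, not a Spark function.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row independently to training or test."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    training, test = [], []
    for row in rows:
        (training if rng.random() < train_fraction else test).append(row)
    return training, test

rows = list(range(1000))  # stand-in for dataframe rows
training_rows, test_rows = random_split(rows, train_fraction=0.8, seed=100)
print(len(training_rows), len(test_rows))  # roughly 800 / 200; exact sizes depend on the seed
```

Reusing the same seed reproduces the same split, which keeps later model comparisons on the same data.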

In [0]:
# import mlflow and autolog machine learning runs to track experiments 
import mlflow

mlflow.pyspark.ml.autolog()

In [0]:
# split data in dataframe containing only play records (dropped_play_df) into training and test dataset 

(training, test) = dropped_play_df.randomSplit([0.8, 0.2], seed=100)

In [0]:
# Apply the Alternating Least Squares (ALS) algorithm. Instantiate the estimator and specify which columns in the dataframe are the userCol, itemCol, and ratingCol. nonnegative is left at its default (False) here, so this baseline model can produce negative predictions

from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5, regParam=0.01, userCol="memberID", itemCol="gameID", ratingCol='hours', seed=100)

# Use the fit() method to train the model on the training dataset 
model = als.fit(training)

2025/04/06 09:05:45 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'fe8bb598127d4753b2ecf52cc00c77ee', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


🏃 View run whimsical-swan-552 at: https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/fe8bb598127d4753b2ecf52cc00c77ee
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/2194890496938728


Running the code above led to the run being logged in an MLflow experiment.

## Model Evaluation

In this section, I evaluate the ALS model trained in the section above by using it to generate predictions on the test dataset. I then evaluate the accuracy of these predictions, as compared to the actual values in the test dataset, using the Root Mean Square Error (RMSE) metric. 

In [0]:
# Use trained model to make predictions on the test dataset. The model is unable to make predictions for members or games that have no play records in the training dataset (the cold start problem). For these rows the predictions will be NaN, so I use dropna() to remove them from the predictions DataFrame

predictions = model.transform(test).dropna()

predictions.show()

+--------+--------------------+-----+------+----------+
|memberID|                game|hours|gameID|prediction|
+--------+--------------------+-----+------+----------+
|    5250|              Dota 2|  0.2|     0|  266.0414|
|   76767|Call of Duty Blac...| 12.5|    57| 1414.4698|
|   76767|Call of Duty Mode...|  9.7|    62| 1958.2737|
|   76767|            Portal 2| 15.0|    15| 16.805851|
|   76767|    Worms Armageddon|  0.4|   634|10.4597645|
|  298950|ARK Survival Evolved| 41.0|   105| 452.80072|
|  298950|         Alien Swarm|  1.6|    32|  6.061899|
|  298950|      Alpha Protocol|  0.7|   816| 32.746536|
|  298950|Amnesia The Dark ...|  7.0|    97| 0.7135702|
|  298950|            Banished|  0.6|   214|-31.345238|
|  298950|Batman Arkham Ori...| 15.2|   146| 62.766155|
|  298950|     Castle Crashers|  2.9|   102| 5.6525035|
|  298950|Counter-Strike So...|  0.5|     5|  571.3417|
|  298950|   Crusader Kings II|  1.0|   215| 1393.0398|
|  298950|       Darksiders II| 22.0|   225|  65

In the 'prediction' column above, I can see negative values (for example, -31.35 for Banished), which doesn't make sense as it's not possible to have a negative play duration.

Next I will evaluate how accurate these predictions are. I will use Root Mean Square Error (RMSE) as the evaluation metric. It is commonly used in regression and is the square root of the mean squared difference between the values predicted by a model and the actual values in the data, so it is expressed in the same units as the target (hours here).
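
To show how the metric is calculated, here is a minimal plain-Python sketch of RMSE on a few made-up values. It is independent of the `RegressionEvaluator` used in the notebook, and the function name `rmse_of` is hypothetical.

```python
import math

def rmse_of(actual, predicted):
    """Square root of the mean of the squared prediction errors."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up values with errors of 1, 1 and 10 hours
actual_hours = [5.0, 8.0, 20.0]
predicted_hours = [4.0, 9.0, 10.0]

example_rmse = rmse_of(actual_hours, predicted_hours)
print(example_rmse)
```

Because the errors are squared before averaging, the single 10-hour miss dominates: the result is about 5.83, compared with a mean absolute error of 4.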

In [0]:
# instantiate an evaluator which will use an evaluation metric (root mean square error) to  measure how effective the model is. 

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="hours", predictionCol="prediction")

rmse = evaluator.evaluate(predictions)

print("Root Mean Square Error is %g" %rmse)

Root Mean Square Error is 561.028


The code above gives an RMSE of about 561 hours. RMSE summarises the typical size of the difference between the hours the model predicted and the true hours played, with large errors weighted more heavily. This error is very high relative to the mean play duration of 48.88 hours, so I will attempt to reduce it by improving the accuracy of my model.

## Hyperparameter Tuning and Other Accuracy Improvements 
In this section, I will make several adjustments to improve the accuracy of my ALS model. These include: 

- using only data from players who played 5 or more games to train the model
- eliminating negative predictions and recommendations from the model (since it's impossible to have a negative game playing duration)
- hyperparameter tuning using a grid search


In [0]:
# Count number of unique members in 'dropped_play_df' dataframe 
member_play_count = dropped_play_df.groupBy("memberID").count()
member_play_count.count()

11350

The code above shows there are 11,350 unique members in the 'dropped_play_df' dataframe (the one containing data on play duration, not purchases).

In [0]:
# From play duration dataset, filter out members who have played fewer than 5 games. If many members have played only a few games, ALS might struggle with recommendations.

filtered_df = dropped_play_df.groupBy("memberID").count().filter("count >= 5")
filtered_df.show()



+---------+-----+
| memberID|count|
+---------+-----+
| 16167221|   25|
|152861732|   12|
|208061820|    5|
|187877855|    6|
|119310413|   27|
|132418423|   10|
| 84471496|   17|
| 95059220|   10|
|100519466|   34|
| 78332414|   27|
| 57433226|   17|
|191747590|    6|
|272181160|    6|
| 55319994|    5|
| 65610147|   10|
|234024191|   13|
| 49724738|    5|
| 19616379|    5|
|170172944|   18|
|199628395|   20|
+---------+-----+
only showing top 20 rows



In [0]:
filtered_df.count()

2436

The code above shows that of the unique members in the 'dropped_play_df' dataframe, 2,436 of them (about 21%) have played 5 or more games. I will use this subset to train the recommender system.
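
The filter-then-join-back step can be sketched in plain Python as follows. This is an illustration of the idea rather than the PySpark code used in the notebook; `keep_active_members` is a hypothetical helper, the rows are made up, and the threshold is set to 3 only to keep the example small.

```python
from collections import Counter

def keep_active_members(play_rows, min_games):
    """Keep only rows belonging to members with at least min_games play rows."""
    games_per_member = Counter(member_id for member_id, _game, _hours in play_rows)
    active = {m for m, n in games_per_member.items() if n >= min_games}
    return [row for row in play_rows if row[0] in active]

# Made-up rows: (memberID, game, hours)
play_rows = [
    (1, "A", 2.0), (1, "B", 0.5), (1, "C", 7.0),
    (2, "A", 40.0),
]

filtered_rows = keep_active_members(play_rows, min_games=3)
print(filtered_rows)
```

Member 2 has only one play record, so all of that member's rows are dropped, while every row for member 1 is kept.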

In [0]:
# The filtered_df generated above only has 2 columns (memberID and count). I need to transform it back to the original 4-column layout (memberID, game, hours, and gameID), but keeping only the rows of players who played 5 or more games
filtered_full_df = dropped_play_df.join(filtered_df.select("memberID"), on="memberID", how="inner")

filtered_full_df.show()
filtered_full_df.count()

+---------+--------------------+-----+------+
| memberID|                game|hours|gameID|
+---------+--------------------+-----+------+
|151603712|The Elder Scrolls...|273.0|   8.0|
|151603712|           Fallout 4| 87.0| 100.0|
|151603712|               Spore| 14.9| 332.0|
|151603712|   Fallout New Vegas| 12.1|  29.0|
|151603712|       Left 4 Dead 2|  8.9|   4.0|
|151603712|            HuniePop|  8.5| 867.0|
|151603712|       Path of Exile|  8.1|  39.0|
|151603712|         Poly Bridge|  7.5|1347.0|
|151603712|         Left 4 Dead|  3.3|  49.0|
|151603712|     Team Fortress 2|  2.8|   1.0|
|151603712|         Tomb Raider|  2.5|  55.0|
|151603712|     The Banner Saga|  2.0| 604.0|
|151603712|Dead Island Epidemic|  1.4|  51.0|
|151603712|   BioShock Infinite|  1.3|  42.0|
|151603712|Dragon Age Origin...|  1.3| 301.0|
|151603712|Fallout 3 - Game ...|  0.8| 152.0|
|151603712|SEGA Genesis & Me...|  0.8| 655.0|
|151603712| Grand Theft Auto IV|  0.6|  43.0|
|151603712|Realm of the Mad God|  

57789

In [0]:
# split data in this filtered dataset into training and test dataset 

(training, test) = filtered_full_df.randomSplit([0.8, 0.2], seed=100)

# cache training dataset to speed up model training and hyperparameter tuning 
training.cache()

DataFrame[memberID: int, game: string, hours: double, gameID: double]

In [0]:
# Create another ALS model, train it on the training dataset containing data from players who played 5 or more games and eliminate negative predictions by including nonnegative=True

better_als = ALS(maxIter=5, regParam=0.01, userCol="memberID", itemCol="gameID", ratingCol='hours', nonnegative=True, seed=100)

better_model = better_als.fit(training)

# Use this new ALS model to generate predictions from the test dataset 

better_predictions = better_model.transform(test).dropna()


2025/04/06 10:00:18 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '28bb997218f24caea85e298095b533bc', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


🏃 View run melodic-ox-333 at: https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/28bb997218f24caea85e298095b533bc
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/2194890496938728


In [0]:
# measure how effective this model is by calculating RMSE

rmse = evaluator.evaluate(better_predictions)

print("Root Mean Square Error is %g" %rmse)

Root Mean Square Error is 403.488


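Training the model on data from players who played 5 or more games and removing negative prediction values has improved its accuracy: the RMSE has been reduced from 561 to 403. The two figures are computed on different test sets (one drawn from the full play dataset, the other from the filtered one), so the comparison is indicative rather than like-for-like.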

### Normalisation

In [0]:
# I will normalise the 'hours' column before re-running the ALS algorithm.  This is because in the original dataset, there is very high variance in this column (0.1 hours - 11,754 hours). The first ALS above might have overfit to users who have extremely high playtimes. Log transformation smooths out differences, improving generalisation and potentially reducing RMSE.

# from pyspark.sql.functions import log1p
# norm_play_df = filtered_full_df.withColumn("normalised_hours", log1p("hours"))

# split data in this normalised dataset into training and test dataset 

# (training, test) = norm_play_df.randomSplit([0.8, 0.2], seed=100)

# cache training dataset to speed up model training and hyperparameter tuning 
# training.cache()


I will now re-run the ALS model to see if these changes have made it more accurate. In addition to the above improvements, I will set ```nonnegative=True``` as it's not possible to have negative play durations, so negative numbers should not be generated in the predictions

In [0]:
# Create another ALS model using the improvements listed above and train it on the training dataset containing the normalised hours values 

# better_als = ALS(maxIter=5, regParam=0.01, userCol="memberID", itemCol="gameID", ratingCol='normalised_hours', nonnegative=True, seed=100)

# better_model = better_als.fit(training)

# Use this new ALS model to generate predictions from the test dataset 

# better_predictions = better_model.transform(test).dropna()





**1 MLflow RUN LOGGED - DISCUSS**

In [0]:
# better_predictions.show()

In [0]:
# from pyspark.sql.functions import expm1

# # Apply inverse transformation before evaluating RMSE
# predictions = better_predictions.withColumn("prediction_original", expm1("prediction"))

# # Use prediction_original instead of raw prediction for RMSE calculation
# rmse_evaluator = RegressionEvaluator(labelCol="hours", predictionCol="prediction_original", metricName="rmse")
# rmse = rmse_evaluator.evaluate(predictions)
# print(f"RMSE: {rmse}")


### Hyperparameter tuning
In this section, I will test different hyperparameter values to find the combination of hyperparameters that will make my model more accurate.  

First, I will import and instantiate the ParamGridBuilder, and then add the different values I want to try for the parameters to the grid. In this case, I am going to try:
- values of 0.001, 0.01, 0.1 and 0.2 for regParam (previously it was 0.01)
- values of 5, 10 and 20 for rank (previously it was 10, the ALS default)
- values of 5, 10 and 15 for maxIter (previously it was 5)
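
To get a sense of the size of this search, the sketch below enumerates every combination of the values used in the grid-building cell in this section (0.001, 0.01, 0.1 and 0.2 for regParam; 5, 10 and 20 for rank; 5, 10 and 15 for maxIter). It uses plain Python rather than `ParamGridBuilder`, and the variable names are my own.

```python
from itertools import product

# Values taken from the grid-building cell in this section
reg_params = [0.001, 0.01, 0.1, 0.2]
ranks = [5, 10, 20]
max_iters = [5, 10, 15]

# Every (regParam, rank, maxIter) combination in the grid
combinations = list(product(reg_params, ranks, max_iters))
print(len(combinations))  # 4 * 3 * 3 = 36
```

Each combination corresponds to one candidate model to fit, which is why caching the training data and limiting the number of values per parameter both help keep the search time manageable.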

In [0]:
# use 50% sample of training data to speed up gridsearch time 
# sample_training = training.sample(fraction=0.5, seed=42) 

# cache sample training dataset to speed up model training and hyperparameter tuning 
# sample_training.cache()


In [0]:
# Create a parameter grid to use for hyperparameter tuning 

from pyspark.ml.tuning import ParamGridBuilder

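# Build the grid from the params of better_als, the estimator passed to TrainValidationSplit below, so that the grid values apply to that estimator
parameters = ParamGridBuilder().addGrid(better_als.regParam, [0.001, 0.01, 0.1, 0.2]).addGrid(better_als.rank, [5, 10, 20]).addGrid(better_als.maxIter, [5, 10, 15]).build()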

In [0]:
# from pyspark.ml.tuning import CrossValidator

# crossval = CrossValidator(estimator=als, # could this use better_als?
#                           estimatorParamMaps=parameters,
#                           evaluator=evaluator,
#                           numFolds=5)  


In [0]:
# Define trainValidationSplit

from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit()\
    .setSeed(100)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(parameters)\
    .setEstimator(better_als)\
    .setEvaluator(evaluator)

In [0]:
# Train model using grid search

gridsearchModel = tvs.fit(training)

2025/04/06 10:21:35 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '8367a9164ee34674b5cd6d303a6e1655', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


🏃 View run grandiose-koi-883 at: https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/8367a9164ee34674b5cd6d303a6e1655
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/2194890496938728




**TODO: discuss the 16 runs that MLflow logged for this grid search.**

In [0]:
# Print the optimal hyperparameters as determined by the grid search 

bestModel = gridsearchModel.bestModel

print("Parameters for the best model:")
print("Rank Parameter: %g" %bestModel.rank)
print("RegParam Parameter: %g" %bestModel._java_obj.parent().getRegParam())
print("maxIter Parameter: %g" %bestModel._java_obj.parent().getMaxIter())


Parameters for the best model:
Rank Parameter: 10
RegParam Parameter: 0.01
maxIter Parameter: 5


[This run](https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/01f15a8ca93046e48b642a13f51ee8af?o=443721649673591) was XYZZ...


In [0]:
# Create another ALS model using the optimal hyperparameters as determined by the grid search and the other improvements listed above 

best_als = ALS(maxIter=5, regParam=0.01, rank=10,
               userCol="memberID", itemCol="gameID", ratingCol="hours",
               nonnegative=True, seed=100)

best_model = best_als.fit(training)

# Use this new ALS model to generate predictions from the test dataset 

best_predictions = best_model.transform(test).dropna()

2025/04/06 10:20:50 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '5e0d108523664685aea36a354e596383', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


🏃 View run handsome-swan-27 at: https://community.cloud.databricks.com/ml/experiments/2194890496938728/runs/5e0d108523664685aea36a354e596383
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/2194890496938728


**TODO: discuss the single MLflow run logged for this model.**

In [0]:
# Calculate root mean square error value for this best model 

rmse = evaluator.evaluate(best_predictions)

print("Root Mean Square Error is %g" %rmse)

Root Mean Square Error is 403.488
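
For reference, RMSE is the square root of the mean squared difference between actual and predicted values, so it is expressed in the same unit as the rating column (hours here). The sketch below computes it in plain Python. The numbers are made up for illustration and are not taken from the Steam dataset.

```python
import math


def rmse(actual, predicted):
    """Root mean square error between two equal-length sequences of numbers."""
    if len(actual) != len(predicted) or len(actual) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical play durations in hours (not from the dataset)
actual_hours = [10.0, 2.0, 300.0]
predicted_hours = [12.0, 1.0, 100.0]

print(f"RMSE: {rmse(actual_hours, predicted_hours):.3f} hours")
```

In this toy example the single heavily played game contributes almost all of the error, which is one reason RMSE on raw play hours can be large.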


## Generating Recommendations

In [0]:
# use the best model (according to the hyperparameter tuning grid search) to generate predictions  
loaded_predictions = loaded_model.transform(test)
loaded_predictions.show()

To generate recommendations, I will create a new DataFrame which contains all the gameIDs and a memberID column set to 0. I will then use the transform method on this data, which will generate a prediction for the member with memberID 0 for every game in the dataset. I can then use orderBy to sort the games by predicted play duration in descending order. The games at the top are the ones member 0 is predicted to play the most.
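
The ranking step can be sketched outside Spark as follows. ALS predicts a value for a (member, game) pair from the dot product of that member's and that game's latent factor vectors, and the recommendations are the games with the highest predicted values. All names and factor values below are invented for illustration. One point to bear in mind: a memberID that did not appear in the training data has no learned factors, so with the default coldStartStrategy of "nan" an ALS model returns NaN predictions for it, and a later dropna() removes those rows.

```python
import math

# Hypothetical latent factors (rank 2); real ALS factors are learned from data
member_factors = {1: [0.9, 0.1], 2: [0.2, 0.8]}
game_factors = {
    "Game A": [10.0, 1.0],
    "Game B": [2.0, 12.0],
    "Game C": [5.0, 5.0],
}


def predict(member_id, game):
    """Dot product of member and game factors; NaN if the member is unseen."""
    factors = member_factors.get(member_id)
    if factors is None:
        return math.nan  # mirrors ALS behaviour with coldStartStrategy="nan"
    return sum(m * g for m, g in zip(factors, game_factors[game]))


def recommend(member_id, top_n=2):
    """Return the top_n (game, prediction) pairs, dropping NaN predictions."""
    scored = [(game, predict(member_id, game)) for game in game_factors]
    scored = [(game, score) for game, score in scored if not math.isnan(score)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]


print(recommend(1))
print(recommend(0))  # member 0 has no factors, so nothing survives the NaN filter
```

If the same happens with the real model, using a memberID that exists in the training data is one way to obtain non-empty recommendations.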

In [0]:
# Create a new DataFrame with all unique game names and gameIDs (the memberID column is added in the next cell)

unique_games_df = dropped_play_df.select("game", "gameID").dropDuplicates()
unique_games_df.show()

+--------------------+------+
|                game|gameID|
+--------------------+------+
|            HuniePop| 867.0|
|               Spore| 332.0|
|  Marvel Heroes 2015|  73.0|
|The Elder Scrolls...|   8.0|
|             Eldevin| 881.0|
|Fallout 3 - Game ...| 152.0|
|SEGA Genesis & Me...| 655.0|
|           Fallout 4| 100.0|
| Grand Theft Auto IV|  43.0|
|         Left 4 Dead|  49.0|
|         Tomb Raider|  55.0|
|       Left 4 Dead 2|   4.0|
|         Poly Bridge|1347.0|
|   BioShock Infinite|  42.0|
|Dragon Age Origin...| 301.0|
|     Team Fortress 2|   1.0|
|       Path of Exile|  39.0|
|   Fallout New Vegas|  29.0|
|Realm of the Mad God| 134.0|
|     The Banner Saga| 604.0|
+--------------------+------+
only showing top 20 rows



In [0]:
# Add a memberID column set to the integer 0 for every game
myGeneratedPredictions = unique_games_df.withColumn("memberID", functions.lit(0))
myGeneratedPredictions.show()

+--------------------+------+--------+
|                game|gameID|memberID|
+--------------------+------+--------+
|            HuniePop| 867.0|       0|
|               Spore| 332.0|       0|
|  Marvel Heroes 2015|  73.0|       0|
|The Elder Scrolls...|   8.0|       0|
|             Eldevin| 881.0|       0|
|Fallout 3 - Game ...| 152.0|       0|
|SEGA Genesis & Me...| 655.0|       0|
|           Fallout 4| 100.0|       0|
| Grand Theft Auto IV|  43.0|       0|
|         Left 4 Dead|  49.0|       0|
|         Tomb Raider|  55.0|       0|
|       Left 4 Dead 2|   4.0|       0|
|         Poly Bridge|1347.0|       0|
|   BioShock Infinite|  42.0|       0|
|Dragon Age Origin...| 301.0|       0|
|     Team Fortress 2|   1.0|       0|
|       Path of Exile|  39.0|       0|
|   Fallout New Vegas|  29.0|       0|
|Realm of the Mad God| 134.0|       0|
|     The Banner Saga| 604.0|       0|
+--------------------+------+--------+
only showing top 20 rows



In [0]:
# use the best model to generate recommendations

myGeneratedPredictions = better_model.transform(myGeneratedPredictions)

myGeneratedPredictions = myGeneratedPredictions.dropna()

myGeneratedPredictions.orderBy("prediction",ascending=False).show(10)

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
File <command-4608006875796>, line 3
      1 # use the best model to generate recommendations
----> 3 myGeneratedPredictions = better_model.transform(myGeneratedPredictions)
      5 myGeneratedPredictions.orderBy("prediction",ascending=False).show(10)

File /databricks/python/lib/python3.11/site-packages/mlflow/utils/autologging_utils/safety.py:592, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    590     patch_function.call(call_original, *args, **kwargs)
    591 ...


Reflection: a possible refinement would be to use the purchase data as well as the play data when generating recommendations.

## Conclusion

Finish with a brief conclusion to wrap up your submission and any key findings.