#TASK 2
#RECOMMENDER SYSTEM

## Exploratory Data Analysis

In [0]:
#checking to ensure the steam file was uploaded
dbutils.fs.ls('/FileStore/tables/')

[FileInfo(path='dbfs:/FileStore/tables/.Test-unix/', name='.Test-unix/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/.XIM-unix/', name='.XIM-unix/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/.font-unix/', name='.font-unix/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1709138386000),
 FileInfo(path='dbfs:/FileStore/tables/SQLcount_month.csv/', name='SQLcount_month.csv/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts.zip', name='accounts.zip', size=5297592, modificationTime=1709709833000),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationT

In [0]:
# importing mlflow and autolog 
import mlflow
mlflow.pyspark.ml.autolog() 

In [0]:
#loading data into dataframe 
steamdf = spark.read.csv(
    path='/FileStore/tables/steam_200k.csv',
    header=False, 
    inferSchema=True
)
#display the first five rows 
steamdf.show(5)

+---------+--------------------+--------+-----+
|      _c0|                 _c1|     _c2|  _c3|
+---------+--------------------+--------+-----+
|151603712|The Elder Scrolls...|purchase|  1.0|
|151603712|The Elder Scrolls...|    play|273.0|
|151603712|           Fallout 4|purchase|  1.0|
|151603712|           Fallout 4|    play| 87.0|
|151603712|               Spore|purchase|  1.0|
+---------+--------------------+--------+-----+
only showing top 5 rows



In [0]:
#Applying toDF() method to rename the columns
steamdf1 = steamdf.toDF('user_id', 'game_name', 'action', 'playtime')

#show first five rows 
steamdf1.show(5)

+---------+--------------------+--------+--------+
|  user_id|           game_name|  action|playtime|
+---------+--------------------+--------+--------+
|151603712|The Elder Scrolls...|purchase|     1.0|
|151603712|The Elder Scrolls...|    play|   273.0|
|151603712|           Fallout 4|purchase|     1.0|
|151603712|           Fallout 4|    play|    87.0|
|151603712|               Spore|purchase|     1.0|
+---------+--------------------+--------+--------+
only showing top 5 rows



In [0]:
#Importing monotonically_increases_id to generate unique id for each row
from pyspark.sql.functions import monotonically_increasing_id

#Creating another column and applying the newly imported function to generate unique id
gameid_df = steamdf1.withColumn('game_id', monotonically_increasing_id() + 1)

#Showing the first five rows
gameid_df.show(5)

+---------+--------------------+--------+--------+-------+
|  user_id|           game_name|  action|playtime|game_id|
+---------+--------------------+--------+--------+-------+
|151603712|The Elder Scrolls...|purchase|     1.0|      1|
|151603712|The Elder Scrolls...|    play|   273.0|      2|
|151603712|           Fallout 4|purchase|     1.0|      3|
|151603712|           Fallout 4|    play|    87.0|      4|
|151603712|               Spore|purchase|     1.0|      5|
+---------+--------------------+--------+--------+-------+
only showing top 5 rows



In [0]:
#Importing col from Pyspark
from pyspark.sql.functions import cast, col

#converting game_id column from long to integer
steam_gameid2 = gameid_df.withColumn("game_id", col("game_id").cast("Integer"))

#show the first five rows 
steam_gameid2.show(5)


+---------+--------------------+--------+--------+-------+
|  user_id|           game_name|  action|playtime|game_id|
+---------+--------------------+--------+--------+-------+
|151603712|The Elder Scrolls...|purchase|     1.0|      1|
|151603712|The Elder Scrolls...|    play|   273.0|      2|
|151603712|           Fallout 4|purchase|     1.0|      3|
|151603712|           Fallout 4|    play|    87.0|      4|
|151603712|               Spore|purchase|     1.0|      5|
+---------+--------------------+--------+--------+-------+
only showing top 5 rows



In [0]:
steam_gameid2.createOrReplaceTempView('EDAview') 

In [0]:
%sql
SELECT game_name, action, SUM(playtime) AS TotalHoursPlayed
FROM EDAview 
WHERE action = 'play'
GROUP BY game_name, action
ORDER BY TotalHoursPlayed DESC
LIMIT 10

game_name,action,TotalHoursPlayed
Dota 2,play,981684.6
Counter-Strike Global Offensive,play,322771.6000000001
Team Fortress 2,play,173673.30000000005
Counter-Strike,play,134261.09999999998
Sid Meier's Civilization V,play,99821.30000000002
Counter-Strike Source,play,96075.50000000004
The Elder Scrolls V Skyrim,play,70889.3
Garry's Mod,play,49725.3
Call of Duty Modern Warfare 2 - Multiplayer,play,42009.9
Left 4 Dead 2,play,33596.700000000004


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT game_name, action, count(*) AS TotalPurchase 
FROM EDAview
WHERE action = 'purchase' 
GROUP BY game_name, action 
ORDER BY TotalPurchase DESC
LIMIT(10) 

game_name,action,TotalPurchase
Dota 2,purchase,4841
Team Fortress 2,purchase,2323
Unturned,purchase,1563
Counter-Strike Global Offensive,purchase,1412
Half-Life 2 Lost Coast,purchase,981
Counter-Strike Source,purchase,978
Left 4 Dead 2,purchase,951
Counter-Strike,purchase,856
Warframe,purchase,847
Half-Life 2 Deathmatch,purchase,823


Databricks visualization. Run in Databricks to view.

##Data Pre-processing 

In [0]:
#Converting data into correct format for MLlib learning algorithm.
from pyspark.ml.feature import RFormula

#drop features column if it exist so that it can run multiple times 
if 'features' in steam_gameid2.columns:
    steam_gameid2 = steam_gameid2.drop('features')
    

steam_preprocess = RFormula(formula='playtime ~ .') 
steam_gameid2 = steam_preprocess.fit(steam_gameid2).transform(steam_gameid2)
steam_gameid2.show(5)

2024/05/02 00:26:34 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'db25106296f74d489f73d2e1c82ba2be', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+---------+--------------------+--------+--------+-------+--------------------+-----+
|  user_id|           game_name|  action|playtime|game_id|            features|label|
+---------+--------------------+--------+--------+-------+--------------------+-----+
|151603712|The Elder Scrolls...|purchase|     1.0|      1|(5157,[0,9,5155,5...|  1.0|
|151603712|The Elder Scrolls...|    play|   273.0|      2|(5157,[0,9,5156],...|273.0|
|151603712|           Fallout 4|purchase|     1.0|      3|(5157,[0,101,5155...|  1.0|
|151603712|           Fallout 4|    play|    87.0|      4|(5157,[0,101,5156...| 87.0|
|151603712|               Spore|purchase|     1.0|      5|(5157,[0,333,5155...|  1.0|
+---------+--------------------+--------+--------+-------+--------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col 
#filter play from steam_gameid2 
play_df = steam_gameid2.filter(col('action')=='play')

#filter purchase from steam_gameid2
purchase_df = steam_gameid2.filter(col('action')=='purchase')

#display purchase_df
purchase_df.show(3)

#display play_df
play_df.show(3)


+---------+--------------------+--------+--------+-------+--------------------+-----+
|  user_id|           game_name|  action|playtime|game_id|            features|label|
+---------+--------------------+--------+--------+-------+--------------------+-----+
|151603712|The Elder Scrolls...|purchase|     1.0|      1|(5157,[0,9,5155,5...|  1.0|
|151603712|           Fallout 4|purchase|     1.0|      3|(5157,[0,101,5155...|  1.0|
|151603712|               Spore|purchase|     1.0|      5|(5157,[0,333,5155...|  1.0|
+---------+--------------------+--------+--------+-------+--------------------+-----+
only showing top 3 rows

+---------+--------------------+------+--------+-------+--------------------+-----+
|  user_id|           game_name|action|playtime|game_id|            features|label|
+---------+--------------------+------+--------+-------+--------------------+-----+
|151603712|The Elder Scrolls...|  play|   273.0|      2|(5157,[0,9,5156],...|273.0|
|151603712|           Fallout 4|  pla

##Train / Test data

In [0]:
#splitting the data into training and test datasets for steam_gameid2
(trainingDF, testDF) = steam_gameid2.randomSplit([0.7, 0.3], seed =200 )

#splitting the data into training and test datasets for play_df
(trainingplay, testplay) = play_df.randomSplit([0.7, 0.3], seed = 200)

#splitting the data into training and test datasets for purchase_df
(trainingpurchase, testpurchase) = purchase_df.randomSplit([0.7, 0.3], seed = 200)

##4. Training the model Using Alternate Least Squares (ALS) 


In [0]:
from pyspark.ml.recommendation import ALS 

#using alternating least squares by instantiating an instance of the estimator steam_gameid2
als = ALS(maxIter=5, regParam=0.01, userCol='user_id', itemCol='game_id', ratingCol='playtime', seed=200)

#train the model for steam_gameid2
model = als.fit(trainingDF)

#train the model for play_df
play_model = als.fit(trainingplay)

#train the model for purchase_df
purchase_model = als.fit(trainingpurchase)


2024/05/02 00:26:47 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '1cfc4a099f8149f69f57dd7c829c73c9', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2024/05/02 00:29:29 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '3dedeaedc49847b3a52d572cf81a3d1c', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2024/05/02 00:31:37 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'a260198eb1624ff6b0547d719fa7cb96', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


##Model Evaluation 

In [0]:
#predicting model for steam_gameid2 
predictions = model.transform(testDF).dropna() 

#predicting model for play_df
play_pedictions = play_model.transform(testplay).dropna() 

#predicting model for purchase_df
purchase_predictions = purchase_model.transform(testpurchase).dropna() 

#show steam_gameid2 
predictions.show(3)

#show play_df
play_pedictions.show(3) 

#show purhcase_df 
purchase_predictions.show(3)

+-------+--------------------+--------+--------+-------+--------------------+-----+-----------+
|user_id|           game_name|  action|playtime|game_id|            features|label| prediction|
+-------+--------------------+--------+--------+-------+--------------------+-----+-----------+
|   5250|         Alien Swarm|purchase|     1.0|  65430|(5157,[0,33,5155,...|  1.0|-0.34882042|
|   5250|      Counter-Strike|purchase|     1.0|  65436|(5157,[0,7,5155,5...|  1.0|-0.21801274|
|   5250|Deus Ex Human Rev...|purchase|     1.0|  65426|(5157,[0,119,5155...|  1.0|-0.43602547|
+-------+--------------------+--------+--------+-------+--------------------+-----+-----------+
only showing top 3 rows

+-------+--------------------+------+--------+-------+--------------------+-----+-----------+
|user_id|           game_name|action|playtime|game_id|            features|label| prediction|
+-------+--------------------+------+--------+-------+--------------------+-----+-----------+
| 181212|Half-Life 2 

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator 
#assigning a variable to regression evaluator 
evaluator = RegressionEvaluator(metricName='rmse', labelCol='playtime', predictionCol='prediction') 

#evaluate steam_gameid2 model
rmse = evaluator.evaluate(predictions)

#evaluate play_df model
rmse_play = evaluator.evaluate(play_pedictions)

#evaluate purchase_df model
rmse_purchase = evaluator.evaluate(purchase_predictions)

print('Root Mean Square Error for model is %g' %rmse)
print('Root Mean Square Error for play_model is %g' %rmse_play)
print('Root Mean Square Error for purchase_model is %g' %rmse_purchase)


Root Mean Square Error for model is 145.835
Root Mean Square Error for play_model is 180.16
Root Mean Square Error for purchase_model is 0.859162


##Training the model using different hyperparameters

In [0]:
from pyspark.ml.recommendation import ALS 

#using alternating least squares by instantiating an instance of the estimator steam_gameid2
als = ALS(maxIter=10, regParam=0.05, userCol='user_id', itemCol='game_id', ratingCol='playtime', seed=100)

#train the model for steam_gameid2
model_1= als.fit(trainingDF)

#train the model for play_df
play_model_1 = als.fit(trainingplay)

#train the model for purchase_df
purchase_model_1 = als.fit(trainingpurchase)

2024/05/02 00:36:23 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'bb753c9d1eae4cde829467f2a9c5931b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2024/05/02 00:39:49 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'bd083f4c55484f879828dc793fcf73a8', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2024/05/02 00:43:08 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '9ea1a3ade9654d20aacda8d692e92999', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


##Model Evaluation

In [0]:
#predicting model for steam_gameid2 
predictions_1 = model_1.transform(testDF).dropna() 

#predicting model for play_df
play_pedictions1 = play_model_1.transform(testplay).dropna() 

#predicting model for purchase_df
purchase_predictions1 = purchase_model_1.transform(testpurchase).dropna() 

#show steam_gameid2 
predictions_1.show(3)

#show play_df
play_pedictions1.show(3) 

#show purhcase_df 
purchase_predictions1.show(3)

+-------+--------------------+--------+--------+-------+--------------------+-----+------------+
|user_id|           game_name|  action|playtime|game_id|            features|label|  prediction|
+-------+--------------------+--------+--------+-------+--------------------+-----+------------+
|   5250|         Alien Swarm|purchase|     1.0|  65430|(5157,[0,33,5155,...|  1.0|0.0064391047|
|   5250|      Counter-Strike|purchase|     1.0|  65436|(5157,[0,7,5155,5...|  1.0|0.0040244237|
|   5250|Deus Ex Human Rev...|purchase|     1.0|  65426|(5157,[0,119,5155...|  1.0| 0.008048847|
+-------+--------------------+--------+--------+-------+--------------------+-----+------------+
only showing top 3 rows

+-------+--------------------+------+--------+-------+--------------------+-----+-----------+
|user_id|           game_name|action|playtime|game_id|            features|label| prediction|
+-------+--------------------+------+--------+-------+--------------------+-----+-----------+
| 181212|Half-

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator 

#applying regression evaluator to measure the accuracy of the model
evaluator2 = RegressionEvaluator(metricName='rmse', labelCol='playtime', predictionCol='prediction') 

#evaluate steam_gameid2 model
rmse_1 = evaluator2.evaluate(predictions_1)

#evaluate play_df model
rmse_play1 = evaluator2.evaluate(play_pedictions1)

#evaluate purchase_df model
rmse_purchase1 = evaluator2.evaluate(purchase_predictions1)

print('Root Mean Square Error for model_1 is %g' %rmse_1)
print('Root Mean Square Error for play_model_1 is %g' %rmse_play1)
print('Root Mean Square Error for purchase_model_1 is %g' %rmse_purchase1)


Root Mean Square Error for model_1 is 145.955
Root Mean Square Error for play_model_1 is 178.811
Root Mean Square Error for purchase_model_1 is 0.396602


##Generating game recommendation

In [0]:
#generate a recomender system for all users
userRecs = model_1.recommendForAllUsers(20)

userRecs.show(5, truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                                                                                                                                                                                                                                |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import explode

#mapping game_id to game_name
game_name = steam_gameid2.select('game_id', 'game_name').distinct()

#generating game recommendation for a specific users
# Directly filter user_id 76767, explode the recommendations, and join the results with game_name
user_recommendations = (userRecs
    .filter(col('user_id') == 76767) 
    .withColumn('recommendations', explode('recommendations')) 
    .select(
        col('recommendations.game_id').alias('game_id'),  
        col('recommendations.rating').alias('rating')
    )
    .join(game_name, on='game_id')   
    .show(5,truncate=False))  


+-------+---------+-------------------------------------+
|game_id|rating   |game_name                            |
+-------+---------+-------------------------------------+
|8307   |4376.8213|Dota 2                               |
|8307   |4376.8213|Sid Meier's Civilization III Complete|
|78082  |2921.2432|FINAL FANTASY XIV A Realm Reborn     |
|78082  |2921.2432|StarDrive                            |
|32486  |2591.4653|Dota 2                               |
+-------+---------+-------------------------------------+
only showing top 5 rows

