In [0]:
#import mlflow and autolog machine learning runs

import mlflow

mlflow.pyspark.ml.autolog()

In [0]:
#read data into spark DataFrame

steamDF = spark.read.csv('/FileStore/tables/steam_200k.csv',
                         header = 'false',
                         inferSchema = 'true').withColumnRenamed('_c0', 'userId').withColumnRenamed('_c1', 'Games').withColumnRenamed('_c2', 's2').withColumnRenamed('_c3', 's3')

#Exploratory analysis

In [0]:
steamDF = steamDF.dropna()
steamDF.show()

+---------+--------------------+--------+-----+
|   userId|               Games|      s2|   s3|
+---------+--------------------+--------+-----+
|151603712|The Elder Scrolls...|purchase|  1.0|
|151603712|The Elder Scrolls...|    play|273.0|
|151603712|           Fallout 4|purchase|  1.0|
|151603712|           Fallout 4|    play| 87.0|
|151603712|               Spore|purchase|  1.0|
|151603712|               Spore|    play| 14.9|
|151603712|   Fallout New Vegas|purchase|  1.0|
|151603712|   Fallout New Vegas|    play| 12.1|
|151603712|       Left 4 Dead 2|purchase|  1.0|
|151603712|       Left 4 Dead 2|    play|  8.9|
|151603712|            HuniePop|purchase|  1.0|
|151603712|            HuniePop|    play|  8.5|
|151603712|       Path of Exile|purchase|  1.0|
|151603712|       Path of Exile|    play|  8.1|
|151603712|         Poly Bridge|purchase|  1.0|
|151603712|         Poly Bridge|    play|  7.5|
|151603712|         Left 4 Dead|purchase|  1.0|
|151603712|         Left 4 Dead|    play

In [0]:
steamDF.createOrReplaceTempView('steam')

In [0]:
%sql
--most played game
select * from steam
where s3 = (select max(s3) from steam)


userId,Games,s2,s3
73017395,Sid Meier's Civilization V,play,11754.0


In [0]:
%sql
select games, count(*) as purchase_count
from steam
group by games
order by purchase_count desc
limit 10

games,purchase_count
Dota 2,9682
Team Fortress 2,4646
Counter-Strike Global Offensive,2789
Unturned,2632
Left 4 Dead 2,1752
Counter-Strike Source,1693
Counter-Strike,1424
Garry's Mod,1397
The Elder Scrolls V Skyrim,1394
Warframe,1271


Databricks visualization. Run in Databricks to view.

In [0]:
steamDF.groupBy('userId').count().display()

userId,count
16167221,57
166705920,11
244878837,2
99992274,2
174415183,10
156156544,2
152861732,34
171911285,2
128412180,2
74557142,4


In [0]:
AssassinsCreedDF = steamDF.filter(steamDF['Games'].like ('%Creed'))
AssassinsCreedDF.show(truncate=False)

+---------+----------------+--------+----+
|userId   |Games           |s2      |s3  |
+---------+----------------+--------+----+
|20200395 |Assassin's Creed|purchase|1.0 |
|20200395 |Assassin's Creed|play    |0.2 |
|135879753|Assassin's Creed|purchase|1.0 |
|135879753|Assassin's Creed|play    |2.0 |
|64787956 |Assassin's Creed|purchase|1.0 |
|78341587 |Assassin's Creed|purchase|1.0 |
|78341587 |Assassin's Creed|play    |1.2 |
|23492094 |Assassin's Creed|purchase|1.0 |
|92393218 |Assassin's Creed|purchase|1.0 |
|92393218 |Assassin's Creed|play    |47.0|
|101690993|Assassin's Creed|purchase|1.0 |
|101690993|Assassin's Creed|play    |21.0|
|53898495 |Assassin's Creed|purchase|1.0 |
|53898495 |Assassin's Creed|play    |4.1 |
|82046939 |Assassin's Creed|purchase|1.0 |
|82046939 |Assassin's Creed|play    |4.0 |
|240405101|Assassin's Creed|purchase|1.0 |
|240405101|Assassin's Creed|play    |4.5 |
|9128105  |Assassin's Creed|purchase|1.0 |
|9128105  |Assassin's Creed|play    |3.5 |
+---------+

In [0]:
AssassinsCreedDF.count()

87

#Generating ID for the games

In [0]:
from pyspark.ml.feature import StringIndexer

# Instantiate the StringIndexer
game_id = StringIndexer(inputCol="Games", outputCol="gamesId")

# Fit and transform the DataFrame
indexed_df = game_id.fit(steamDF).transform(steamDF)

# Show the result
indexed_df.show(4)

2024/05/01 20:59:17 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '561a69c0b6af47ca88f00a676e8ad59c', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+---------+--------------------+--------+-----+-------+
|   userId|               Games|      s2|   s3|gamesId|
+---------+--------------------+--------+-----+-------+
|151603712|The Elder Scrolls...|purchase|  1.0|    8.0|
|151603712|The Elder Scrolls...|    play|273.0|    8.0|
|151603712|           Fallout 4|purchase|  1.0|  100.0|
|151603712|           Fallout 4|    play| 87.0|  100.0|
+---------+--------------------+--------+-----+-------+
only showing top 4 rows



#Training model with purchase

In [0]:
purchasedf = indexed_df.filter(indexed_df['s2'] == 'purchase')
purchasedf = purchasedf.drop('s2')
purchasedf.show()

+---------+--------------------+---+-------+
|   userId|               Games| s3|gamesId|
+---------+--------------------+---+-------+
|151603712|The Elder Scrolls...|1.0|    8.0|
|151603712|           Fallout 4|1.0|  100.0|
|151603712|               Spore|1.0|  332.0|
|151603712|   Fallout New Vegas|1.0|   29.0|
|151603712|       Left 4 Dead 2|1.0|    4.0|
|151603712|            HuniePop|1.0|  867.0|
|151603712|       Path of Exile|1.0|   39.0|
|151603712|         Poly Bridge|1.0| 1347.0|
|151603712|         Left 4 Dead|1.0|   49.0|
|151603712|     Team Fortress 2|1.0|    1.0|
|151603712|         Tomb Raider|1.0|   55.0|
|151603712|     The Banner Saga|1.0|  604.0|
|151603712|Dead Island Epidemic|1.0|   51.0|
|151603712|   BioShock Infinite|1.0|   42.0|
|151603712|Dragon Age Origin...|1.0|  301.0|
|151603712|Fallout 3 - Game ...|1.0|  152.0|
|151603712|SEGA Genesis & Me...|1.0|  655.0|
|151603712| Grand Theft Auto IV|1.0|   43.0|
|151603712|Realm of the Mad God|1.0|  134.0|
|151603712

In [0]:
(training, test) = purchasedf.randomSplit([0.7, 0.3], seed=100)

In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(rank=5, maxIter=5, regParam=0.01, userCol = 'userId', itemCol = 'gamesId', ratingCol = 's3',  implicitPrefs = True, seed=100)

model = als.fit(training)

2024/05/01 20:59:27 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '3ebdf46f2898472592fb6a7a3bffc644', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


#obtaining predictions

In [0]:
predictionsdf = model.transform(test).dropna()

predictionsdf.show()

+------+--------------------+---+-------+-----------+
|userId|               Games| s3|gamesId| prediction|
+------+--------------------+---+-------+-----------+
|  5250|Counter-Strike So...|1.0|      5|  0.6703625|
|  5250|Half-Life 2 Death...|1.0|     13| 0.67615545|
|  5250|Half-Life 2 Episo...|1.0|     37| 0.34870023|
|  5250|Half-Life Blue Shift|1.0|     74|  0.4842425|
|  5250|Half-Life Opposin...|1.0|     75|  0.4768132|
|  5250|     Team Fortress 2|1.0|      1|  0.6168351|
|  5250|Team Fortress Cla...|1.0|     79| 0.46295378|
| 76767|Arma 2 Operation ...|1.0|    129| 0.08278456|
| 76767|Call of Duty Mode...|1.0|     25| 0.10204532|
| 76767|      Counter-Strike|1.0|      6| 0.42709893|
| 76767|Counter-Strike Gl...|1.0|      2| 0.22623502|
| 76767|       Day of Defeat|1.0|     28| 0.33738065|
| 76767|  Deathmatch Classic|1.0|     34| 0.32981768|
| 76767|Half-Life Blue Shift|1.0|     74| 0.29930696|
| 76767|Half-Life Opposin...|1.0|     75| 0.29020995|
| 76767|       Thief - Ghost

#Evaluating the model

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='s3', predictionCol='prediction')

rmse = evaluator.evaluate(predictionsdf)

print('Root Mean Square Error is %g' %rmse)

Root Mean Square Error is 0.853104


#Generating recommendations

In [0]:
myrecomm = purchasedf

myrecomm = model.transform(myrecomm)

myrecomm = myrecomm.dropna()

myrecomm.orderBy('prediction', ascending=False).show(truncate=False)

+---------+---------------+---+-------+----------+
|userId   |Games          |s3 |gamesId|prediction|
+---------+---------------+---+-------+----------+
|100719181|Team Fortress 2|1.0|1      |1.7439352 |
|205285563|Team Fortress 2|1.0|1      |1.6963995 |
|166863117|Unturned       |1.0|3      |1.5751729 |
|248444377|Unturned       |1.0|3      |1.5291862 |
|197904909|Team Fortress 2|1.0|1      |1.51586   |
|216088282|Unturned       |1.0|3      |1.5124595 |
|168163793|Unturned       |1.0|3      |1.4770571 |
|189734107|Team Fortress 2|1.0|1      |1.4645566 |
|246439849|Unturned       |1.0|3      |1.4502183 |
|133422169|Team Fortress 2|1.0|1      |1.4401431 |
|152959594|Team Fortress 2|1.0|1      |1.4337177 |
|176771895|Unturned       |1.0|3      |1.4262924 |
|157636450|Unturned       |1.0|3      |1.4081296 |
|204795948|Unturned       |1.0|3      |1.402651  |
|73726855 |Unturned       |1.0|3      |1.38949   |
|52567955 |Team Fortress 2|1.0|1      |1.3869606 |
|219466905|Unturned       |1.0|

#Training model with play

In [0]:
playtimedf = indexed_df.filter(indexed_df['s2'] == 'play').withColumnRenamed('s3', 'playtime')
playtimedf = playtimedf.drop('s2')
playtimedf.show(truncate=False)

+---------+-------------------------------------+--------+-------+
|userId   |Games                                |playtime|gamesId|
+---------+-------------------------------------+--------+-------+
|151603712|The Elder Scrolls V Skyrim           |273.0   |8.0    |
|151603712|Fallout 4                            |87.0    |100.0  |
|151603712|Spore                                |14.9    |332.0  |
|151603712|Fallout New Vegas                    |12.1    |29.0   |
|151603712|Left 4 Dead 2                        |8.9     |4.0    |
|151603712|HuniePop                             |8.5     |867.0  |
|151603712|Path of Exile                        |8.1     |39.0   |
|151603712|Poly Bridge                          |7.5     |1347.0 |
|151603712|Left 4 Dead                          |3.3     |49.0   |
|151603712|Team Fortress 2                      |2.8     |1.0    |
|151603712|Tomb Raider                          |2.5     |55.0   |
|151603712|The Banner Saga                      |2.0     |604.

#Normalizing play column

In [0]:
from pyspark.sql.functions import min
from pyspark.sql.functions import max

# Finding the minimum and maximum value in the playtime column
minimum_value = playtimedf.select(min('playtime')).first()[0]
maximum_value = playtimedf.select(max('playtime')).first()[0]

print("Minimum value in the column:", minimum_value)
print("Maximum value in the column:", maximum_value)

Minimum value in the column: 0.1
Maximum value in the column: 11754.0


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import format_number

min_value = 0.1
max_value = 11754.0

# Normalize 'playtime' column
playtimedf = playtimedf.withColumn('normalized_playtime', (col('playtime') - min_value) / (max_value - min_value))
playtimedf = playtimedf.withColumn('full_normalized_playtime', format_number('normalized_playtime', 16))
playtimedf.show(5, truncate=False)

+---------+--------------------------+--------+-------+---------------------+------------------------+
|userId   |Games                     |playtime|gamesId|normalized_playtime  |full_normalized_playtime|
+---------+--------------------------+--------+-------+---------------------+------------------------+
|151603712|The Elder Scrolls V Skyrim|273.0   |8.0    |0.023217825572788606 |0.0232178255727886      |
|151603712|Fallout 4                 |87.0    |100.0  |0.007393290737542434 |0.0073932907375424      |
|151603712|Spore                     |14.9    |332.0  |0.0012591565352776527|0.0012591565352777      |
|151603712|Fallout New Vegas         |12.1    |29.0   |0.0010209377313062048|0.0010209377313062      |
|151603712|Left 4 Dead 2             |8.9     |4.0    |7.486876696245502E-4 |0.0007486876696246      |
+---------+--------------------------+--------+-------+---------------------+------------------------+
only showing top 5 rows



In [0]:
(training, test) = playtimedf.randomSplit([0.7, 0.3], seed=100)

In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(rank=7, maxIter=7, regParam=0.05, userCol = 'userId', itemCol = 'gamesId', ratingCol = 'normalized_playtime', implicitPrefs = True  ,seed=100)

model = als.fit(training)

2024/05/01 21:00:17 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '9a614c95829844c28c5c7fc78e1644da', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


#obtaining predictions

In [0]:
from pyspark.sql.functions import format_number
predictionsdf2 = model.transform(test).dropna()
predictionsdf2 = predictionsdf2.withColumn('full_prediction', format_number('prediction', 16))
predictionsdf2.show(5)

+------+--------------------+--------+-------+--------------------+------------------------+-----------+------------------+
|userId|               Games|playtime|gamesId| normalized_playtime|full_normalized_playtime| prediction|   full_prediction|
+------+--------------------+--------+-------+--------------------+------------------------+-----------+------------------+
|  5250|              Dota 2|     0.2|      0|8.507814427551706E-6|      0.0000085078144276| 0.01563695|0.0156369507312775|
| 76767|Call of Duty Blac...|    12.5|     57|0.001054968989016...|      0.0010549689890164| 0.26580596|0.2658059597015381|
| 76767|Call of Duty Mode...|    65.0|     25|0.005521571563481058|      0.0055215715634811| 0.36764848|0.3676484823226929|
| 76767|Call of Duty Mode...|     9.7|     62|8.167501850449638E-4|      0.0008167501850450| 0.21139024|0.2113902419805527|
| 76767|Call of Duty Worl...|   271.0|    267| 0.02304766928423757|      0.0230476692842376|0.036324758|0.0363247580826283|
+------+

#Evaluating the model

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='normalized_playtime', predictionCol='prediction')

rmse = evaluator.evaluate(predictionsdf2)

print('Root Mean Square Error is %g' %rmse)

Root Mean Square Error is 0.119525


#Generating recommendations

In [0]:
myrecomm1 = playtimedf

myrecomm1 = model.transform(myrecomm1)

myrecomm1 = myrecomm1.dropna()

myrecomm1.orderBy('prediction', ascending=False).show(truncate=False)

+---------+-------------------------------+--------+-------+---------------------+------------------------+----------+
|userId   |Games                          |playtime|gamesId|normalized_playtime  |full_normalized_playtime|prediction|
+---------+-------------------------------+--------+-------+---------------------+------------------------+----------+
|165216496|Unturned                       |24.0    |3      |0.0020333676481848577|0.0020333676481849      |1.1579038 |
|561758   |Counter-Strike Source          |0.8     |5      |5.955470099286195E-5 |0.0000595547009929      |1.1448301 |
|152959594|Unturned                       |1.6     |3      |1.276172164132756E-4 |0.0001276172164133      |1.1307547 |
|182229935|Unturned                       |10.3    |3      |8.677970716102741E-4 |0.0008677970716103      |1.1191453 |
|151600301|Unturned                       |850.0   |3      |0.07230791481976195  |0.0723079148197620      |1.1079875 |
|189441775|Unturned                       |9.0  

#Training, evaluating the model and generating recommendation based on purchase. with different hyperparameters

In [0]:
(training, test) = purchasedf.randomSplit([0.7, 0.3], seed=100)

In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(rank=10, maxIter=10, regParam=1, userCol = 'userId', itemCol = 'gamesId', ratingCol = 's3',  implicitPrefs = True, seed=100)

model = als.fit(training)

2024/05/01 21:00:49 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '685f853d534f4e2ba11dfa2baf313b7d', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
predictionsdf3 = model.transform(test).dropna()

predictionsdf3.show()

+------+--------------------+---+-------+-----------+
|userId|               Games| s3|gamesId| prediction|
+------+--------------------+---+-------+-----------+
|  5250|Counter-Strike So...|1.0|      5| 0.13459045|
|  5250|Half-Life 2 Death...|1.0|     13| 0.15166457|
|  5250|Half-Life 2 Episo...|1.0|     37| 0.08829392|
|  5250|Half-Life Blue Shift|1.0|     74| 0.14972946|
|  5250|Half-Life Opposin...|1.0|     75| 0.15090726|
|  5250|     Team Fortress 2|1.0|      1|0.052761752|
|  5250|Team Fortress Cla...|1.0|     79| 0.14541756|
| 76767|Arma 2 Operation ...|1.0|    129|0.045770593|
| 76767|Call of Duty Mode...|1.0|     25| 0.06123956|
| 76767|      Counter-Strike|1.0|      6| 0.07469164|
| 76767|Counter-Strike Gl...|1.0|      2|0.049692634|
| 76767|       Day of Defeat|1.0|     28| 0.07205724|
| 76767|  Deathmatch Classic|1.0|     34|0.071655504|
| 76767|Half-Life Blue Shift|1.0|     74| 0.06843292|
| 76767|Half-Life Opposin...|1.0|     75| 0.06748204|
| 76767|       Thief - Ghost

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='s3', predictionCol='prediction')

rmse = evaluator.evaluate(predictionsdf3)

print('Root Mean Square Error is %g' %rmse)

Root Mean Square Error is 0.953204


In [0]:
myrecomm2 = purchasedf

myrecomm2 = model.transform(myrecomm2)

myrecomm2 = myrecomm2.dropna()

myrecomm2.orderBy('prediction', ascending=False).show(truncate=False)

+---------+------+---+-------+----------+
|userId   |Games |s3 |gamesId|prediction|
+---------+------+---+-------+----------+
|144004384|Dota 2|1.0|0      |0.4293307 |
|181523262|Dota 2|1.0|0      |0.4293307 |
|250870371|Dota 2|1.0|0      |0.4293307 |
|197302969|Dota 2|1.0|0      |0.4293307 |
|150103338|Dota 2|1.0|0      |0.4293307 |
|203494117|Dota 2|1.0|0      |0.4293307 |
|175143479|Dota 2|1.0|0      |0.4293307 |
|138938057|Dota 2|1.0|0      |0.4293307 |
|197455089|Dota 2|1.0|0      |0.4293307 |
|219566087|Dota 2|1.0|0      |0.4293307 |
|187131847|Dota 2|1.0|0      |0.4293307 |
|186207833|Dota 2|1.0|0      |0.4293307 |
|195071563|Dota 2|1.0|0      |0.4293307 |
|205145940|Dota 2|1.0|0      |0.4293307 |
|140293612|Dota 2|1.0|0      |0.4293307 |
|237390911|Dota 2|1.0|0      |0.4293307 |
|198572546|Dota 2|1.0|0      |0.4293307 |
|162396225|Dota 2|1.0|0      |0.4293307 |
|187851224|Dota 2|1.0|0      |0.4293307 |
|262017556|Dota 2|1.0|0      |0.4293307 |
+---------+------+---+-------+----

#Finding the best out of the 3 models

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
#Create a parameter grid

parameters = ParamGridBuilder()\
    .addGrid(als.implicitPrefs, [True, False])\
    .addGrid(als.rank, [5,7,10])\
    .addGrid(als.maxIter, [5,7,10])\
    .addGrid(als.regParam, [0.01, 0.05, 1])\
    .build()

In [0]:
from pyspark.ml.tuning import TrainValidationSplit
#Define TrainValidationSplit

tvs = TrainValidationSplit()\
    .setSeed(100)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(parameters)\
    .setEstimator(als)\
    .setEvaluator(evaluator)

In [0]:
gridsearchModel = tvs.fit(training)

2024/05/01 21:01:23 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '2c6d2d5fba014b08a6003393efa3a33b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
bestModel = gridsearchModel.bestModel

print('Parameters for the best model:')
print('Rank Parameter: %g' %bestModel.rank)
print('maxIter Parameter: %g' %bestModel._java_obj.parent().getMaxIter())
print('regParam Parameter: %g' %bestModel._java_obj.parent().getRegParam())

Parameters for the best model:
Rank Parameter: 5
maxIter Parameter: 5
regParam Parameter: 0.01
