### Implementation steps
1. Load the cleaned Steam reviews data from HDFS into Spark.
2. Select only the relevant columns (app_id, app_name, author_steamid, etc.).
3. Remove outliers from author_playtime_forever.
4. Keep only serious users (long playtime, valid Steam IDs).
5. Map user IDs and game names to small integer indices.
6. Train an ALS recommendation model (collaborative filtering).
7. Recommend five games for each serious reviewer (top-N recommendations from the ALS model).



Goals:

- Find serious players who played more than one game
- Filter by high playtime (above average)
- Focus on real, valid users (Steam IDs)
- Map users and games to integer indices (for ALS)
- Recommend 5 games that serious players are likely to like


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, mean, when, sum as _sum
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer, IndexToString
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from scipy import stats

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SteamReviewsHDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "100")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()




In [5]:
df = spark.read.parquet("/user/tejashree/project/data/processed/cleaned_steam_reviews.parquet")

#### Are there pairs of games that are played by the same players, i.e., if player A plays game X, is there a good chance they also play game Y? Analyze any patterns

- Using the `data_rec` DataFrame, we first identify review authors who have reviewed more than one game.
- To keep only serious players, and not users who briefly tried a game, we filter on the `author_playtime_forever` column and retain reviews whose playtime is well above the average across all reviews (the code below uses five times the mean).
- For these players, we look at which games they have played and recommended, and recommend five games to each of them based on reviewers with similar engagement patterns.
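The question above asks about pairs of games shared by the same players. The notebook answers it with a recommendation model, but a more direct check is to count co-occurrences. The following is a minimal plain-Python sketch of that idea on hypothetical toy data. The `reviews` list and the `count_game_pairs` helper are illustrative names and are not part of the notebook.

```python
from collections import Counter
from itertools import combinations

# Hypothetical toy data: one (author_steamid, app_name) tuple per review.
reviews = [
    (1, "Terraria"), (1, "Rust"), (1, "Garry's Mod"),
    (2, "Terraria"), (2, "Rust"),
    (3, "Rust"), (3, "Garry's Mod"),
    (4, "Terraria"),
]

def count_game_pairs(rows):
    """Count how many distinct authors reviewed each unordered pair of games."""
    games_by_author = {}
    for author, game in rows:
        games_by_author.setdefault(author, set()).add(game)

    pair_counts = Counter()
    for games in games_by_author.values():
        # sorted() makes (X, Y) and (Y, X) map to the same key
        pair_counts.update(combinations(sorted(games), 2))
    return pair_counts

pair_counts = count_game_pairs(reviews)
for pair, n_authors in pair_counts.most_common():
    print(pair, n_authors)
```

Pairs with high counts are candidates for "players of X also play Y". On the full data set the same idea would be expressed as a self-join on `author_steamid`, which can be expensive for authors with many reviews.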

In [8]:
col_rec = ["app_id", "app_name", "review_id", "language", "author_steamid", "timestamp_created" ,"author_playtime_forever","recommended"]

In [9]:
data_rec = df.select(*col_rec)

### Remove app_names with fewer than 200 reviews

In [11]:

# Count reviews per app name
app_names = data_rec.groupBy("app_name").count()

# Inspect app names with fewer than 400 reviews
low_count_apps = app_names.filter(col("count") < 400)

# Sort by count, largest first
apps_sorted = low_count_apps.orderBy(col("count").desc())

# Convert to pandas
apps_sorted_pd = apps_sorted.toPandas()

apps_sorted_pd


                                                                                

| | app_name | count |
|---|---|---|
| 0 | DRAGON QUEST HEROES™ II | 319 |
| 1 | phone number | 183 |
| 2 | Neon Hardcorps | 151 |
| 3 | however | 140 |
| 4 | though." | 139 |
| ... | ... | ... |
| 360599 | их нельзя сравнивать | 1 |
| 360600 | I do wonder why there is no jumping | 1 |
| 360601 | even that top down baldurs gate clone on the ... | 1 |
| 360602 | it's no ""game-of-the-generation"". Still a s... | 1 |


In [12]:
# Count reviews per app name
app_names = df.groupBy("app_name").count()

# Keep app names with at least 200 reviews
popular_apps = app_names.filter(col("count") >= 200)

# Inner join so that only reviews of popular apps remain (this also adds a `count` column)
data_rec = data_rec.join(popular_apps, on="app_name", how="inner")
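The count-then-inner-join pattern used here can be traced on a small example. This is a plain-Python sketch on made-up rows. The `keep_popular` helper and the toy threshold of 2 are assumptions for illustration only.

```python
from collections import Counter

# Hypothetical toy data: one (app_name, author_steamid) tuple per review.
rows = [
    ("Terraria", 1), ("Terraria", 2), ("Terraria", 3),
    ("Rust", 1), ("Rust", 4),
    ("however", 5),  # a junk app_name that appears only once
]

def keep_popular(rows, min_reviews):
    """Keep only rows whose app_name occurs at least `min_reviews` times."""
    counts = Counter(app_name for app_name, _ in rows)
    popular = {name for name, n in counts.items() if n >= min_reviews}
    # Equivalent to an inner join of the rows against the popular names
    return [row for row in rows if row[0] in popular]

filtered = keep_popular(rows, min_reviews=2)
print(filtered)
```

Rare names such as the junk entries listed in the earlier output drop out because they never reach the threshold.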


In [13]:
from pyspark.sql.functions import col

data_rec.filter(col("author_steamid") == 76561198020760734).show(truncate=False)


                                                                                

+----------------------+------+---------+--------+-----------------+-------------------+-----------------------+-----------+-----+
|app_name              |app_id|review_id|language|author_steamid   |timestamp_created  |author_playtime_forever|recommended|count|
+----------------------+------+---------+--------+-----------------+-------------------+-----------------------+-----------+-----+
|Counter-Strike: Source|240   |59980286 |russian |76561198020760734|2019-12-16 14:32:25|2254858.0              |true       |99385|
+----------------------+------+---------+--------+-----------------+-------------------+-----------------------+-----------+-----+



In [14]:
def remove_outliers(df, column):
    """Keep rows whose `column` value lies within 2 * IQR of the quartiles."""
    # Approximate quartiles with a relative error of 1%
    q1, q3 = df.approxQuantile(column, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    lower_limit = q1 - 2 * iqr
    upper_limit = q3 + 2 * iqr
    return df.filter((col(column) >= lower_limit) & (col(column) <= upper_limit))


data_rec = remove_outliers(data_rec, "author_playtime_forever")

# Mean playtime after outlier removal, in the column's native unit
mean_playtime = data_rec.agg(mean("author_playtime_forever").alias("Mean")).collect()[0]["Mean"]

                                                                                

In [15]:
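mean_playtime / 3600  # mean playtime in hours, assuming the column is in seconds (the Steam API documents playtime in minutes, in which case divide by 60)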

2.065846578414234
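To make the outlier rule concrete, here is a small plain-Python sketch of the same 2 × IQR fences on toy numbers. It uses exact quartiles from the standard library rather than Spark's `approxQuantile`, so it illustrates the rule and does not reproduce the notebook's values. The name `iqr_bounds` and the sample list are assumptions.

```python
import statistics

def iqr_bounds(values, k=2.0):
    """Return (lower, upper) fences at k * IQR beyond the first and third quartiles."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

# Hypothetical playtimes with one extreme value
playtimes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]

lower, upper = iqr_bounds(playtimes)
kept = [v for v in playtimes if lower <= v <= upper]
print(lower, upper, kept)
```

With `k=2.0` the fences are wider than the more common 1.5 × IQR rule, so fewer rows are dropped.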

This code keeps only reviews from serious users, defined as reviews whose playtime is at least five times the mean playtime. It then groups the reviews by Steam ID, keeps authors with more than one such review, discards invalid IDs (those below 76560000000000000), and sorts the remaining authors by the number of qualifying games in descending order.

In [17]:
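# Reviews with at least 5x the mean playtime, counted per author
pair_games = data_rec.filter(col("author_playtime_forever") >= 5 * mean_playtime).groupBy("author_steamid").count()
# Keep valid Steam IDs with more than one such review, most games first
pair_games = pair_games.filter((pair_games["count"] > 1) & (pair_games["author_steamid"] >= 76560000000000000)).orderBy(pair_games["count"].desc())
pair_games.show()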



+-----------------+-----+
|   author_steamid|count|
+-----------------+-----+
|76561199003745475|    9|
|76561198303461537|    6|
|76561198847533327|    5|
|76561198107639116|    5|
|76561198008966571|    4|
|76561198053974624|    4|
|76561198262809392|    4|
|76561198368118101|    4|
|76561198119696900|    4|
|76561198056543838|    3|
|76561198196563587|    3|
|76561198909989107|    3|
|76561198092228575|    3|
|76561198028209057|    3|
|76561198034267568|    3|
|76561198024476303|    3|
|76561198168867327|    3|
|76561198126942964|    3|
|76561198897905512|    3|
|76561198009290759|    3|
+-----------------+-----+
only showing top 20 rows



                                                                                

In [18]:
pair_games.count()

                                                                                

2305
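The selection above combines three conditions: playtime of at least five times the mean, a valid Steam ID, and more than one qualifying review. The following plain-Python sketch traces that logic on made-up rows. The helper name `serious_multi_game_authors` and all sample IDs and playtimes are hypothetical.

```python
from collections import Counter
from statistics import mean

MIN_STEAM_ID = 76560000000000000  # threshold used in the notebook

A, B, E, FILLER = (76561198000000001, 76561198000000002,
                   76561198000000005, 76561198000000003)
INVALID = 123  # below the valid Steam ID range

# Hypothetical toy data: (author_steamid, app_name, playtime)
rows = [
    (A, "g1", 1000), (A, "g2", 1200),
    (B, "g1", 900),
    (INVALID, "g1", 1500), (INVALID, "g2", 1500),
    (E, "g1", 800), (E, "g2", 800), (E, "g3", 800),
] + [(FILLER, "g3", 10)] * 92  # many short sessions pull the mean down

def serious_multi_game_authors(rows, factor=5):
    """Authors with a valid ID and more than one review at >= factor * mean playtime."""
    threshold = factor * mean(playtime for _, _, playtime in rows)
    counts = Counter(
        author for author, _, playtime in rows
        if playtime >= threshold and author >= MIN_STEAM_ID
    )
    qualifying = [(author, n) for author, n in counts.items() if n > 1]
    return sorted(qualifying, key=lambda item: -item[1])

result = serious_multi_game_authors(rows)
print(result)
```

Author `B` is dropped for having a single qualifying review, and the invalid ID is dropped regardless of playtime.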

In [19]:
new_pair_games = data_rec.filter(col("author_playtime_forever")>=5*mean_playtime)
new_pair_games = new_pair_games.filter(new_pair_games["author_steamid"]>=76560000000000000).select("author_steamid","app_id", "app_name","recommended")
new_pair_games.show()

+-----------------+------+-------------------+-----------+
|   author_steamid|app_id|           app_name|recommended|
+-----------------+------+-------------------+-----------+
|76561198850036944|253230|      A Hat in Time|       true|
|76561198109827334|253230|      A Hat in Time|       true|
|76561198137741861|253230|      A Hat in Time|       true|
|76561198431146240|253230|      A Hat in Time|       true|
|76561198391788773|253230|      A Hat in Time|       true|
|76561198338554087|253230|      A Hat in Time|       true|
|76561198964615267|253230|      A Hat in Time|       true|
|76561198804909831|253230|      A Hat in Time|       true|
|76561198800765514|253230|      A Hat in Time|       true|
|76561198156421975|253230|      A Hat in Time|       true|
|76561198845653713|253230|      A Hat in Time|       true|
|76561198268649264|253230|      A Hat in Time|       true|
|76561198125315612|253230|      A Hat in Time|       true|
|76561198178496474|253230|      A Hat in Time|       tru

                                                                                

In [20]:
new_pair_games.count()

                                                                                

248751

In [21]:
# Fit indexers that map author_steamid and app_name to numeric indices
author_indexer = StringIndexer(inputCol="author_steamid", outputCol="author_index").fit(new_pair_games)
app_indexer = StringIndexer(inputCol="app_name", outputCol="app_index").fit(new_pair_games)
# Turn the boolean `recommended` flag into an explicit rating: 5 if recommended, otherwise 1
new_pair_games = new_pair_games.withColumn("Rating", when(col("recommended") == True, 5).otherwise(1))
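By default `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0.0. The plain-Python sketch below mimics that ordering and the 5/1 rating rule on toy labels. The alphabetical tie-break is an assumption about recent Spark versions, and `frequency_index` and `to_rating` are illustrative names, not Spark APIs.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Assumption: ties are broken alphabetically
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def to_rating(recommended):
    """Explicit rating used in the notebook: 5 if recommended, otherwise 1."""
    return 5 if recommended else 1

# Hypothetical toy labels
app_names = ["Rust", "Terraria", "Rust", "Arma 3", "Terraria", "Rust"]
app_index = frequency_index(app_names)
print(app_index)
print(to_rating(True), to_rating(False))
```

Because the indices depend on label frequencies in the fitted data, the mapping has to be saved alongside the model to decode recommendations later.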

                                                                                

In [22]:
new_pair = author_indexer.transform(app_indexer.transform(new_pair_games))
new_pair.show()



+-----------------+------+-------------------+-----------+------+---------+------------+
|   author_steamid|app_id|           app_name|recommended|Rating|app_index|author_index|
+-----------------+------+-------------------+-----------+------+---------+------------+
|76561198850036944|253230|      A Hat in Time|       true|     5|    152.0|    225956.0|
|76561198109827334|253230|      A Hat in Time|       true|     5|    152.0|     80848.0|
|76561198137741861|253230|      A Hat in Time|       true|     5|    152.0|     98703.0|
|76561198431146240|253230|      A Hat in Time|       true|     5|    152.0|    205883.0|
|76561198391788773|253230|      A Hat in Time|       true|     5|    152.0|    191628.0|
|76561198338554087|253230|      A Hat in Time|       true|     5|    152.0|    173018.0|
|76561198964615267|253230|      A Hat in Time|       true|     5|    152.0|    236330.0|
|76561198804909831|253230|      A Hat in Time|       true|     5|    152.0|    215750.0|
|76561198800765514|25

                                                                                

In [23]:
games = new_pair.select("app_index","app_name").distinct().orderBy("app_index")
games.count()

                                                                                

260

In [24]:
# Create an ALS (Alternating Least Squares) model.
# Note: apps are passed as userCol and authors as itemCol, so in this model
# the "users" are apps and the "items" are authors.
als = ALS(maxIter=10, regParam=0.01, userCol="app_index", itemCol="author_index", ratingCol="Rating", coldStartStrategy="drop")

# Fit the model to the data
model = als.fit(new_pair)

# For each author (the model's "item"), get the 5 highest-scoring apps
app_recommendations = model.recommendForAllItems(5)

# Display the recommendations
app_recommendations.show(truncate=False)


+------------+----------------------------------------------------------------------------------------+
|author_index|recommendations                                                                         |
+------------+----------------------------------------------------------------------------------------+
|4           |[{136, 6.8023324}, {175, 6.188595}, {168, 5.801366}, {213, 5.7765493}, {191, 5.6294155}]|
|7           |[{212, 8.088351}, {2, 7.104186}, {197, 7.102503}, {62, 6.396319}, {233, 6.301838}]      |
|8           |[{30, 7.4820223}, {225, 7.073592}, {26, 7.022451}, {186, 6.9353147}, {46, 6.861062}]    |
|23          |[{140, 5.5344667}, {57, 5.4243507}, {29, 5.2698107}, {68, 5.109564}, {16, 4.9924684}]   |
|31          |[{73, 5.0006347}, {66, 4.989454}, {186, 3.8505864}, {124, 3.471138}, {190, 3.361213}]   |
|34          |[{202, 7.387505}, {124, 6.5198226}, {62, 6.5031896}, {224, 6.1364746}, {63, 5.3859353}] |
|39          |[{190, 5.3203335}, {100, 5.1127577}, {73, 4.99922}

                                                                                

In [25]:
games.show()

                                                                                

+---------+--------------------+
|app_index|            app_name|
+---------+--------------------+
|      0.0|PLAYERUNKNOWN'S B...|
|      1.0|Tom Clancy's Rain...|
|      2.0|  Grand Theft Auto V|
|      3.0|       Rocket League|
|      4.0|            Terraria|
|      5.0|                Rust|
|      6.0|         Garry's Mod|
|      7.0|    Dead by Daylight|
|      8.0|Monster Hunter: W...|
|      9.0|ARK: Survival Evo...|
|     10.0|Euro Truck Simula...|
|     11.0|The Elder Scrolls...|
|     12.0|            PAYDAY 2|
|     13.0|   Hearts of Iron IV|
|     14.0|              Arma 3|
|     15.0|Sid Meier's Civil...|
|     16.0|Europa Universali...|
|     17.0|Total War: WARHAM...|
|     18.0|           Fallout 4|
|     19.0|The Binding of Is...|
+---------+--------------------+
only showing top 20 rows
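The recommendation rows contain `app_index` values, so they need the mapping above to become readable. The sketch below decodes one row in plain Python, using only values visible in the outputs above. The `decode` helper is hypothetical, and indices missing from the visible 20 rows are left as placeholders and not guessed.

```python
# Partial app_index -> app_name mapping, copied from the visible rows of games.show()
index_to_name = {
    2: "Grand Theft Auto V",
    3: "Rocket League",
    4: "Terraria",
}

# Recommendation row for author_index 7, copied from the ALS output above
recs_for_author_7 = [
    (212, 8.088351), (2, 7.104186), (197, 7.102503),
    (62, 6.396319), (233, 6.301838),
]

def decode(recs, mapping):
    """Replace each app_index with its name, keeping unknown indices visible."""
    return [(mapping.get(idx, f"<app_index {idx}>"), score) for idx, score in recs]

decoded = decode(recs_for_author_7, index_to_name)
for name, score in decoded:
    print(f"{name}: {score:.2f}")
```

In Spark the same decoding would typically be done by exploding the `recommendations` array and joining against the `games` DataFrame.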



### Save model

In [27]:
# Save the trained ALS model (overwrite any earlier copy so the cell can be re-run)
model.write().overwrite().save("/user/tejashree/project/models/recommendation/als_game_recommendation_model")


                                                                                

In [28]:
from pyspark.ml.recommendation import ALSModel
model = ALSModel.load("/user/tejashree/project/models/recommendation/als_game_recommendation_model")


### Save recommendations

In [39]:
app_recommendations = model.recommendForAllItems(5)
app_recommendations.write.mode("overwrite").parquet("/user/tejashree/project/outputs/app_recommendations.parquet")

                                                                                

### Save app_index → app_name mapping

In [42]:
games.write.mode("overwrite").parquet("/user/tejashree/project/data/mappings/games_mapping.parquet")


                                                                                

### Save author_index → author_steamid mapping

In [49]:
# Save mapping table: author_steamid ↔ author_index
author_mapping = new_pair.select("author_steamid", "author_index").distinct()
author_mapping.write.mode("overwrite").parquet("/user/tejashree/project/data/mappings/author_mapping.parquet")


                                                                                

In [51]:
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Refit on the full data set (this replaces the model loaded from disk earlier)
model = als.fit(new_pair)

# Step 2: Predict on the same rows the model was trained on
predictions = model.transform(new_pair)

# Step 3: Create an evaluator that compares `Rating` with `prediction`
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)

# Step 4: Calculate RMSE.
# Note: this is the training RMSE, not an estimate of accuracy on unseen reviews.
rmse = evaluator.evaluate(predictions)
print(f"✅ RMSE of your ALS model = {rmse:.4f}")



✅ RMSE of your ALS model = 0.0047
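The RMSE above is computed on the same rows the model was fitted on, so it says little about unseen reviews. A common remedy is to hold out part of the data before fitting. The plain-Python sketch below shows the RMSE formula and a seeded holdout split. `rmse` and `holdout_split` are illustrative helpers, and the sample ratings are made up.

```python
import math
import random

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

def holdout_split(rows, test_fraction=0.2, seed=42):
    """Shuffle a copy of `rows` with a fixed seed and split it into (train, test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * test_fraction)
    cut = len(shuffled) - n_test
    return shuffled[:cut], shuffled[cut:]

# Hypothetical ratings and predictions
print(rmse([5, 1], [4, 2]))

train, test = holdout_split(list(range(10)))
print(len(train), len(test))
```

In Spark, the equivalent split would be `new_pair.randomSplit([0.8, 0.2], seed=42)`, fitting on the first part and evaluating on the second. With `coldStartStrategy="drop"`, held-out rows for authors or apps that are absent from the training part are excluded from the metric.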


                                                                                