# Importing necessary libraries

In [2]:
import kagglehub
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.functions import length, trim
from itertools import chain
from pyspark import SparkContext
from pyspark.sql.functions import length
from pyspark.sql.functions import when, col
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Importing dataset

In [4]:
# Download the latest version of the Kaggle dataset and report where it was stored
KAGGLE_DATASET = "najzeko/steam-reviews-2021"
path = kagglehub.dataset_download(KAGGLE_DATASET)
print("Path to dataset files:", path)

Resuming download from 126877696 bytes (3066529214 bytes left)...
Resuming download from https://www.kaggle.com/api/v1/datasets/download/najzeko/steam-reviews-2021?dataset_version_number=1 (126877696/3193406910) bytes left.


 15%|███████████▊                                                                  | 463M/2.97G [01:40<12:37, 3.58MB/s]


KeyboardInterrupt: 

In [None]:
# Initialize (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("Steam Reviews Analysis") \
    .getOrCreate()

# Dataset folder returned by kagglehub.dataset_download() in the download cell.
# Reusing it instead of a hard-coded "/root/.cache/..." path keeps the notebook
# working on machines where the kagglehub cache lives elsewhere.
dataset_path = path

# The dataset is a single CSV file inside that folder
file_path = f"{dataset_path}/steam_reviews.csv"

# Load the data into a PySpark DataFrame.
# multiLine=True lets a quoted field span several lines; escape='"' reads a doubled
# quote inside a quoted field as a literal quote character.
df = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True, escape='"')

# Show a sample of the data
df.show(5)
# In the "language" column, "schinese" is Simplified Chinese and "tchinese" is Traditional Chinese

In our analysis we worked with a subset of the dataset. Our machines ran into runtime errors when processing the full dataset (about 21 million records). The subset contains only the English reviews, around 9 million records, which is still large enough to count as Big Data.

In [None]:
df.where(col("language") == "english").count()  # number of English-language reviews in the full dataset

In [None]:
# Persist only the English reviews as Parquet and read them back, so later cells work
# on a smaller, typed copy instead of re-parsing the multi-line CSV.
ENGLISH_SAMPLE_PATH = "sample_df"
df.filter(df.language == "english").write.parquet(ENGLISH_SAMPLE_PATH, mode="overwrite")
# Read from exactly the directory just written; a different path (e.g. "sample_df.parquet")
# fails on a fresh run or silently loads stale data from an earlier run.
sample_df = spark.read.parquet(ENGLISH_SAMPLE_PATH)

In [None]:
sample_df = sample_df.withColumn("review_length", length(col("review")))  # character count of each review

##  **Exploratory Data Analysis**

In [None]:
# Columns named "author.<field>" are awkward to reference in Spark ("." is read as a
# struct-field accessor), so replace the dot with an underscore.
renamed_author_fields = [
    "steamid",
    "num_games_owned",
    "num_reviews",
    "playtime_forever",
    "playtime_last_two_weeks",
    "playtime_at_review",
    "last_played",
]
for field in renamed_author_fields:
    sample_df = sample_df.withColumnRenamed(f"author.{field}", f"author_{field}")

# Print the new column names
print(sample_df.columns)

We observed that certain columns are not informative for our objective, therefore we decided to keep only the relevant features.

In [None]:
# Keep only the relevant features. Each column is listed once: selecting "votes_helpful"
# twice would produce two identically named columns and make later references ambiguous.
relevant_columns = [
    "_c0", "app_id", "app_name", "review_id", "review", "recommended",
    "votes_helpful", "steam_purchase", "received_for_free",
    "written_during_early_access", "author_steamid", "author_num_games_owned",
    "author_num_reviews", "author_playtime_forever", "author_playtime_at_review",
    "review_length",
]
sample_df = sample_df.select(*relevant_columns)

View the first records of our dataset to understand the data that we have.

In [None]:
sample_df.show(n=5)

Calculate summary statistics for the time (in minutes) users had played the game when they wrote their review. Using these, we categorize users into three groups according to whether they had played for a short, medium, or long time.

In [None]:
sample_df.describe("author_playtime_at_review").show()

Similarly, we calculate summary statistics regarding the total playtime of users and we will categorize them into three categories according to the amount of time they played games in total.

In [None]:
sample_df.describe("author_playtime_forever").show()

In [None]:
# Playtime thresholds in minutes (5000 min is about 83 h, 20000 min is about 333 h)
PLAYTIME_LOW_MAX = 5000
PLAYTIME_MEDIUM_MAX = 20000


def playtime_bucket(column_name):
    """Return a Column labelling `column_name` as "Low", "Medium" or "High".

    Low: < PLAYTIME_LOW_MAX; Medium: [PLAYTIME_LOW_MAX, PLAYTIME_MEDIUM_MAX);
    High: >= PLAYTIME_MEDIUM_MAX. A null playtime stays null. Without the explicit
    null branch it would fail every comparison and land in ``otherwise`` as "High".
    """
    playtime = col(column_name)
    return (
        when(playtime.isNull(), lit(None).cast("string"))
        .when(playtime < PLAYTIME_LOW_MAX, "Low")
        .when(playtime < PLAYTIME_MEDIUM_MAX, "Medium")
        .otherwise("High")
    )


# Create new columns for playtime categories
sample_df = sample_df.withColumn("playtime_category", playtime_bucket("author_playtime_at_review"))
sample_df = sample_df.withColumn("playtime_forever_category", playtime_bucket("author_playtime_forever"))

Use the aforementioned categories to visualize the number of users in each category, using barplots.

In [None]:
# Count reviews per category in Spark and collect only those few rows. Calling toPandas()
# on the raw column would pull one row per review (millions) to the driver just to count them.
pandas_df = sample_df.groupBy("playtime_category").count().toPandas()

# Most frequent category first, like pandas value_counts(); a null category is dropped,
# as value_counts() does by default.
category_counts = (
    pandas_df.dropna(subset=["playtime_category"])
    .set_index("playtime_category")["count"]
    .sort_values(ascending=False)
)

# Plot bar chart
plt.figure(figsize=(8, 5))
category_counts.plot(kind="bar", color=["#7C6E7F", "#FF9999", "#99CCFF"])
plt.title("Distribution of Playtime Categories")
plt.xlabel("Playtime Category")
plt.ylabel("Count")
plt.xticks(rotation=0)
plt.grid(axis="y", linestyle="--", alpha=0.7)

# Show plot
plt.show()

In [None]:
# Count reviews per category in Spark and collect only those few rows. Calling toPandas()
# on the raw column would pull one row per review (millions) to the driver just to count them.
pandas_df = sample_df.groupBy("playtime_forever_category").count().toPandas()

# Most frequent category first, like pandas value_counts(); a null category is dropped,
# as value_counts() does by default.
category_counts_forever = (
    pandas_df.dropna(subset=["playtime_forever_category"])
    .set_index("playtime_forever_category")["count"]
    .sort_values(ascending=False)
)

# Plot bar chart
plt.figure(figsize=(8, 5))
category_counts_forever.plot(kind="bar", color=["#7C6E7F", "#FF9999", "#99CCFF"])
plt.title("Distribution of Total Playtime Categories")
plt.xlabel("Total Playtime Category")
plt.ylabel("Count")
plt.xticks(rotation=0)
plt.grid(axis="y", linestyle="--", alpha=0.7)

# Show plot
plt.show()

Check all the unique games in the reviews.

In [None]:
sample_df.select(col("app_name")).distinct().show()  # distinct game titles (first 20 shown)

In [None]:
sample_df.groupBy("app_name").count().orderBy("count", ascending=False).show()  # apps with the most reviews first

In [None]:
# NOTE: with this many distinct app names the x-axis labels are unreadable; kept as a rough overview.
pandas_df = sample_df.groupBy("app_name").count().toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(pandas_df["app_name"].astype(str), pandas_df["count"])
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("App Name")
ax.set_ylabel("Count")
ax.set_title("App Name Counts Distribution")
fig.tight_layout()
plt.show()


See how many users have purchased the app through Steam.

In [None]:
sample_df.groupBy(col("steam_purchase")).agg(F.count("*").alias("count")).show()

In [None]:
# Number of reviews per steam_purchase value, drawn as a bar chart
pandas_df = sample_df.groupBy("steam_purchase").count().toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(pandas_df["steam_purchase"].astype(str), pandas_df["count"])
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("steam_purchase")
ax.set_ylabel("Count")
ax.set_title("steam_purchase Counts Distribution")
fig.tight_layout()
plt.show()


In [None]:
# Number of reviews per recommended value, drawn as a bar chart
pandas_df = sample_df.groupBy("recommended").count().toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(pandas_df["recommended"].astype(str), pandas_df["count"])
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("recommended")
ax.set_ylabel("Count")
ax.set_title("recommended Counts Distribution")
fig.tight_layout()
plt.show()

# We observe that most reviews are positive, i.e. the reviewers would recommend the app to other users

Check how many users have received the app for free.

In [None]:
sample_df.groupBy(col("received_for_free")).agg(F.count("*").alias("count")).show()

In [None]:
# Number of reviews per received_for_free value, drawn as a bar chart
pandas_df = sample_df.groupBy("received_for_free").count().toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(pandas_df["received_for_free"].astype(str), pandas_df["count"])
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("received_for_free")
ax.set_ylabel("Count")
ax.set_title("received_for_free Counts Distribution")
fig.tight_layout()
plt.show()

# Most users did not receive the app for free

Check how many reviews were written while the app was still in early access (i.e., before its official release).

In [None]:
sample_df.groupBy(col("written_during_early_access")).agg(F.count("*").alias("count")).show()

In [None]:
# Number of reviews per written_during_early_access value, drawn as a bar chart
pandas_df = sample_df.groupBy("written_during_early_access").count().toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(pandas_df["written_during_early_access"].astype(str), pandas_df["count"])
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("written_during_early_access")
ax.set_ylabel("Count")
ax.set_title("written_during_early_access Counts Distribution")
fig.tight_layout()
plt.show()


In [None]:
sample_df.groupBy("author_steamid").count().orderBy(F.desc("count")).show()  # most frequent reviewers first

# Bivariate analysis

We want to check whether users are more likely to recommend an app if they received it for free.

In [None]:
# For each received_for_free value: number of reviews and how many of them recommend the app.
# Computed on the English sample (sample_df), like every other bivariate analysis here, and
# in a single aggregation so a group with no positive review gets 0 instead of a null
# left over from a left join.
pivot_table = sample_df.groupBy('received_for_free').agg(
    F.count('recommended').alias('Total Reviews'),
    F.sum(when(col('recommended') == lit(True), 1).otherwise(0)).alias('Recommended (True)'),
)

# Calculate proportion
pivot_table = pivot_table.withColumn(
    'Proportion Recommended', col('Recommended (True)') / col('Total Reviews')
)

# Show results
pivot_table.show()

As the table above shows, the proportion of users who received the app for free and recommend it is roughly equal to the proportion of users who did not receive it for free and recommend it.

To check whether longer reviews are more positive or more negative:

In [None]:
# Calculate total reviews and recommended-true counts per 'review_length'.
# The positive count uses sample_df's own 'recommended' column. A column taken from
# a different DataFrame (df['recommended']) cannot be resolved against sample_df.
# A single aggregation also gives 0, not null, for lengths with no positive review,
# so those points are not dropped from the scatterplot.
pivot_table = (
    sample_df.groupBy('review_length')
    .agg(
        F.count('recommended').alias('Total Reviews'),
        F.sum(when(col('recommended') == lit(True), 1).otherwise(0)).alias('Recommended (True)'),
    )
    .withColumn('Proportion Recommended', col('Recommended (True)') / col('Total Reviews'))
)

# Show results
pivot_table.sort(col("review_length").desc()).show()  # to see longer reviews first

#NOTE: we can make a scatterplot of these values to check for correlation -- see cell below

In [None]:
# The aggregated table is small, so it can be collected to the driver for plotting
pivot_table_pd = pivot_table.select("review_length", "Proportion Recommended").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(pivot_table_pd["review_length"], pivot_table_pd["Proportion Recommended"], alpha=0.5)
ax.set_title("Review Length vs Proportion Recommended")
ax.set_xlabel("Review Length")
ax.set_ylabel("Proportion Recommended")
ax.grid(True)
plt.show()

Does higher playtime correlate with positive recommendations?

In [None]:
# Calculate total reviews and recommended-true counts per 'author_playtime_at_review'.
# The positive count uses sample_df's own 'recommended' column. A column taken from
# a different DataFrame (df['recommended']) cannot be resolved against sample_df.
# A single aggregation also gives 0, not null, for values with no positive review.
pivot_table = (
    sample_df.groupBy('author_playtime_at_review')
    .agg(
        F.count('recommended').alias('Total Reviews'),
        F.sum(when(col('recommended') == lit(True), 1).otherwise(0)).alias('Recommended (True)'),
    )
    .withColumn('Proportion Recommended', col('Recommended (True)') / col('Total Reviews'))
)

# Show results
pivot_table.sort(col("author_playtime_at_review").desc()).show()  # to see people who played for longer first


In [None]:
# The aggregated table is small, so it can be collected to the driver for plotting
pivot_table_pd = pivot_table.select("author_playtime_at_review", "Proportion Recommended").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(pivot_table_pd["author_playtime_at_review"], pivot_table_pd["Proportion Recommended"], alpha=0.5)
ax.set_title("author_playtime_at_review vs Proportion Recommended")
ax.set_xlabel("author_playtime_at_review")
ax.set_ylabel("Proportion Recommended")
ax.grid(True)
plt.show()

Are frequent reviewers more positive or critical?

In [None]:
# Calculate total reviews and recommended-true counts per 'author_num_reviews'.
# The positive count uses sample_df's own 'recommended' column. A column taken from
# a different DataFrame (df['recommended']) cannot be resolved against sample_df.
# A single aggregation also gives 0, not null, for values with no positive review.
pivot_table = (
    sample_df.groupBy('author_num_reviews')
    .agg(
        F.count('recommended').alias('Total Reviews'),
        F.sum(when(col('recommended') == lit(True), 1).otherwise(0)).alias('Recommended (True)'),
    )
    .withColumn('Proportion Recommended', col('Recommended (True)') / col('Total Reviews'))
)

# Show results
pivot_table.sort(col("author_num_reviews").desc()).show()  # to see people who have made more reviews first


In [None]:
# The aggregated table is small, so it can be collected to the driver for plotting
pivot_table_pd = pivot_table.select("author_num_reviews", "Proportion Recommended").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(pivot_table_pd["author_num_reviews"], pivot_table_pd["Proportion Recommended"], alpha=0.5)
ax.set_title("author_num_reviews vs Proportion Recommended")
ax.set_xlabel("author_num_reviews")
ax.set_ylabel("Proportion Recommended")
ax.grid(True)
plt.show()

Of those who received the app for free, how many recommend it?

In [None]:
# Review counts: rows = received_for_free value, columns = recommended value
df_pivot = (
    sample_df.groupBy("received_for_free")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

For each reviewed app, how many of its reviews come from users who purchased it on Steam?

In [None]:
# Review counts: rows = app, columns = steam_purchase value
df_pivot = (
    sample_df.groupBy("app_name")
    .pivot("steam_purchase")
    .count()
    .withColumnRenamed("true", "steam_purchase_true")
    .withColumnRenamed("false", "steam_purchase_false")
)

df_pivot.show()

For each app, how many users have recommended it?

In [None]:
# Review counts: rows = app, columns = recommended value
df_pivot = (
    sample_df.groupBy("app_name")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

For each app, how many users have got it for free?

In [None]:
# Review counts: rows = app, columns = received_for_free value
df_pivot = (
    sample_df.groupBy("app_name")
    .pivot("received_for_free")
    .count()
    .withColumnRenamed("true", "received_for_free_true")
    .withColumnRenamed("false", "received_for_free_false")
)

df_pivot.show()

For each app, how many reviews have been written from users that had early access?

In [None]:
# Review counts: rows = app, columns = written_during_early_access value
df_pivot = (
    sample_df.groupBy("app_name")
    .pivot("written_during_early_access")
    .count()
    .withColumnRenamed("true", "written_during_early_access_true")
    .withColumnRenamed("false", "written_during_early_access_false")
)

df_pivot.show()

Of the reviews for apps purchased through Steam, how many recommend the app?

In [None]:
# Review counts: rows = steam_purchase value, columns = recommended value
df_pivot = (
    sample_df.groupBy("steam_purchase")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

For each user, how many reviews were written during early access and how many were not?


In [None]:
# Review counts: rows = author, columns = written_during_early_access value
df_pivot = (
    sample_df.groupBy("author_steamid")
    .pivot("written_during_early_access")
    .count()
    .withColumnRenamed("true", "written_during_early_access_true")
    .withColumnRenamed("false", "written_during_early_access_false")
)

df_pivot.show()

For each playtime category (playtime up to the day of the review), how many reviews recommend the app?

In [None]:
# Review counts: rows = playtime category at review time, columns = recommended value
df_pivot = (
    sample_df.groupBy("playtime_category")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

For each total-playtime category, how many reviews recommend the app?

In [None]:
# Review counts: rows = total-playtime category, columns = recommended value
df_pivot = (
    sample_df.groupBy("playtime_forever_category")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

Of the reviews written during early access, how many recommend the app?

In [None]:
# Review counts: rows = written_during_early_access value, columns = recommended value
df_pivot = (
    sample_df.groupBy("written_during_early_access")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

For each number of games owned by the author, how many reviews recommend the app?

In [None]:
# Review counts: rows = number of games the author owns, columns = recommended value
df_pivot = (
    sample_df.groupBy("author_num_games_owned")
    .pivot("recommended")
    .count()
    .withColumnRenamed("true", "recommended_true")
    .withColumnRenamed("false", "recommended_false")
)

df_pivot.show()

# Collaborative Filtering

In [None]:
sample_df.show(n=10)

In [None]:
sample_df.select(col("recommended")).distinct().show()  # which label values occur

In [None]:
# Class distribution of the 'recommended' label
sample_df.groupBy(col("recommended")).agg(F.count("*").alias("count")).show()

We observe a significant class imbalance (true: 8.58M, false: 1.05M). We therefore first planned a stratified sample that keeps 3% of the dominant class and 10% of the minority class (balanced reduction).


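We tried a stratified approach, but the stratified DataFrame returned 0 rows. We first attributed this to `.sampleBy()` selecting no records when the fraction is too small for a partition. A more likely cause is the fraction keys below: they are the strings `"true"`/`"false"`, while `recommended` appears to be read as a boolean column. No stratum matched any key, so every stratum got fraction 0; the keys would need to be `True`/`False`.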

```python
# Define sampling fractions for each category
fractions = {"true": 0.03, "false": 0.10}  # Stratified sampling with balanced reduction

# Perform stratified sampling
stratified_sample = sample_df.sampleBy("recommended", fractions=fractions, seed=42)

# Show sampled data
stratified_sample.show(5)

# Count rows to verify sample size
print("Total Rows in Sample:", stratified_sample.count())
```

Hence, we proceeded with the `sample()` function, which draws a uniform random sample and so keeps roughly the original class ratio.

In [None]:
# Uniform 5% sample without replacement (seeded for reproducibility) to trade accuracy for speed
sample_df2 = sample_df.sample(withReplacement=False, fraction=0.05, seed=42)

sample_df2.show(n=5)

In [None]:
sample_df2.groupBy(col("recommended")).agg(F.count("*").alias("count")).show()  # class distribution in the 5% sample

### Pre-processing

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row
from pyspark.sql import types as T

# Map a raw `recommended` value to a 0/1 label (used through a Spark UDF below)
def map_recommendation(value):
    """Return 1 if `value` denotes a positive recommendation, else 0.

    Accepts:
      * booleans: a Spark BooleanType column reaches a Python UDF as True/False. The
        `recommended` column appears to be inferred as boolean (it displays as true/false),
        and comparing such a value with the string "true" would always yield 0;
      * strings, case-insensitively ("true", "True", ...);
      * the ints 1/0, so re-running the mapping on an already-mapped column is a no-op.
    Anything else, including None, maps to 0.
    """
    if isinstance(value, str):
        return 1 if value.strip().lower() == "true" else 0
    if isinstance(value, (bool, int)):  # bool is a subclass of int; True == 1
        return 1 if value == 1 else 0
    return 0  # Default value in case of unexpected data

# Register UDF (User Defined Function); F and udf are already imported in the first cell
map_recommendation_udf = udf(map_recommendation, T.IntegerType())

# Apply mapping logic
sample_df2 = sample_df2.withColumn("recommended", map_recommendation_udf("recommended"))

# Index user and item IDs for collaborative filtering.
# The "user" is the review author. review_id is unique per review, so indexing it would
# give every "user" exactly one rating: after the train/test split, every test user would be
# unseen in training and all test rows would be dropped by coldStartStrategy="drop".
# handleInvalid="skip" filters out rows with a null id instead of failing the transform.
user_indexer = StringIndexer(
    inputCol="author_steamid", outputCol="userId", handleInvalid="skip"
).fit(sample_df2)
item_indexer = StringIndexer(
    inputCol="app_id", outputCol="itemId", handleInvalid="skip"
).fit(sample_df2)

# Apply transformations
sample_df2 = user_indexer.transform(sample_df2)
sample_df2 = item_indexer.transform(sample_df2)

# Select relevant columns
data = sample_df2.select("userId", "itemId", "recommended")

In [None]:
data.show(n=5)

### Collaborative Filtering Using ALS (Alternating Least Squares)

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ALS matrix-factorization settings
als_params = {
    "maxIter": 10,
    "regParam": 0.1,
    "userCol": "userId",
    "itemCol": "itemId",
    "ratingCol": "recommended",
    # drop test rows whose user or item was not seen during training (no NaN predictions)
    "coldStartStrategy": "drop",
    # treat the 0/1 labels as implicit-feedback values rather than explicit ratings
    "implicitPrefs": True,
}
als = ALS(**als_params)

# 80/20 train-test split, seeded so it is reproducible
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Fit the factorization on the training split
model = als.fit(train_data)

In [None]:
# Score the held-out split
predictions = model.transform(test_data)

# Cast 'prediction' to double to satisfy evaluator's requirement
predictions = predictions.withColumn("prediction", predictions["prediction"].cast("double"))

# Treat the ALS score as a raw ranking score and measure how well it separates the 0/1 labels
evaluator = BinaryClassificationEvaluator(
    labelCol="recommended",
    rawPredictionCol="prediction",
    metricName="areaUnderROC"
)

# coldStartStrategy="drop" removes every test row whose user or item was unseen in training.
# If nothing is left, AUC is undefined, so report that instead of evaluating an empty frame.
if not predictions.head(1):
    auc = float("nan")
    print("No test rows left after cold-start filtering; AUC cannot be computed.")
else:
    auc = evaluator.evaluate(predictions)
    print(f"AUC (Area Under Curve): {auc:.4f}")

### Generating Recommendations

Generating personalized recommendations for both users and items.

In [None]:
# For every user, the 10 items with the highest predicted score
user_recommendations = model.recommendForAllUsers(numItems=10)
user_recommendations.show(n=5)

In [None]:
# For every item, the 10 users with the highest predicted score
item_recommendations = model.recommendForAllItems(numUsers=10)
item_recommendations.show(n=5)

### Cosine Similarity

In [None]:
import numpy as np

# (item id, latent-factor vector as a numpy array) pairs from the trained ALS model
item_factors = model.itemFactors.rdd.map(
    lambda factor_row: (factor_row["id"], np.array(factor_row["features"]))
)

# Cosine similarity between two factor vectors (applied to RDD pairs below)
def cosine_similarity(x, y):
    """Return the cosine similarity of vectors `x` and `y` as a Python float.

    If either vector has zero norm the similarity is undefined. In that case 0.0 is
    returned; dividing by zero would yield NaN/inf with a numpy RuntimeWarning.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    denominator = np.linalg.norm(x) * np.linalg.norm(y)
    if denominator == 0.0:
        return 0.0
    return float(np.dot(x, y) / denominator)

# Every ordered pair of distinct items; the number of pairs grows quadratically with the number of items
pairs = item_factors.cartesian(item_factors).filter(
    lambda pair: pair[0][0] != pair[1][0]
)


def _pair_similarity(pair):
    """Turn ((id_a, vec_a), (id_b, vec_b)) into (id_a, id_b, cosine similarity)."""
    (left_id, left_vec), (right_id, right_vec) = pair
    return (left_id, right_id, cosine_similarity(left_vec, right_vec))


similarity_rdd = pairs.map(_pair_similarity)

# Back to a DataFrame with named columns
similarity_df = spark.createDataFrame(similarity_rdd, ["item1", "item2", "similarity"])

# Keep only strongly similar pairs (similarity above 0.8)
top_similar_items = similarity_df.filter(col("similarity") > 0.8)
top_similar_items.show(10)