# Content-based filtering model

In [1]:
# imports
from pyspark.sql import SparkSession
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql.window import Window #for ranking
from pyspark.sql.functions import lit, mean, stddev_pop
from pyspark.sql.functions import collect_set, collect_list
from pyspark.sql.functions import struct
from pyspark.sql.functions import slice
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
from pyspark.sql.functions import udf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import DecimalType, ArrayType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.functions import avg, broadcast, when, rand

## Overview
1. Load dataset.
2. Create item profiles for each game using genre, developer and publisher.
3. For each user-game pair, normalize playtime into a 1-5 rating scale.
4. Create user profiles by computing a weighted average of item profiles and ratings of previously played games.
5. Given a user, compute the cosine similarity between the user's profile and each (unplayed) game's item profile.
6. Recommend the user's 10 top similar games.

### Define cosine similarity and weighted average functions

In [2]:
# cosine similarity function
def cosine_similarity_udf(a, b):
    dot_product = sum([x * y for x, y in zip(a, b)])
    norm_a = sum([x**2 for x in a])**0.5
    norm_b = sum([x**2 for x in b])**0.5
    return dot_product / (norm_a * norm_b)


def weighted_avg_features(ratings, combined_vectors):
    if not ratings or not combined_vectors:
        return []

    weighted_sum = [0] * len(combined_vectors[0])
    total_weight = 0

    for rating, combined_vector in zip(ratings, combined_vectors):
        weight = float(rating)
        total_weight += weight
        weighted_sum = [ws + weight * f for ws, f in zip(weighted_sum, combined_vector)]

    if total_weight == 0:
        return weighted_sum

    return [ws / total_weight for ws in weighted_sum]

## Load Dataset

In [3]:
# spark = SparkSession.builder.appName('ReadMySQL') \
# .config("spark.driver.memory", "32g") \
# .config("spark.sql.pivotMaxValues", "1000000") \
# .config("spark.jars", "C:\\Program Files (x86)\\MySQL\\Connector J 8.0\\mysql-connector-j-8.0.32.jar") \
# .getOrCreate()

spark = SparkSession.builder.appName('ReadMySQL') \
.config("spark.driver.memory", "32g") \
.config("spark.sql.pivotMaxValues", "1000000") \
.getOrCreate()


# sql = "select * from 01_sampled_games_2v2 WHERE playtime_forever IS NOT NULL AND playtime_forever > 0"
sql = sql = """
SELECT p.steamid, p.appid, p.playtime_2weeks, p.playtime_forever, p.dateretrieved, g.genre, d.Developer, pb.Publisher
FROM 01_sampled_games_2v2 AS p
JOIN Games_Genres AS g ON p.appid = g.appid
JOIN Games_Developers AS d ON p.appid = d.appid
JOIN Games_Publishers AS pb ON p.appid = pb.appid
WHERE p.playtime_forever IS NOT NULL AND p.playtime_forever > 0
"""
# database = "steam"
# user = "root"
# password = "root"
# server = "127.0.0.1"
# port = 3307
# jdbc_url = f"jdbc:mysql://{server}:{port}/{database}"
# jdbc_driver = "com.mysql.cj.jdbc.Driver"

# # Create a data frame by reading data from Oracle via JDBC
# df = spark.read.format("jdbc") \
#     .option("url", jdbc_url) \
#     .option("query", sql) \
#     .option("user", user) \
#     .option("password", password) \
#     .option("driver", jdbc_driver) \
#     .load()

# df = df.drop("playtime_2weeks", "dateretrieved")

database = "steam"
user = "root"
password = "example"
server = "192.168.2.62"
port = 3306
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"
jdbc_driver = "org.mariadb.jdbc.Driver"

# Create a data frame by reading data from Oracle via JDBC
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", sql) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbc_driver) \
    .load()

df = df.drop("playtime_2weeks", "dateretrieved")

23/04/12 12:57:16 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.122.1 instead (on interface virbr0)
23/04/12 12:57:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/12 12:57:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/12 12:57:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# count number of rows in the dataframe
row_count = df.count()
# print the row count
print("Dataframe has", row_count, " rows.")
df.show()

                                                                                

Dataframe has 212589  rows.
+-----------------+-----+----------------+----------+--------------------+--------------------+
|          steamid|appid|playtime_forever|     genre|           Developer|           Publisher|
+-----------------+-----+----------------+----------+--------------------+--------------------+
|76561197960268000|  300|             109|    Action|               Valve|               Valve|
|76561197960268000| 1300|              94|    Action|Ritual Entertainment|Ritual Entertainment|
|76561197960268000| 2100|             110|    Action|      Arkane Studios|             Ubisoft|
|76561197960268000| 2100|             110|       RPG|      Arkane Studios|             Ubisoft|
|76561197960268000| 4000|             152|     Indie|   Facepunch Studios|               Valve|
|76561197960268000| 4000|             152|Simulation|   Facepunch Studios|               Valve|
|76561197960268000| 2600|              59|    Action|        Troika Games|          Activision|
|76561197960

## Create item profiles

In [5]:
# build the item profiles
# Group the data by 'appid' and collect the genres for each game into a list
games_genres_df = df.groupBy("appid").agg(collect_set("genre").alias("genres"))
# Group the data by 'appid' and collect the developers for each game into a list
games_developers_df = df.groupBy("appid").agg(collect_set("Developer").alias("developers"))
# Group the data by 'appid' and collect the publishers for each game into a list
games_publishers_df = df.groupBy("appid").agg(collect_set("Publisher").alias("publishers"))

# Create a list of unique genres
unique_genres = sorted(df.select("genre").distinct().rdd.flatMap(lambda x: x).collect())
# Create a list of unique developers
unique_developers = sorted(df.select("Developer").distinct().rdd.flatMap(lambda x: x).collect())
# Create a list of unique publishers
unique_publishers = sorted(df.select("Publisher").distinct().rdd.flatMap(lambda x: x).collect())


# Define a UDF to create a binary vector for each game's genres
@udf(returnType=ArrayType(IntegerType()))
def genre_vector(genres):
    return [1 if genre in genres else 0 for genre in unique_genres]

# Define a UDF to create a binary vector for each game's developer
@udf(returnType=ArrayType(IntegerType()))
def developer_vector(developers):
    return [1 if developer in developers else 0 for developer in unique_developers]

# Define a UDF to create a binary vector for each game's publisher
@udf(returnType=ArrayType(IntegerType()))
def publisher_vector(publishers):
    return [1 if publisher in publishers else 0 for publisher in unique_publishers]


# Add a new column 'genre_vector' to the DataFrame
# the genre vector will now have a 1 for each genre that the game belongs to
games_genres_df = games_genres_df.withColumn("genre_vector", genre_vector("genres"))
# Add a new column 'developer_vector' to the DataFrame
games_developers_df = games_developers_df.withColumn("developer_vector", developer_vector("developers"))
# Add a new column 'publisher_vector' to the DataFrame
games_publishers_df = games_publishers_df.withColumn("publisher_vector", publisher_vector("publishers"))

# games_genres_df.show(truncate=False)
# Join the main DataFrame with the games_genres_df on appid to include the genre_vector
df = df.join(broadcast(games_genres_df.select("appid", "genre_vector")), on="appid")
# Join the main DataFrame with the games_developers_df
df = df.join(broadcast(games_developers_df.select("appid", "developer_vector")), on="appid")
# Join the main DataFrame with the games_publishers_df
df = df.join(broadcast(games_publishers_df.select("appid", "publisher_vector")), on="appid")

df.show()

                                                                                

+-----+-----------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|appid|          steamid|playtime_forever|     genre|           Developer|           Publisher|        genre_vector|    developer_vector|    publisher_vector|
+-----+-----------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  300|76561197960268000|             109|    Action|               Valve|               Valve|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|
| 1300|76561197960268000|              94|    Action|Ritual Entertainment|Ritual Entertainment|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|
| 2100|76561197960268000|             110|    Action|      Arkane Studios|             Ubisoft|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|
| 2100|76561197960268000|             110|    

                                                                                

## Per-Game 1-5 Rating Normalization
For each game, we calculate the mean and standard deviation. We then create buckets for each rating:
### Scaling factor
The cut points are scaled on a per-user basis since some users are more casual gamers while others may spend a lot more time gaming. The scaling factor is calculated as follows:

(user_playtime_average)/(global_playtime_average)

### Cut points
* Cut point 1: (mean - std_dev*0.5) * scaling_factor if > 0, else 0
* Cut point 2: mean
* Cut point 3: (mean + std_dev*0.5) * scaling_factor
* Cut point 4: (mean + std_dev) * scaling_factor
### Ratings
* Rating 1: 0 < x < cut point 1
* Rating 2: cut point 1 < x < cut point 2
* Rating 3: cut point 2 < x < cut point 3
* Rating 4: cut point 3 < x < cut point 4
* Rating 5: cut point 5 < x < inf


In [6]:
# # Calculate the per-game mean and standard deviation of the playtime column
# game_stats = df.filter(col("playtime_forever") > 0).groupBy("appid").agg(
#     mean("playtime_forever").alias("game_mean_playtime"),
#     stddev_pop("playtime_forever").alias("game_stddev_playtime")
# )

# # Calculate the overall playtime average
# overall_playtime_avg = df.filter(col("playtime_forever") > 0).agg(avg("playtime_forever")).collect()[0][0]

# # Calculate the per-steamid playtime average
# user_playtime_avg = df.filter(col("playtime_forever") > 0).groupBy("steamid").agg(avg("playtime_forever")).withColumnRenamed("avg(playtime_forever)", "user_playtime_avg")

# # Join the user_playtime_avg dataframe with the main dataframe
# df = df.join(user_playtime_avg, "steamid")

# # Join the game_stats dataframe with the main dataframe
# df = df.join(game_stats, "appid")

# # Calculate the scaling factor based on the ratio of user playtime to overall playtime
# df = df.withColumn("scaling_factor", col("user_playtime_avg") / overall_playtime_avg)

# # Calculate the adjusted cut points
# df = df.withColumn("cut_point_1", when(col("game_mean_playtime") - (col("game_stddev_playtime") * 0.5 * col("scaling_factor")) > 0, col("game_mean_playtime") - (col("game_stddev_playtime") * 0.5 * col("scaling_factor"))).otherwise(0))
# df = df.withColumn("cut_point_2", col("game_mean_playtime") * col("scaling_factor"))
# df = df.withColumn("cut_point_3", col("game_mean_playtime") + (col("game_stddev_playtime") * 0.5 * col("scaling_factor")))
# df = df.withColumn("cut_point_4", col("game_mean_playtime") + col("game_stddev_playtime") * col("scaling_factor"))

# # Assign ratings based on adjusted cut points
# df = df.withColumn(
#     "ratings",
#     when(col("playtime_forever") <= col("cut_point_1"), lit(1))
#     .when((col("playtime_forever") > col("cut_point_1")) & (col("playtime_forever") <= col("cut_point_2")), lit(2))
#     .when((col("playtime_forever") > col("cut_point_2")) & (col("playtime_forever") <= col("cut_point_3")), lit(3))
#     .when((col("playtime_forever") > col("cut_point_3")) & (col("playtime_forever") <= col("cut_point_4")), lit(4))
#     .otherwise(lit(5))
# )

# # Update the user profile calculation to use the new ratings column
# user_aggregated_data = df.groupBy("steamid").agg(
#     collect_list("genre_vector").alias("genres_list"),
#     collect_list("ratings").alias("ratings_list")
# )
# # Show df with new changes(without genres)
# # Drop the genre_vector and genre columns from the DataFrame
# df_without_info = df.drop("genre_vector", "genre", "developer_vector", "Developer", "publisher_vector", "Publisher")

# # Show the DataFrame without the genre_vector and genre columns
# df_without_info.show()
# #df.show()

In [7]:
def normalize_dataset_v2(df):
    # Bucketize the playtime_forever column with intervals 0-30, 30-70, 70-200, 200-500, 500-inf
    return df.withColumn(
        "ratings",
        when(col("playtime_forever") <= 30, lit(1))
        .when((col("playtime_forever") > 30) & (col("playtime_forever") <= 100), lit(2))
        .when((col("playtime_forever") > 100) & (col("playtime_forever") <= 200), lit(3))
        .when((col("playtime_forever") > 200) & (col("playtime_forever") <= 500), lit(4))
        .otherwise(lit(5))
    )

df = normalize_dataset_v2(df)
df.show()

                                                                                

+-----+-----------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|appid|          steamid|playtime_forever|     genre|           Developer|           Publisher|        genre_vector|    developer_vector|    publisher_vector|ratings|
+-----+-----------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|  300|76561197960268000|             109|    Action|               Valve|               Valve|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|      3|
| 1300|76561197960268000|              94|    Action|Ritual Entertainment|Ritual Entertainment|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|      2|
| 2100|76561197960268000|             110|    Action|      Arkane Studios|             Ubisoft|[1, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|      3

In [8]:
# combine vectors
# Define a UDF to combine the vectors
@udf(returnType=ArrayType(IntegerType()))
def combined_vector(genre_vector, developer_vector, publisher_vector):
    return genre_vector + developer_vector + publisher_vector

# Add 'combined_vector' to the DataFrame
df = df.withColumn("combined_vector", combined_vector("genre_vector", "developer_vector", "publisher_vector"))
# split into 80% training and 20% testing set
# train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

## Create user profiles

In [9]:
# Update the user profile calculation to use the new ratings column and include developer and publisher vectors
# Update the user profile calculation to use training data
user_aggregated_data = df.groupBy("steamid").agg(
    collect_list("combined_vector").alias("combined_vectors_list"),  
    collect_list("ratings").alias("ratings_list")
)

# Define a UDF to calculate the weighted average of genre vectors
weighted_avg_features_udf = udf(weighted_avg_features, ArrayType(FloatType()))

# Calculate the user profile as the weighted average of rated item profiles (combined genre, developer, and publisher vectors)
user_profiles = user_aggregated_data.withColumn("user_profile", weighted_avg_features_udf("ratings_list", "combined_vectors_list"))

# user_profiles_train.show()

In [10]:
# Select a steam id and create a list of games that user has already played

In [11]:
random_users = df.select("steamid").distinct().sample(False, 0.5, seed=123).limit(100)
user_profiles_new = user_profiles.join(random_users, on="steamid", how="inner")
# Get played games for the 100 random users
played_games = df.join(random_users, on="steamid", how="inner").select("steamid", "appid", "ratings").distinct()
# user_profiles.show()

In [12]:
random_users.show()

                                                                                

+-----------------+
|          steamid|
+-----------------+
|76561197961433000|
|76561197975983000|
|76561197978981000|
|76561197979519000|
|76561197985399000|
|76561197987630000|
|76561197995902000|
|76561197996265000|
|76561197997814000|
|76561197999271000|
|76561198003755000|
|76561198003831000|
|76561198005088000|
|76561198007777000|
|76561198010476000|
|76561198015878000|
|76561198016529000|
|76561198020739000|
|76561198023676000|
|76561198024409000|
+-----------------+
only showing top 20 rows



## Compute the cosine similarity between user and item profiles

In [13]:
# prediction heuristics
# calculate cosine distance of an item and user profile

# 1. create udf for cosine similarity
cosine_similarity = udf(cosine_similarity_udf, FloatType())

# create dataframe with combined vectors
games_combined_vectors_df = df.select("appid", "combined_vector").distinct()

# cross join the combined vectors with the user_profiles_sample
cross_joined = games_combined_vectors_df.crossJoin(user_profiles_new)

# calculate the cosine similarity between each item and user
recommendations = cross_joined.withColumn(
    "similarity", cosine_similarity("combined_vector", "user_profile")
)

# sort based on similarity score
sorted_recommendations = recommendations.sort(desc("similarity"))
# sorted_recommendations.show()

# show top 10 recommendations for each user in the sample
# sorted_recommendations = sorted_recommendations.groupBy("steamid").agg(collect_list(struct("appid", "similarity")).alias("recommendations"))
# sorted_recommendations = sorted_recommendations.withColumn("top_10_recommendations", slice("recommendations", 1, 10))
# sorted_recommendations.select("steamid", "top_10_recommendations").show(truncate=False)


## Recommend the user's 10 top similar games

In [14]:
# Create a window by steamid and similarity to get ranking
window_spec = Window.partitionBy("steamid").orderBy(desc("similarity"))

ranked_recommendations = sorted_recommendations.withColumn("rank", F.row_number().over(window_spec))

# Filter ranked_recommendations to show only the top 10 for each user
top_10_recommendations = ranked_recommendations.filter(ranked_recommendations.rank <= 10)

# Select one random user from top_10_recommendations
one_random_user = top_10_recommendations.orderBy(rand()).limit(10)
one_random_user_steamid = one_random_user.select("steamid").take(1)[0].steamid

# Filter top_10_recommendations to show only the selected user
top_10_recommendations_specific_user = top_10_recommendations.filter(top_10_recommendations.steamid == one_random_user_steamid)

# Show top 10 recommendations for the specific user
top_10_recommendations_specific_user.show()

                                                                                

+------+--------------------+-----------------+---------------------+--------------------+--------------------+----------+----+
| appid|     combined_vector|          steamid|combined_vectors_list|        ratings_list|        user_profile|similarity|rank|
+------+--------------------+-----------------+---------------------+--------------------+--------------------+----------+----+
|113200|[1, 1, 0, 0, 0, 0...|76561198054989000| [[1, 1, 0, 0, 0, ...|[5, 5, 5, 5, 5, 5...|[1.0, 1.0, 0.0, 0...|0.91287094|   1|
|105600|[1, 1, 0, 0, 0, 0...|76561198054989000| [[1, 1, 0, 0, 0, ...|[5, 5, 5, 5, 5, 5...|[1.0, 1.0, 0.0, 0...|0.91287094|   2|
|263500|[1, 1, 0, 0, 0, 0...|76561198054989000| [[1, 1, 0, 0, 0, ...|[5, 5, 5, 5, 5, 5...|[1.0, 1.0, 0.0, 0...| 0.6761234|   3|
|282100|[1, 1, 0, 1, 0, 0...|76561198054989000| [[1, 1, 0, 0, 0, ...|[5, 5, 5, 5, 5, 5...|[1.0, 1.0, 0.0, 0...| 0.6761234|   4|
|263200|[1, 1, 0, 1, 0, 1...|76561198054989000| [[1, 1, 0, 0, 0, ...|[5, 5, 5, 5, 5, 5...|[1.0, 1.0, 0.0

In [15]:
top_10_recommendations.show()



+------+--------------------+-----------------+---------------------+--------------------+--------------------+----------+----+
| appid|     combined_vector|          steamid|combined_vectors_list|        ratings_list|        user_profile|similarity|rank|
+------+--------------------+-----------------+---------------------+--------------------+--------------------+----------+----+
| 95900|[1, 0, 0, 0, 0, 0...|76561197961433000| [[1, 0, 0, 0, 0, ...|        [5, 5, 5, 5]|[1.0, 0.0, 0.0, 0...|       1.0|   1|
|211600|[1, 0, 0, 0, 0, 0...|76561197961433000| [[1, 0, 0, 0, 0, ...|        [5, 5, 5, 5]|[1.0, 0.0, 0.0, 0...| 0.4472136|   2|
|228400|[1, 0, 0, 0, 0, 0...|76561197961433000| [[1, 0, 0, 0, 0, ...|        [5, 5, 5, 5]|[1.0, 0.0, 0.0, 0...| 0.4472136|   3|
|224900|[1, 0, 0, 0, 0, 0...|76561197961433000| [[1, 0, 0, 0, 0, ...|        [5, 5, 5, 5]|[1.0, 0.0, 0.0, 0...|       0.4|   4|
|289300|[1, 0, 0, 0, 0, 0...|76561197961433000| [[1, 0, 0, 0, 0, ...|        [5, 5, 5, 5]|[1.0, 0.0, 0.0

                                                                                

## Evaluation for steamid top 10 games

In [17]:
# Calculate the top 10 games for each user by rating
top_10_rated_games = played_games \
    .withColumn("rank", F.row_number().over(Window.partitionBy("steamid").orderBy(desc("ratings")))) \
    .filter(F.col("rank") <= 10) \
    .drop("rank") \
    .groupBy("steamid") \
    .agg(F.collect_list("appid").alias("top_rated_appids"))

top_10_recommended_games = top_10_recommendations\
    .withColumn("rank", F.row_number().over(Window.partitionBy("steamid").orderBy(desc("rank")))) \
    .filter(F.col("rank") <= 10) \
    .drop("rank") \
    .groupBy("steamid") \
    .agg(F.collect_list("appid").alias("top_predicted_appids"))

                                                                                

In [24]:
joined_df = top_10_recommended_games.alias("rec") \
    .join(top_10_rated_games.alias("rated"), F.col("rec.steamid") == F.col("rated.steamid")) \
    .filter(F.size("top_rated_appids") >= 10) \
    .withColumn("matches", F.array_intersect("top_predicted_appids", "top_rated_appids")) \
    .withColumn("precision_at_10", F.size("matches") / F.size("top_rated_appids")) \
    .select("rec.steamid", "precision_at_10")

# Compute the mean precision at 10
mean_precision_at_10 = joined_df.agg(F.avg("precision_at_10")).collect()[0][0]

print("Mean precision at 10: {}".format(mean_precision_at_10))



Mean precision at 10: 0.31538461538461543


                                                                                