# Recommendation System for Steam Games

## Data cleaning

### Data Loading

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load CSV files into DataFrames
review = spark.read.csv("gs://msca-bdp-student-gcs/Group5/english_reviews/part-00000-dbebe656-10b1-42c2-93d1-bf55b07e437d-c000.csv", header=True, inferSchema=True)

                                                                                

In [4]:
review.head(2)

[Row(recommendationid=148912575, appid=10, game='Counter-Strike', author_steamid=76561198363716821, author_num_games_owned=0, author_num_reviews=2, author_playtime_forever=197, author_playtime_last_two_weeks=41, author_playtime_at_review=197, author_last_played=1698329401, language='english', review='GOAT Game !!', timestamp_created='1698329419', timestamp_updated='1698329419', voted_up='1', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='1', received_for_free='0', written_during_early_access='0', hidden_in_steam_china='1', steam_china_location=None),
 Row(recommendationid=148895540, appid=10, game='Counter-Strike', author_steamid=76561198134752176, author_num_games_owned=69, author_num_reviews=1, author_playtime_forever=12107, author_playtime_last_two_weeks=0, author_playtime_at_review=12107, author_last_played=1670487891, language='english', review='First crush. Always in my heart.', timestamp_created='1698306096', timestamp_updated='16983

In [5]:
review.printSchema()

root
 |-- recommendationid: integer (nullable = true)
 |-- appid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: long (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |--

### Check missing values in important columns

In [6]:
from pyspark.sql.functions import col

# Check for null values in each column
columns_to_check = ["appid", "author_steamid", "review", "voted_up"]

for column in columns_to_check:
    null_count = review.filter(col(column).isNull()).count()
    print(f"Number of null values in '{column}': {null_count}")

                                                                                

Number of null values in 'appid': 0


                                                                                

Number of null values in 'author_steamid': 0


                                                                                

Number of null values in 'review': 15743




Number of null values in 'voted_up': 9171


                                                                                

In [2]:
# Drop rows with null values in the specified columns
columns_to_check = ["appid", "author_steamid", "review", "voted_up"]
cleaned_review = review.dropna(subset=columns_to_check)

print(f"Number of rows before dropping nulls: {review.count()}")
print(f"Number of rows after dropping nulls: {cleaned_review.count()}")

                                                                                

Number of rows before dropping nulls: 51544179




Number of rows after dropping nulls: 51519266


                                                                                

### Check the review column

In [8]:
# Select the relevant column for sentiment analysis
data = cleaned_review.select("review")
data.show()

+--------------------+
|              review|
+--------------------+
|        GOAT Game !!|
|First crush. Alwa...|
|best version of c...|
|a game everyone n...|
|        old but gold|
|                   y|
|cs2 update made t...|
|     GAY NIGGER GAME|
|Counter-Strike 1....|
|               shoot|
|              GOATED|
|           it's good|
|              tjucdg|
|      best game ever|
|I didn't get to p...|
|                  :)|
|      best game ever|
|           BEST GAME|
|the game of my ch...|
|                njeh|
+--------------------+
only showing top 20 rows



In [3]:
import gcsfs

# Initialize a GCS filesystem
fs = gcsfs.GCSFileSystem()

# Read the file from GCS
with fs.open("gs://msca-bdp-student-gcs/Group5/english_reviews/words.txt", "r") as f:
    english_words = set(word.strip().lower() for word in f)

# Print the number of words loaded (optional)
print(f"Number of English words loaded: {len(english_words)}")

Number of English words loaded: 466545


In [3]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import gcsfs

# Initialize a GCS filesystem
fs = gcsfs.GCSFileSystem()

# Load a list of English words from a file in GCS
with fs.open("gs://msca-bdp-student-gcs/Group5/english_reviews/words.txt", "r") as f:
    english_words = set(word.strip().lower() for word in f)

# Broadcast the English words set
broadcast_english_words = spark.sparkContext.broadcast(english_words)

# UDF to check semantic meaning without langdetect
def has_semantic_meaning(review):
    try:
        # Check if most words are valid English words
        word_list = review.split()
        valid_words = [word for word in word_list if word.lower() in broadcast_english_words.value]
        return len(valid_words) / max(len(word_list), 1) >= 0.5  # At least 50% valid words
    except Exception:
        return False  # Handle errors

# Register the UDF
semantic_udf = udf(has_semantic_meaning, BooleanType())

# Filter reviews with no semantic meaning
nonsensical_reviews = cleaned_review.filter(~semantic_udf(col("review")))

# Count and show nonsensical reviews
nonsensical_count = nonsensical_reviews.count()
print(f"Number of reviews with no semantic meaning: {nonsensical_count}")

# Show examples
nonsensical_reviews.select("review").show(10, truncate=False)

                                                                                

Number of reviews with no semantic meaning: 3771165
+----------------------------------+
|review                            |
+----------------------------------+
|GOATED                            |
|tjucdg                            |
|:)                                |
|njeh                              |
|yeyeee                            |
|++                                |
|(Y)                               |
|kantır                            |
|cs1.6 > cs2                       |
|kaunter straik globalnaja ofensiva|
+----------------------------------+
only showing top 10 rows



In [5]:
# Filter reviews with semantic meaning (inverse of nonsensical)
filtered_reviews = cleaned_review.filter(semantic_udf(col("review")))

# Count the remaining reviews
remaining_count = filtered_reviews.count()
print(f"Number of reviews with semantic meaning: {remaining_count}")

# Display a few examples of meaningful reviews
filtered_reviews.select("review").show(10, truncate=False)

                                                                                

Number of reviews with semantic meaning: 47748101
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

### Check whether some users created multiple reviews after the filtering

In [12]:
from pyspark.sql.functions import col, countDistinct

# Group by author_steamid and count distinct appid
multiple_appid_authors = filtered_reviews.groupBy("author_steamid") \
    .agg(countDistinct("appid").alias("distinct_appids")) \
    .filter(col("distinct_appids") > 1)

# Show the results
multiple_appid_authors.show()

# Count the number of such authors
num_authors = multiple_appid_authors.count()
print(f"Number of authors with reviews for multiple appids: {num_authors}")

                                                                                

+-----------------+---------------+
|   author_steamid|distinct_appids|
+-----------------+---------------+
|76561198890674540|              5|
|76561198045701284|              6|
|76561198799410132|              5|
|76561198331817782|              2|
|76561198152726200|              9|
|76561199112667458|              6|
|76561198192842693|              6|
|76561198121156197|              3|
|76561198393902835|              2|
|76561198165441867|              8|
|76561198074386816|              3|
|76561198288350718|              2|
|76561198101265793|             10|
|76561198047937382|              3|
|76561198114376048|             13|
|76561199076437074|              3|
|76561198339278681|              4|
|76561199105996769|             11|
|76561199140222789|              3|
|76561199106887592|             12|
+-----------------+---------------+
only showing top 20 rows





Number of authors with reviews for multiple appids: 7023825


                                                                                

### Text Preprocessing

In [6]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import re

# Define a custom list of stop words (you can expand this list as needed)
STOP_WORDS = {"a", "an", "the", "and", "or", "not", "to", "is", "it", "in", "for", "on", "of", "this", "that", "with", "was", "are"}

# Define the text preprocessing function
def preprocess_text_no_nltk(text):
    if text is None:  # Handle null values
        return ""
    # Lowercase text
    text = text.lower()
    # Remove special characters and digits
    text = re.sub(r'[^a-z\s]', '', text)
    # Tokenize (split by spaces)
    tokens = text.split()
    # Remove stop words
    tokens = [word for word in tokens if word not in STOP_WORDS]
    return ' '.join(tokens)

# Register the UDF
preprocess_text_udf = udf(preprocess_text_no_nltk, StringType())

# Apply preprocessing to the review column
filtered_reviews = filtered_reviews.withColumn("cleaned_review", preprocess_text_udf(col("review")))
filtered_reviews.head(2)

                                                                                

[Row(recommendationid=148912575, appid=10, game='Counter-Strike', author_steamid=76561198363716821, author_num_games_owned=0, author_num_reviews=2, author_playtime_forever=197, author_playtime_last_two_weeks=41, author_playtime_at_review=197, author_last_played=1698329401, language='english', review='GOAT Game !!', timestamp_created='1698329419', timestamp_updated='1698329419', voted_up='1', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='1', received_for_free='0', written_during_early_access='0', hidden_in_steam_china='1', steam_china_location=None, cleaned_review='goat game'),
 Row(recommendationid=148895540, appid=10, game='Counter-Strike', author_steamid=76561198134752176, author_num_games_owned=69, author_num_reviews=1, author_playtime_forever=12107, author_playtime_last_two_weeks=0, author_playtime_at_review=12107, author_last_played=1670487891, language='english', review='First crush. Always in my heart.', timestamp_created='169830609

In [None]:
#filtered_reviews.coalesce(1).write.csv(
#    "gs://msca-bdp-student-gcs/Group5/filtered_reviews",
#    header=True,
#    mode="overwrite"
#)
#
#print("Export successful!")

24/11/16 23:28:41 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem: Successfully repaired 'gs://msca-bdp-student-gcs/Group5/filtered_reviews/' directory.


Export successful!


## Sentiment Score Model

### Data Processing

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("GameModel").getOrCreate()

# Load CSV files into DataFrames
filtered_reviews = spark.read.csv("gs://msca-bdp-student-gcs/Group5/filtered_reviews/part-00000-699f208e-6876-43bf-aa8c-315adc363e63-c000.csv", header=True, inferSchema=True)

                                                                                

In [27]:
from pyspark.sql.functions import when, col

# Filter reviews to exclude rows where either 'cleaned_review' or 'voted_up' is null or empty
reviews = filtered_reviews.filter(
    ~(col("cleaned_review").isNull() | (col("cleaned_review") == "") |
      col("voted_up").isNull() | (col("voted_up") == ""))
)

# Convert 'voted_up' to numeric using 'label'
reviews = reviews.withColumn(
    'label',
    when(col('voted_up') == '1', 1.0).when(col('voted_up') == '0', 0.0)
)

# Drop the 'voted_up' column
reviews = reviews.drop('voted_up')

# Show the updated DataFrame
reviews.show(2)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+--------------------+-----+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|      cleaned_review|label|
+----------------+-----+--------------+-----------------+----------------------+----------------

In [28]:
reviews.head(2)

[Row(recommendationid=148912575, appid=10, game='Counter-Strike', author_steamid=76561198363716821, author_num_games_owned=0, author_num_reviews=2, author_playtime_forever=197, author_playtime_last_two_weeks=41, author_playtime_at_review=197, author_last_played=1698329401, language='english', review='GOAT Game !!', timestamp_created='1698329419', timestamp_updated='1698329419', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='1', received_for_free='0', written_during_early_access='0', hidden_in_steam_china='1', steam_china_location=None, cleaned_review='goat game', label=1.0),
 Row(recommendationid=148895540, appid=10, game='Counter-Strike', author_steamid=76561198134752176, author_num_games_owned=69, author_num_reviews=1, author_playtime_forever=12107, author_playtime_last_two_weeks=0, author_playtime_at_review=12107, author_last_played=1670487891, language='english', review='First crush. Always in my heart.', timestamp_created='1698306096',

In [29]:
reviews.printSchema()

root
 |-- recommendationid: integer (nullable = true)
 |-- appid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: long (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- written_during_early_access: string (nu

In [11]:
# Count rows where label = 1
label_1_count = reviews.filter(col('label') == 1).count()

# Count rows where label = 0
label_0_count = reviews.filter(col('label') == 0).count()

# Print the counts
print(f"Count of label = 1: {label_1_count}")
print(f"Count of label = 0: {label_0_count}")



Count of label = 1: 39668223
Count of label = 0: 6330285


                                                                                

### Feature Extraction

In [2]:
from pyspark.sql.functions import when, col

# Filter reviews to exclude rows where either 'cleaned_review' or 'voted_up' is null or empty
reviews = filtered_reviews.filter(
    ~(col("cleaned_review").isNull() | (col("cleaned_review") == "") |
      col("voted_up").isNull() | (col("voted_up") == ""))
)

# Convert 'voted_up' to numeric using 'label'
reviews = reviews.withColumn(
    'label',
    when(col('voted_up') == '1', 1.0).when(col('voted_up') == '0', 0.0)
)

# Drop the 'voted_up' column
reviews = reviews.drop('voted_up')

# Show the updated DataFrame
reviews.show(2)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+--------------------+-----+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|      cleaned_review|label|
+----------------+-----+--------------+-----------------+----------------------+----------------

In [3]:
#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [4]:
#tokenize words
tokenizer = Tokenizer(inputCol="cleaned_review", outputCol="words")
reviews = tokenizer.transform(reviews)
reviews.show(2)

24/11/17 02:40:54 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+--------------------+-----+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|      cleaned_review|label|               words|
+----------------+-----+--------------+---------------

                                                                                

In [5]:
#remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
reviews = remover.transform(reviews)

#drop the redundant source column
reviews = reviews.drop("words")
reviews.show(2,truncate = False)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+---------------------------+-----+-----------------------------+
|recommendationid|appid|game          |author_steamid   |author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|review                          |timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|cleaned_review             |label|filtered                     

                                                                                

In [6]:
#Maps a sequence of terms to their term frequencies using the hashing trick. 
#alternatively, CountVectorizer can also be used to get term frequency vectors
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(reviews)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
nlpdf = idfModel.transform(featurizedData)
nlpdf.select("label", "features").show(5, truncate=False)

                                                                                

+-----+----------------------------------------------------------------------------------------------+
|label|features                                                                                      |
+-----+----------------------------------------------------------------------------------------------+
|1.0  |(20,[15,17],[0.4313029839490665,0.9237928288927962])                                          |
|1.0  |(20,[2,3,8,11],[1.1493830820009927,0.8487405682175299,0.8561055790511669,1.0905517832019862]) |
|1.0  |(20,[3,8,10,14],[1.6974811364350597,0.8561055790511669,0.7989596810049043,1.0251466204901554])|
|1.0  |(20,[13,15,18],[1.7110766778744637,0.4313029839490665,1.099606599099233])                     |
|1.0  |(20,[3,13],[0.8487405682175299,0.8555383389372319])                                           |
+-----+----------------------------------------------------------------------------------------------+
only showing top 5 rows



In [7]:
nlpdf.show(2)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|      cleaned_review|label|            filtered|         rawF

In [8]:
null_rows = nlpdf.filter(nlpdf["label"].isNull() | nlpdf["features"].isNull())
null_rows.show(2)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+--------------------+--------------------+-----------------+--------------------+-------------------+-----------------+--------------+--------------------+---------------------------+---------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|   timestamp_created|   timestamp_updated|         votes_up|         votes_funny|weighted_vote_score|    comment_count|steam_purchase|   received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location

In [9]:
nlpdf = nlpdf.dropna(subset=["label", "features"])

In [11]:
#split data into train and test
train_df, test_df = nlpdf.randomSplit([0.8, 0.2], seed=42)

train_df.show(1)

[Stage 10:>                                                         (0 + 1) / 1]

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|              review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|      cleaned_review|label|            filtered|         rawF

                                                                                

### Logistic Regression Model

In [12]:
from pyspark.ml.classification import LogisticRegression

# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol = 'features', labelCol='label')

# Fit the model to the data.
lgrm = lgr.fit(train_df)

24/11/17 03:05:19 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/11/17 03:05:19 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [14]:
# Given a dataset, predict each point's label, and show the results.
predictions = lgrm.transform(test_df)

In [None]:
#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

                                                                                

0.8608528658057427




0.8004072202077143


                                                                                

In [17]:
predictions.head(2)

                                                                                

[Row(recommendationid=1139, appid=10, game='Counter-Strike', author_steamid=76561197960270784, author_num_games_owned=219, author_num_reviews=8, author_playtime_forever=4056, author_playtime_last_two_weeks=0, author_playtime_at_review=111, author_last_played=1683984633, language='english', review="Get out of there it's gonna blow!!!", timestamp_created='1310238451', timestamp_updated='1669170352', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='0', received_for_free='0', written_during_early_access='0', hidden_in_steam_china='0', steam_china_location=None, cleaned_review='get out there its gonna blow', label=1.0, filtered=['get', 'gonna', 'blow'], rawFeatures=SparseVector(20, {6: 1.0, 8: 1.0, 17: 1.0}), features=SparseVector(20, {6: 0.9777, 8: 0.8561, 17: 0.9238}), rawPrediction=DenseVector([-1.9054, 1.9054]), probability=DenseVector([0.1295, 0.8705]), prediction=1.0),
 Row(recommendationid=3362, appid=10, game='Counter-Strike', author_steam

### Generate Sentiment Scores

In [19]:
from pyspark.sql.functions import col

# Combine train and test data
all_data = train_df.union(test_df)

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Define a UDF to extract the probability for label=1
extract_probability_udf = udf(lambda probability: float(probability.values[1]), DoubleType())

# Add sentiment scores
predictions_with_scores_all = predictions_all.withColumn("sentiment_score", extract_probability_udf(col("probability")))

# Select relevant columns (appid, author_steamid, sentiment_score)
scores_to_add = predictions_with_scores_all.select("appid", "author_steamid", "sentiment_score")

# Join the sentiment scores back to the original reviews DataFrame
reviews_w_scores = reviews.join(scores_to_add, on=["appid", "author_steamid"], how="left")

# Display the updated DataFrame with sentiment scores
reviews_w_scores.show(10, truncate=False)

[Stage 48:>                                                         (0 + 1) / 1]

+-----+-----------------+----------------+--------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------+------------------+
|appid|author_steamid   |recommendationid|game          |author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|aut

                                                                                

In [24]:
from pyspark.sql.functions import col

# Drop rows with any null values in the "sentiment_score" column
filtered_reviews_w_scores = reviews_w_scores.filter(col("sentiment_score").isNotNull())

# Drop the 'filtered' column if it exists
final_reviews_w_scores = filtered_reviews_w_scores.drop("filtered")

In [None]:
# Save the filtered and cleaned DataFrame to GCS as a single CSV file
#final_reviews_w_scores.coalesce(1).write.csv(
#    "gs://msca-bdp-student-gcs/Group5/reviews_w_scores",
#    header=True,
#    mode="overwrite"
#)
#
#print("Export successful!")

                                                                                

Export successful!


## Recommendation System - Collaborate Filtering Model

### Data Processing

In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("GameRemmendation").getOrCreate()

# Load CSV files into DataFrames
reviews_w_score = spark.read.csv("gs://msca-bdp-student-gcs/Group5/reviews_w_scores/part-00000-c23f0257-0f2c-49b8-abe8-30ebfebd681c-c000.csv", header=True, inferSchema=True)

                                                                                

In [4]:
reviews_w_score.head(2)

[Row(appid=10, author_steamid=76561197960270613, recommendationid=23955932, game='Counter-Strike', author_num_games_owned=0, author_num_reviews=263, author_playtime_forever=4917, author_playtime_last_two_weeks=0, author_playtime_at_review=4841, author_last_played=1695867513, language='english', review='I have really tried to like CS:GO but its just not for me I feel this and CS:Source are A+ games', timestamp_created='1467296140', timestamp_updated='1504271092', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='0', received_for_free='0', written_during_early_access='0', hidden_in_steam_china='0', steam_china_location=None, cleaned_review='i have really tried like csgo but its just me i feel cssource games', label=1.0, sentiment_score=0.8847313172665581),
 Row(appid=10, author_steamid=76561197960281796, recommendationid=49490696, game='Counter-Strike', author_num_games_owned=30, author_num_reviews=1, author_playtime_forever=559666, author_playt

In [5]:
reviews_w_score.printSchema() 

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: long (nullable = true)
 |-- recommendationid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- written_during_early_access: string (nu

In [9]:
# Load CSV files into DataFrames
game_info = spark.read.csv("gs://msca-bdp-student-gcs/Group5/game_w_label/part-00000-f8cd2505-cd1c-4845-9cd5-04dfcaaebc9e-c000.csv", header=True, inferSchema=True)
game_info.head(1)

                                                                                

[Row(appid=10, name='Counter-Strike', developer='Valve', publisher='Valve', positive=235682, negative=6218, owners='10,000,000 .. 20,000,000', average_forever=8920, average_2weeks=7, median_forever=174, median_2weeks=10, price=999, initialprice=999, discount=0, ccu=14002, Afrikaans=0, Arabic=0, Armenian=0, Basque=0, Belarusian=0, Bulgarian=0, Catalan=0, Croatian=0, Czech=0, Danish=0, Dutch=0, English=1, Estonian=0, Filipino=0, Finnish=0, French=1, Galician=0, Georgian=0, German=1, Greek=0, Hebrew=0, Hindi=0, Hungarian=0, Icelandic=0, Indonesian=0, Irish=0, Italian=1, Japanese=0, Kannada=0, Kazakh=0, Korean=1, Latvian=0, Lithuanian=0, Malay=0, Maori=0, Mongolian=0, Norwegian=0, Not supported=0, Persian=0, Polish=0, Portuguese=0, Portuguese - Brazil=0, Portuguese - Portugal=0, Romanian=0, Russian=0, Serbian=0, Simplified Chinese=1, Slovak=0, Slovenian=0, Spanish=0, Spanish - Latin America=0, Spanish - Spain=1, Swedish=0, Tamil=0, Telugu=0, Thai=0, Traditional Chinese=1, Turkish=0, Ukrain

In [10]:
game_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- positive: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- owners: string (nullable = true)
 |-- average_forever: integer (nullable = true)
 |-- average_2weeks: integer (nullable = true)
 |-- median_forever: integer (nullable = true)
 |-- median_2weeks: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- initialprice: integer (nullable = true)
 |-- discount: integer (nullable = true)
 |-- ccu: integer (nullable = true)
 |-- Afrikaans: integer (nullable = true)
 |-- Arabic: integer (nullable = true)
 |-- Armenian: integer (nullable = true)
 |-- Basque: integer (nullable = true)
 |-- Belarusian: integer (nullable = true)
 |-- Bulgarian: integer (nullable = true)
 |-- Catalan: integer (nullable = true)
 |-- Croatian: integer (nullable = true)
 |-- Czech: integer (nullable = true)
 |-- D

In [11]:
from pyspark.sql.functions import col

# Select only the required columns
reviews_score = reviews_w_score.select(
    col("appid"),
    col("author_steamid").cast("integer").alias("author_steamid"),
    col("sentiment_score")
)

# Drop rows with null values in any of the selected columns
reviews_score = reviews_score.dropna()

# Show the first 10 rows to confirm
reviews_score.show(10, truncate=False)

+-----+--------------+------------------+
|appid|author_steamid|sentiment_score   |
+-----+--------------+------------------+
|10   |4885          |0.8847313172665581|
|10   |16068         |0.8731704227545498|
|10   |73141         |0.9042052339242086|
|10   |125096        |0.8807123180475541|
|10   |202012        |0.8555295814817651|
|10   |236498        |0.8737499209301924|
|10   |279059        |0.866265366391876 |
|10   |884484        |0.8593730529478136|
|10   |886865        |0.8742503363379408|
|10   |889070        |0.8730743843530598|
+-----+--------------+------------------+
only showing top 10 rows



In [12]:
from pyspark.sql.functions import expr, col

# List of genre columns to combine into one column
genre_columns = [
    "genre_Action", "genre_Action Games", "genre_Adventure", "genre_Animation & Modeling", 
    "genre_Audio Production", "genre_Casual", "genre_Design & Illustration", 
    "genre_Early Access", "genre_Education", "genre_Free To Play84", "genre_Free to Play85",
    "genre_Game Development", "genre_Indie", "genre_Massively Multiplayer", 
    "genre_Photo Editing", "genre_RPG", "genre_Racing", "genre_Simulation", 
    "genre_Software Training", "genre_Sports", "genre_Strategy", "genre_Utilities", 
    "genre_Video Production", "genre_Web Publishing"
]

# Wrap column names with backticks (`) to handle special characters and spaces
game_info_with_genre = game_info.withColumn(
    "genre",
    expr("CASE " + " ".join([f"WHEN `{col}` = 1 THEN '{col}'" for col in genre_columns]) + " ELSE 'Unknown' END")
)

# Remove the 'genre_' prefix from the combined genre column
game_info_with_genre = game_info_with_genre.withColumn(
    "genre", expr("replace(genre, 'genre_', '')")
)

# Select only the required columns
games = game_info_with_genre.select(
    "appid", "name", "genre", "developer", "publisher", "price", "label"
)

# Show a few rows to confirm
games.show(10, truncate=False)

+-------+-------------------------------+---------+-----------------------------------+-----------------------------+-----+-----------------------+
|appid  |name                           |genre    |developer                          |publisher                    |price|label                  |
+-------+-------------------------------+---------+-----------------------------------+-----------------------------+-----+-----------------------+
|10     |Counter-Strike                 |Action   |Valve                              |Valve                        |999  |Overwhelmingly Positive|
|1000010|Crown Trick                    |Adventure|NEXT Studios                       |Team17, NEXT Studios         |1999 |Very Positive          |
|1000030|Cook, Serve, Delicious! 3?!    |Action   |Vertigo Gaming Inc.                |Vertigo Gaming Inc.          |1999 |Very Positive          |
|1000080|Zengeon                        |Action   |IndieLeague Studio                 |2P Games                 

In [18]:
# Perform the left join
reviews_info = reviews_score.join(games, reviews_score.appid == games.appid, how="inner")

# Drop duplicate 'appid' column from the 'games' DataFrame
reviews_info = reviews_info.drop(games.appid)

# Show the results to verify
reviews_info.show(5, truncate=False)

                                                                                

+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|appid|author_steamid|sentiment_score   |name          |genre |developer|publisher|price|label                  |
+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|10   |4885          |0.8847313172665581|Counter-Strike|Action|Valve    |Valve    |999  |Overwhelmingly Positive|
|10   |16068         |0.8731704227545498|Counter-Strike|Action|Valve    |Valve    |999  |Overwhelmingly Positive|
|10   |73141         |0.9042052339242086|Counter-Strike|Action|Valve    |Valve    |999  |Overwhelmingly Positive|
|10   |125096        |0.8807123180475541|Counter-Strike|Action|Valve    |Valve    |999  |Overwhelmingly Positive|
|10   |202012        |0.8555295814817651|Counter-Strike|Action|Valve    |Valve    |999  |Overwhelmingly Positive|
+-----+--------------+------------------+--------------+------+---------+---------+-----

[Stage 17:>                                                         (0 + 1) / 1]

In [19]:
reviews_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = false)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- label: string (nullable = true)



In [20]:
from pyspark.sql.functions import col

# Divide the 'price' column by 100 to convert from integer to float
reviews_info = reviews_info.withColumn("price", col("price") / 100.0)

# Drop rows with null values
reviews_info = reviews_info.dropna()

# Show the updated DataFrame
reviews_info.show(5, truncate=False)

                                                                                

+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|appid|author_steamid|sentiment_score   |name          |genre |developer|publisher|price|label                  |
+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|10   |4885          |0.8847313172665581|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |16068         |0.8731704227545498|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |73141         |0.9042052339242086|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |125096        |0.8807123180475541|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |202012        |0.8555295814817651|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
+-----+--------------+------------------+--------------+------+---------+---------+-----

[Stage 17:>                                                         (0 + 1) / 1]

In [21]:
reviews_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = false)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: double (nullable = true)
 |-- label: string (nullable = true)



In [23]:
reviews_info.describe().show()

[Stage 17:>                 (0 + 1) / 1][Stage 25:>                 (0 + 1) / 1]

+-------+-----------------+--------------------+------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|summary|            appid|      author_steamid|   sentiment_score|                  name|         genre|             developer|             publisher|             price|        label|
+-------+-----------------+--------------------+------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|  count|         39884443|            39884443|          39884443|              39884443|      39884443|              39884443|              39884443|          39884443|     39884443|
|   mean|559494.8683177298|3.8599857446956426E8|0.8627315401701823|     543.2030242935052|          null|     687.9829605963791|     3858.085599789778|17.407208234121047|         null|
| stddev|481288.1428677972|4.1326023906429297E8|0.0522987591586619|     757

[Stage 17:>                                                         (0 + 1) / 1]

In [24]:
# Save the filtered and cleaned DataFrame to GCS as a single CSV file
#reviews_info.coalesce(1).write.csv(
#    "gs://msca-bdp-student-gcs/Group5/reviews_w_info",
#    header=True,
#    mode="overwrite"
#)

#print("Export successful!")

                                                                                

Export successful!


### Model Building

In [26]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("GameRemmendation").getOrCreate()

# Load CSV files into DataFrames
reviews_info = spark.read.csv("gs://msca-bdp-student-gcs/Group5/reviews_w_info/part-00000-4b308751-c8b9-44e1-8062-649a7203de14-c000.csv", header=True, inferSchema=True)
reviews_info.head(1)

                                                                                

[Row(appid=10, author_steamid=4885, sentiment_score=0.8847313172665581, name='Counter-Strike', genre='Action', developer='Valve', publisher='Valve', price=9.99, label='Overwhelmingly Positive')]

In [29]:
# Show the updated DataFrame
reviews_info.show(5, truncate=False)

+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|appid|author_steamid|sentiment_score   |name          |genre |developer|publisher|price|label                  |
+-----+--------------+------------------+--------------+------+---------+---------+-----+-----------------------+
|10   |4885          |0.8847313172665581|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |16068         |0.8731704227545498|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |73141         |0.9042052339242086|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |125096        |0.8807123180475541|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |202012        |0.8555295814817651|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
+-----+--------------+------------------+--------------+------+---------+---------+-----

In [27]:
reviews_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: double (nullable = true)
 |-- label: string (nullable = true)



In [None]:
reviews_info.describe().show()



+-------+-----------------+-------------------+-------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|summary|            appid|     author_steamid|    sentiment_score|                  name|         genre|             developer|             publisher|             price|        label|
+-------+-----------------+-------------------+-------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|  count|         39884443|           39884443|           39884443|              39884443|      39884443|              39884443|              39884443|          39884443|     39884443|
|   mean|559494.8683177298|3.859985744695644E8| 0.8627315401709978|     543.2030242935052|          null|     687.9829605963791|     3858.085599789778|17.407208233985035|         null|
| stddev|481288.1428677959|4.132602390642919E8|0.05229875915866183|     757

                                                                                

In [30]:
from pyspark.sql.functions import col

# Select only the required columns
reviews_cf = reviews_info.select("appid", "author_steamid", "sentiment_score")

# Drop rows with null values in any of the selected columns
reviews_cf = reviews_cf.dropna()

# Show the first 10 rows to confirm
reviews_cf.show(10, truncate=False)

+-----+--------------+------------------+
|appid|author_steamid|sentiment_score   |
+-----+--------------+------------------+
|10   |4885          |0.8847313172665581|
|10   |16068         |0.8731704227545498|
|10   |73141         |0.9042052339242086|
|10   |125096        |0.8807123180475541|
|10   |202012        |0.8555295814817651|
|10   |236498        |0.8737499209301924|
|10   |279059        |0.866265366391876 |
|10   |884484        |0.8593730529478136|
|10   |886865        |0.8742503363379408|
|10   |889070        |0.8730743843530598|
+-----+--------------+------------------+
only showing top 10 rows



In [31]:
reviews_cf.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)



In [32]:
reviews_cf.describe().show()



+-------+------------------+--------------------+-------------------+
|summary|             appid|      author_steamid|    sentiment_score|
+-------+------------------+--------------------+-------------------+
|  count|          39884443|            39884443|           39884443|
|   mean| 559494.8683177298|3.8599857446956444E8| 0.8627315401709981|
| stddev|481288.14286779595| 4.132602390642917E8|0.05229875915866183|
|    min|                10|                   2|                0.0|
|    max|           2618840|          1608152432|                1.0|
+-------+------------------+--------------------+-------------------+



                                                                                

In [33]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Split the dataset into training and testing sets
training, test = reviews_cf.randomSplit([0.8, 0.2])

In [34]:
# Build the ALS model
als = ALS(
    maxIter=10,
    regParam=0.01,
    rank=10,
    userCol="author_steamid",
    itemCol="appid",
    ratingCol="sentiment_score",
    coldStartStrategy="drop",
    nonnegative=True
)

# Fit the ALS model on the training data
model = als.fit(training)

                                                                                

In [35]:
# Generate predictions on the test data
predictions = model.transform(test)

In [36]:
#explain parameters of the model
model.explainParams()

"blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)\ncoldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)\nitemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: appid)\npredictionCol: prediction column name. (default: prediction)\nuserCol: column name for user ids. Ids must be within the integer value range. (default: user, current: author_steamid)"

In [37]:
#item factors 
model.itemFactors.show(10, truncate = False)

+---+--------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                  |
+---+--------------------------------------------------------------------------------------------------------------------------+
|10 |[0.54195374, 0.13589714, 7.7461294E-4, 0.29960793, 0.0032619457, 0.27553084, 0.34697375, 5.268064E-4, 0.0, 0.011878433]   |
|20 |[0.31986937, 0.27253136, 0.0784746, 0.16628264, 0.009966973, 0.255988, 0.14212343, 0.23479694, 0.16091265, 0.008302208]   |
|30 |[0.3144448, 0.27869168, 0.096934624, 0.19748399, 0.058678612, 0.21371606, 0.13882087, 0.2386709, 0.14194591, 0.03952991]  |
|40 |[0.33535728, 0.25705478, 0.022033771, 0.1582301, 0.049704958, 0.24369459, 0.13425945, 0.2335789, 0.18554185, 0.03205029]  |
|50 |[0.2827807, 0.31245115, 0.08568611, 0.18937051, 0.025409183, 0.18928935, 0.19448572, 0.20746

In [38]:
# Evaluate the model using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="sentiment_score",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Display some predictions for inspection
predictions.select("author_steamid", "appid", "sentiment_score", "prediction").show(10, truncate=False)

                                                                                

Root Mean Squared Error (RMSE): 0.13005772918622382




+--------------+-------+------------------+----------+
|author_steamid|appid  |sentiment_score   |prediction|
+--------------+-------+------------------+----------+
|53            |493340 |0.7068436432937024|0.4807135 |
|53            |223850 |0.7072537158776404|0.4672202 |
|497           |1086940|0.8833215936956873|0.8794167 |
|497           |440    |0.87946853988706  |0.6473543 |
|497           |224760 |0.8120242710560317|0.76103824|
|2079          |578080 |0.8752413696936361|0.6893583 |
|3306          |730    |0.8709374734343704|0.86499023|
|3306          |671860 |0.874807910863581 |0.78267103|
|3436          |363970 |0.840316941956014 |0.7928872 |
|3485          |1556870|0.825365543955509 |0.78041875|
+--------------+-------+------------------+----------+
only showing top 10 rows



                                                                                

The RMSE of 0.13 suggests the model's predictions are generally within ±0.13 of the actual sentiment scores, on average.

In [39]:
predictions.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- prediction: float (nullable = false)



In [40]:
reviews_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: double (nullable = true)
 |-- label: string (nullable = true)



In [47]:
games_info = reviews_info.drop('author_steamid', 'sentiment_score')
games_info.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: double (nullable = true)
 |-- label: string (nullable = true)



In [48]:
from pyspark.sql.functions import col

# Ensure 'games_info' has unique appid
games_info_unique = games_info.dropDuplicates(["appid"])

# Perform the left join between recommendation and games_info on 'appid'
recommendation = predictions.join(games_info_unique, "appid", "left")

# Drop duplicate 'appid' column
# recommendation = recommendation.drop(games_info.appid)

# Remove duplicate rows from the result
recommendation = recommendation.dropDuplicates()

# Show the results to verify
recommendation.show(5, truncate=False)

[Stage 727:>                                                        (0 + 1) / 1]

+-----+--------------+------------------+----------+-----------+-----+--------------+--------------+-----+-------------+
|appid|author_steamid|sentiment_score   |prediction|name       |genre|developer     |publisher     |price|label        |
+-----+--------------+------------------+----------+-----------+-----+--------------+--------------+-----+-------------+
|1700 |4398690       |0.8295306347677494|0.7715237 |Arx Fatalis|RPG  |Arkane Studios|Arkane Studios|4.99 |Very Positive|
|1700 |41696944      |0.7844690368846943|0.8328918 |Arx Fatalis|RPG  |Arkane Studios|Arkane Studios|4.99 |Very Positive|
|1700 |44089998      |0.7998424761455951|0.7946886 |Arx Fatalis|RPG  |Arkane Studios|Arkane Studios|4.99 |Very Positive|
|1700 |88082942      |0.7087531905029538|0.76503265|Arx Fatalis|RPG  |Arkane Studios|Arkane Studios|4.99 |Very Positive|
|1700 |107062795     |0.7910992547491337|0.7969639 |Arx Fatalis|RPG  |Arkane Studios|Arkane Studios|4.99 |Very Positive|
+-----+--------------+----------

                                                                                

In [49]:
recommendation.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- author_steamid: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- prediction: float (nullable = false)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: double (nullable = true)
 |-- label: string (nullable = true)



In [50]:
recommendation.describe().show()



+-------+------------------+--------------------+-------------------+-------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|summary|             appid|      author_steamid|    sentiment_score|         prediction|                  name|         genre|             developer|             publisher|             price|        label|
+-------+------------------+--------------------+-------------------+-------------------+----------------------+--------------+----------------------+----------------------+------------------+-------------+
|  count|           6280272|             6280272|            6280272|            6280272|               6280272|       6280272|               6280272|               6280272|           6280272|      6280272|
|   mean| 569388.7875490106|3.5155200036015224E8| 0.8613844942969497| 0.7893914995778923|     545.3015427769986|          null|                691.44|    3859.6374284226963

                                                                                

### Prediction - Application

In [52]:
# Specify the user for whom you want to generate recommendations
specific_user = 4398690  # Replace with the actual user ID

# Create a DataFrame for the specific user
specific_user_df = recommendation.filter(recommendation.author_steamid == specific_user).select("author_steamid").distinct()

# Generate recommendations for the specific user
userRecs = model.recommendForUserSubset(specific_user_df, 5)

# Show the recommendations
userRecs.show(truncate=False)

                                                                                

+--------------+------------------------------------------------------------------------------------------------------------+
|author_steamid|recommendations                                                                                             |
+--------------+------------------------------------------------------------------------------------------------------------+
|4398690       |[{2129020, 1.6654224}, {2251240, 1.6357807}, {2079950, 1.6010736}, {1945230, 1.5943185}, {836640, 1.473071}]|
+--------------+------------------------------------------------------------------------------------------------------------+



In [53]:
# Drop the last column of the DataFrame
#userRecs = userRecs.drop(userRecs.columns[-1])

# Show the result to confirm
userRecs.printSchema()
games.printSchema()

root
 |-- author_steamid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = false)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- label: string (nullable = true)



In [55]:
userRecs.head(1)

                                                                                

[Row(author_steamid=4398690, recommendations=[Row(appid=2129020, rating=1.6654224395751953), Row(appid=2251240, rating=1.635780692100525), Row(appid=2079950, rating=1.6010736227035522), Row(appid=1945230, rating=1.5943185091018677), Row(appid=836640, rating=1.4730709791183472)])]

In [58]:
from pyspark.sql.functions import col, explode

# Explode the recommendations into separate rows
exploded_user_recs = userRecs.withColumn("recommendation", explode(col("recommendations")))

# Extract appid and rating from recommendations
exploded_user_recs = exploded_user_recs.select(
    col("author_steamid"),
    col("recommendation.appid").alias("appid"),
    col("recommendation.rating").alias("rating")
)

# Join with the games DataFrame to enrich the recommendations
enriched_recs = exploded_user_recs.join(games_info_unique, "appid", "left")

# Filter to include only the specific user
user_4398690_recs = enriched_recs.filter(col("author_steamid") == specific_user)

# Show the enriched recommendations for the specific user
user_4398690_recs.show(truncate=False)

                                                                                

+-------+--------------+---------+---------------------------------------------+---------+------------------------------+------------------------------+-----+-------------+
|appid  |author_steamid|rating   |name                                         |genre    |developer                     |publisher                     |price|label        |
+-------+--------------+---------+---------------------------------------------+---------+------------------------------+------------------------------+-----+-------------+
|2129020|4398690       |1.6654224|Last Christmas                               |Adventure|polytely games                |polytely games                |4.99 |Positive     |
|2251240|4398690       |1.6357807|Police Car Simulator                         |Action   |Yusuf Islam Seyhan, Samet Acar|Inspector Studios             |0.99 |Negative     |
|1945230|4398690       |1.5943185|Diablo The Abyss                             |Adventure|Friday Night                  |Friday Night  

In [73]:
predictions.filter(col("appid") == 10).show(truncate=False)

[Stage 3357:>                                                       (0 + 1) / 1]

+-----+--------------+------------------+----------+
|appid|author_steamid|sentiment_score   |prediction|
+-----+--------------+------------------+----------+
|10   |140541        |0.8802974486831863|0.38343713|
|10   |239548        |0.8731478197915625|0.856684  |
|10   |285169        |0.8869327207951649|0.75947124|
|10   |388847        |0.8745790167584548|0.38438275|
|10   |418167        |0.8775252911171331|0.8435322 |
|10   |828691        |0.8301549389535384|0.91713804|
|10   |882285        |0.8738930130709596|0.7926611 |
|10   |913290        |0.8807447585488885|0.76276624|
|10   |928290        |0.8696314853142184|0.820919  |
|10   |1783852       |0.8765157041511878|0.6533298 |
|10   |2088738       |0.8902188776002593|0.40487254|
|10   |2533840       |0.8825222927104017|0.5615467 |
|10   |3201861       |0.8451422727573333|0.811101  |
|10   |3505217       |0.87367558412711  |0.8496253 |
|10   |4075465       |0.87367558412711  |0.3818016 |
|10   |4088452       |0.8679660120935957|0.734

                                                                                

In [74]:
from pyspark.sql.functions import col, explode

# Specify the game for which you want to recommend users
specific_game = 10  # Replace with the actual game appid

# Create a DataFrame for the specific game
specific_game_df = recommendation.filter(recommendation.appid == specific_game).select("appid").distinct()

# Generate recommendations for the specific game
gameRecs = model.recommendForItemSubset(specific_game_df, 5)

# Show the recommendations
gameRecs.show(truncate=False)



+-----+----------------------------------------------------------------------------------------------------------------------+
|appid|recommendations                                                                                                       |
+-----+----------------------------------------------------------------------------------------------------------------------+
|10   |[{114728254, 1.2838367}, {67385174, 1.2818373}, {184253085, 1.2620038}, {202668955, 1.2540469}, {90491259, 1.2472687}]|
+-----+----------------------------------------------------------------------------------------------------------------------+



                                                                                

In [77]:
from pyspark.sql.functions import col, explode

# Explode the recommendations into separate rows
exploded_game_recs = gameRecs.withColumn("recommendation", explode(col("recommendations")))

# Inspect schema to confirm the correct fields
exploded_game_recs.printSchema()

# Extract author_steamid and rating from recommendations (update field names based on schema)
exploded_game_recs = exploded_game_recs.select(
    col("appid"),
    col("recommendation.author_steamid").alias("author_steamid"),  # Use correct field
    col("recommendation.rating").alias("rating")
)

# Join with the games DataFrame to enrich the recommendations (optional)
enriched_game_recs = exploded_game_recs.join(games_info_unique, "appid", "left")

# Filter for the specific game
game_10_recs = enriched_game_recs.filter(col("appid") == specific_game)

# Show the top 5 users for the specific game
game_10_recs.show(truncate=False)

root
 |-- appid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_steamid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- recommendation: struct (nullable = true)
 |    |-- author_steamid: integer (nullable = true)
 |    |-- rating: float (nullable = true)



                                                                                

+-----+--------------+---------+--------------+------+---------+---------+-----+-----------------------+
|appid|author_steamid|rating   |name          |genre |developer|publisher|price|label                  |
+-----+--------------+---------+--------------+------+---------+---------+-----+-----------------------+
|10   |114728254     |1.2838367|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |67385174      |1.2818373|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |184253085     |1.2620038|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |202668955     |1.2540469|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
|10   |90491259      |1.2472687|Counter-Strike|Action|Valve    |Valve    |9.99 |Overwhelmingly Positive|
+-----+--------------+---------+--------------+------+---------+---------+-----+-----------------------+



In [83]:
# Select a subset of users (limit to 3 for simplicity)
users = recommendation.select("author_steamid").distinct().limit(3)

# Generate recommendations for the selected users
userSubsetRecs = model.recommendForUserSubset(users, 5)

# Display recommendations
userSubsetRecs.show(5, truncate=False)



+--------------+------------------------------------------------------------------------------------------------------------+
|author_steamid|recommendations                                                                                             |
+--------------+------------------------------------------------------------------------------------------------------------+
|51362428      |[{2251240, 1.7431304}, {2129020, 1.670718}, {2079950, 1.6682618}, {836640, 1.5459373}, {1945230, 1.4979694}]|
|10620100      |[{1945230, 1.8276551}, {2251240, 1.6629995}, {836640, 1.6355375}, {2129020, 1.6316975}, {709060, 1.5780795}]|
|9867023       |[{2129020, 1.7035441}, {2251240, 1.6600416}, {2079950, 1.6236326}, {1945230, 1.56118}, {836640, 1.5605195}] |
+--------------+------------------------------------------------------------------------------------------------------------+



                                                                                

In [84]:
from pyspark.sql.functions import col, explode

# Explode the recommendations into separate rows
exploded_user_recs = userSubsetRecs.withColumn("recommendation", explode(col("recommendations")))

# Extract appid and rating from recommendations
exploded_user_recs = exploded_user_recs.select(
    col("author_steamid"),
    col("recommendation.appid").alias("appid"),
    col("recommendation.rating").alias("rating")
)

# Join with games_info_unique to include detailed information
detailed_user_recs = exploded_user_recs.join(games_info_unique, "appid", "left")

# Order by author_steamid and rating (optional for better organization)
ordered_user_recs = detailed_user_recs.orderBy("author_steamid", col("rating").desc())

# Show the ordered recommendations
ordered_user_recs.show(truncate=False)



+-------+--------------+---------+---------------------------------------------+---------+------------------------------+------------------------------+-----+-------------+
|appid  |author_steamid|rating   |name                                         |genre    |developer                     |publisher                     |price|label        |
+-------+--------------+---------+---------------------------------------------+---------+------------------------------+------------------------------+-----+-------------+
|2251240|28083380      |1.7208892|Police Car Simulator                         |Action   |Yusuf Islam Seyhan, Samet Acar|Inspector Studios             |0.99 |Negative     |
|2129020|28083380      |1.6844431|Last Christmas                               |Adventure|polytely games                |polytely games                |4.99 |Positive     |
|2079950|28083380      |1.6583772|Filthy us 2                                  |Adventure|Game Logic Studio             |Game Logic Stu

                                                                                

In [86]:
# Select a subset of games (limit to 3 for simplicity)
games = recommendation.select("appid").distinct().limit(3)

# Generate recommendations for the selected games
gameSubsetRecs = model.recommendForItemSubset(games, 5)

# Display recommendations
gameSubsetRecs.show(5, truncate=False)



+-----+---------------------------------------------------------------------------------------------------------------------------+
|appid|recommendations                                                                                                            |
+-----+---------------------------------------------------------------------------------------------------------------------------+
|1500 |[{111016570, 0.9828935}, {136152822, 0.955087}, {206996522, 0.9525924}, {253890477, 0.9513529}, {1540483019, 0.94899786}]  |
|300  |[{250167120, 0.9747308}, {1281745788, 0.97441274}, {136152822, 0.97297263}, {253890477, 0.96980774}, {53734561, 0.9695108}]|
|1280 |[{111016570, 0.9891624}, {1103890628, 0.98634315}, {253890477, 0.9619974}, {136152822, 0.9608458}, {1054905884, 0.9596046}]|
+-----+---------------------------------------------------------------------------------------------------------------------------+



                                                                                

In [88]:
from pyspark.sql.functions import col, explode

# Explode the recommendations into separate rows
exploded_game_recs = gameSubsetRecs.withColumn("recommendation", explode(col("recommendations")))

# Extract appid and rating from recommendations
exploded_game_recs = exploded_game_recs.select(
    col("appid"),
    col("recommendation.author_steamid").alias("author_steamid"),
    col("recommendation.rating").alias("rating")
)

# Join with games_info_unique to include detailed information
detailed_game_recs = exploded_game_recs.join(games_info_unique, "appid", "left")

# Order by appid and rating for better organization
ordered_game_recs = detailed_game_recs.orderBy("appid", col("rating").desc())

# Show the detailed recommendations table
ordered_game_recs.show(truncate=False)



+-----+--------------+----------+-----------------------+----------+--------------------------------------------+-------------------+-----+-----------------------+
|appid|author_steamid|rating    |name                   |genre     |developer                                   |publisher          |price|label                  |
+-----+--------------+----------+-----------------------+----------+--------------------------------------------+-------------------+-----+-----------------------+
|2320 |112710692     |1.0559646 |Quake II               |Action    |id Software, Nightdive Studios, MachineGames|Bethesda Softworks |3.99 |Overwhelmingly Positive|
|2320 |1054905884    |1.0538441 |Quake II               |Action    |id Software, Nightdive Studios, MachineGames|Bethesda Softworks |3.99 |Overwhelmingly Positive|
|2320 |155438115     |1.0142664 |Quake II               |Action    |id Software, Nightdive Studios, MachineGames|Bethesda Softworks |3.99 |Overwhelmingly Positive|
|2320 |920354932

                                                                                