In [None]:
#Appendix A


kaggle datasets download -d kieranpoc/steam-reviews
unzip steam-reviews.zip
gcloud storage buckets create gs://gamasteamreviews --project=gamasteam \ --default-storage-class=STANDARD --location=us-central1 --uniform-bucket-level-access
gcloud storage cp all_reviews/all_reviews.csv gs://gamasteamreviews/landing/



In [None]:
#Appendix B

import matplotlib as plt
import seaborn as sns
from pyspark.sql import functions as F
csv = "gs://gamasteamreviews/landing/all_reviews.csv"
df = spark.read.csv(csv, header=True, inferSchema=True, multiLine=True, escape='"')

df.write.mode("overwrite").parquet("gs://gamasteamreviews/landing/all_reviews.parquet")
df = spark.read.parquet("gs://gamasteamreviews/landing/all_reviews.parquet")
#I converted it to parquet to try and alleviate some performance issues
#the record counts
df.cache
df.count()

#the columns and data types

df.printSchema()
#handling the null values in the data

null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts_pandas = null_counts.toPandas().transpose()
null_counts_pandas.columns = ["Null Count"]
null_counts_pandas.style
stats = df.select(
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "author_last_played",
    "voted_up",
    "votes_up",
    "votes_funny",
    "weighted_vote_score"
).summary("count", "min", "max", "mean","stddev")

stats_pandas = stats.toPandas()

stats_pandas.style

#the stats for the dates

date_stats = df.select(
    F.from_unixtime("timestamp_created").alias("created_date"),
    F.from_unixtime("timestamp_updated").alias("updated_date")
    ).select(
    F.min("created_date").alias("min_created_date"),
    F.max("created_date").alias("max_created_date"),
    F.min("updated_date").alias("min_updated_date"),
    F.max("updated_date").alias("max_updated_date")
)

date_summary_pandas = date_stats.toPandas()
date_summary_pandas.style
#review statistics

df = df.withColumn("review_word_count", F.size(F.split(F.col("review"), " ")))
review_stats = df.agg(
    F.min("review_word_count").alias("min_word_count"),
    F.max("review_word_count").alias("max_word_count"),
    F.avg("review_word_count").alias("avg_word_count")
)
review_stats_pandas = review_stats.toPandas()
review_stats_pandas.style
#review count by language
review_count_by_language = df.groupBy("language").count().orderBy("count", ascending=False).limit(5).toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(data=review_count_by_language, x="language", y="count", palette="viridis")
plt.title("Review Count by Language (Top 5 Languages)")
plt.xlabel("Language")
plt.ylabel("Review Count")
plt.show()

#number of reviews for playtimes
df_sample = df.sample(0.1).select(
    (F.col("author_playtime_forever") / 60).alias("author_playtime_hours"), 
    "author_num_reviews"
).toPandas()

plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_sample, x="author_playtime_hours", y="author_num_reviews", alpha=0.6)
plt.title("Playtime (Hours) vs. Number of Reviews")
plt.xlabel("Playtime (Hours)")
plt.ylabel("Number of Reviews")
plt.show()

 


In [None]:
#Appendix C

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType, LongType
from pyspark.sql import functions as F

csv = "gs://gamasteamreviews/landing/all_reviews.csv"

df = spark.read.csv(
    csv,
    header=True,        
    inferSchema=True,   
    multiLine=True,     
    escape='"'          
)
df.write.mode("overwrite").parquet("gs://gamasteamreviews/landing/all_reviews_parquet")
parquet_path = "gs://gamasteamreviews/landing/all_reviews_parquet"
df = spark.read.parquet(parquet_path)

columns_to_keep = [
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "author_last_played",
    "language",
    "voted_up",
    "steam_purchase",
    "received_for_free",
    "timestamp_created",
    "timestamp_updated"
]

df = df.select(*columns_to_keep)

for col in ["author_num_games_owned", "author_num_reviews", "author_playtime_forever",
            "author_playtime_last_two_weeks", "author_playtime_at_review", "author_last_played"]:
    median_value = df.approxQuantile(col, [0.5], 0.05)[0]
    df = df.fillna({col: median_value})

df = df.fillna({
    "voted_up": False,
    "steam_purchase": False,
    "received_for_free": False,
})

df = df.fillna({"language": "unknown"})
cleaned_parquet_path = "gs://gamasteamreviews/cleaned/all_reviews_cleaned.parquet"
df.write.mode("overwrite").parquet(cleaned_parquet_path)

 


In [None]:
#Appendix D

from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, from_unixtime, hour
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

df = spark.read.parquet("gs://gamasteamreviews/cleaned/all_reviews_cleaned.parquet")
#with this dataset containing unix data for when the reviews are written, it's possible that the time of day affects review sentiment.
#perhaps gamers are more likely to leave positive reviews in the afternoon, or night. 
df = df.withColumn(
    "time_of_day",
    when((hour(from_unixtime(col("timestamp_created"))) >= 6) & (hour(from_unixtime(col("timestamp_created"))) < 12), "morning")
    .when((hour(from_unixtime(col("timestamp_created"))) >= 12) & (hour(from_unixtime(col("timestamp_created"))) < 18), "afternoon")
    .when((hour(from_unixtime(col("timestamp_created"))) >= 18) & (hour(from_unixtime(col("timestamp_created"))) < 24), "evening")
    .otherwise("night"))

    #this feature shows us the amount of time that has passed between when the reviewer had played the game, and when they reviewed it
#the idea is the shorter the gap between the time last played and time of review, the stronger the reviewer will feel about the game due to recency bias

df = df.withColumn(
    "recency_bias",
    (col("timestamp_created") - col("author_last_played")) / (24 * 60 * 60)
)

#this is a feature piggybacking off the previous one, it checks to see if the author has kept playing the game after their review.

df = df.withColumn(
    "played_after_review",
    F.when(col("author_last_played") > col("timestamp_created"), 1).otherwise(0)
)
#filling any missing data in our newly created numeric columns

for col_name in [
    "recency_bias",
    "played_after_review"
]:
    median_value = df.approxQuantile(col_name, [0.5], 0.05)[0]
    df = df.fillna({col_name: median_value})
#using the string indexer on our categorical values. using handleinvalid keep to deal with missing values

language_indexer = StringIndexer(inputCol="language", outputCol="language_indexed", handleInvalid="keep")
time_of_day_indexer = StringIndexer(inputCol="time_of_day", outputCol="time_of_day_indexed", handleInvalid="keep")
#after using the string indexer and dealing with any missing values, we assemble our features into a vector

feature_columns = [
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "author_last_played",
    "recency_bias",
    "played_after_review",
    "language_indexed",
    "time_of_day_indexed"
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
#creating a pipeline

pipeline = Pipeline(stages=[language_indexer, time_of_day_indexer, assembler])
df_transformed = pipeline.fit(df).transform(df)
#creating our data isnto training and test sets

train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=42)
#setting up for and executing k fold cross evaluation to find the parameters that will give us the best performance

dt_classifier = DecisionTreeClassifier(labelCol="voted_up", featuresCol="features", maxDepth=15)

params = (
    ParamGridBuilder()
    .addGrid(dt_classifier.maxDepth, [5, 10, 15])
    .addGrid(dt_classifier.maxBins, [32, 64])
    .build()
)
evaluator = MulticlassClassificationEvaluator(labelCol="voted_up", predictionCol="prediction", metricName="accuracy")

cross_val = CrossValidator(
    estimator=dt_classifier,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5 
)

cv_model = cross_val.fit(train_data)
predictions = cv_model.bestModel.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Cross-validated Accuracy: {accuracy}")
precision = MulticlassClassificationEvaluator(
    labelCol="voted_up", 
    predictionCol="prediction", 
    metricName="weightedPrecision"
).evaluate(predictions)
print(f"Precision: {precision}")
recall = MulticlassClassificationEvaluator(
    labelCol="voted_up", 
    predictionCol="prediction", 
    metricName="weightedRecall"
).evaluate(predictions)
print(f"Recall: {recall}")

f1 = MulticlassClassificationEvaluator(
    labelCol="voted_up", 
    predictionCol="prediction", 
    metricName="f1"
).evaluate(predictions)
print(f"F1-Score: {f1}")
trusted_path = "gs://gamasteamreviews/Trusted/all_reviews_with_features.parquet"
df_transformed.write.mode("overwrite").parquet(trusted_path)
models_path = "gs://gamasteamreviews/Models/decision_tree_model"
cv_model.bestModel.write().overwrite().save(models_path)
 
