In [1]:
# !pip install pyspark

In [2]:
from pyspark.sql import SparkSession
import numpy as np
import pyspark.sql.functions as F
import pandas as pd

In [3]:
# Run this cell if running locally

# spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
# Run this cell for SDSC at 60 gb per node, 30 cores. 

# spark = SparkSession.builder\
#     .config("spark.driver.memory", "4g")\
#     .config("spark.executor.memory", "4g")\
#     .config("spark.executor.instances", 14)\
#     .getOrCreate()

In [5]:
# Run this cell for SDSC at 80 gb per node, 30 cores. 

spark = SparkSession.builder\
    .config("spark.driver.memory", "8g")\
    .config("spark.executor.memory", "4g")\
    .config("spark.executor.instances", 18)\
    .getOrCreate()

In [6]:
spark

In [7]:
# Load CSV into Spark DataFrame

# Cluster Directory: "/home/kphan/kphan/data/all_reviews/all_reviews.csv"
dir = "/home/kphan/kphan/data/all_reviews/all_reviews.csv"

#dir = "resources/data/all_reviews_100000_sample.csv"
df = spark.read.csv(dir, header=True, inferSchema=True)

# Show df
df.show(10)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+----------------------------------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|                            review|timestamp_created|timestamp_updated|voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|
+----------------+-----+--------------+-----------------+----------------------+------------------+-----

In [8]:
# List out the columns for ease of view
print(f"Columns in Original Dataframe:\n\n{df.columns}\n")

df.describe().show()


Columns in Original Dataframe:

['recommendationid', 'appid', 'game', 'author_steamid', 'author_num_games_owned', 'author_num_reviews', 'author_playtime_forever', 'author_playtime_last_two_weeks', 'author_playtime_at_review', 'author_last_played', 'language', 'review', 'timestamp_created', 'timestamp_updated', 'voted_up', 'votes_up', 'votes_funny', 'weighted_vote_score', 'comment_count', 'steam_purchase', 'received_for_free', 'written_during_early_access', 'hidden_in_steam_china', 'steam_china_location']

+-------+-------------------+------------------+-----------------+--------------------+----------------------+--------------------+-----------------------+------------------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------------+--------------------+--------------------+--------------------+------------------

In [9]:
df.count()

113885601

In [10]:
# Example of problematic row
df.filter(df.recommendationid == 144385593).show()

+----------------+-------+--------------------+--------------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+----------+-----------------+--------------------+----------+----------+-----------+-------------------+-------------+-----------------+-----------------+---------------------------+---------------------+--------------------+
|recommendationid|  appid|                game|      author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|    review|timestamp_created|   timestamp_updated|  voted_up|  votes_up|votes_funny|weighted_vote_score|comment_count|   steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|
+----------------+-------+--------------------+--------------------+----------------------+------------------+

## Initial Data Cleanup / Feature Engineering

In [11]:
# Show data types

df.printSchema()

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- writt

In [12]:
# Drop multiple columns not being used for study

# Dropping review column as content will not be evaluated (not exploring NLP in this case)
df_cleaned = df.drop("steam_china_location", "hidden_in_steam_china", "review")

# Show the updated DataFrame
df_cleaned.show(10)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|language|timestamp_created|timestamp_updated|voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+-----------------+-----------------+--------+-------

In [13]:
# Only use reviews in which the weighted vote score is greater than zero

# Convert column to float
df_cleaned = df_cleaned.withColumn("weighted_vote_score", F.col("weighted_vote_score").cast("float"))

# Then filter
df_cleaned = df_cleaned.filter(F.col("weighted_vote_score") > 0)
df_cleaned.select("weighted_vote_score").summary().show()
df_cleaned.show(10)

+-------+-------------------+
|summary|weighted_vote_score|
+-------+-------------------+
|  count|           38597045|
|   mean|                NaN|
| stddev|                NaN|
|    min|       0.0013244302|
|    25%|         0.48704663|
|    50%|          0.5217391|
|    75%|         0.53051645|
|    max|                NaN|
+-------+-------------------+

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played| language|timestamp_created|timestamp_updated|voted_up|votes_up|votes

In [14]:
# Convert necessary columns to correct type

columns_to_int = ["recommendationid", "appid", "author_num_games_owned", "author_num_reviews", "author_playtime_forever", "author_playtime_last_two_weeks", "voted_up", "votes_up", "votes_funny", "comment_count", "steam_purchase", "received_for_free", "written_during_early_access"]
for column in columns_to_int: 
    df_cleaned = df_cleaned.withColumn(column, F.col(column).cast("int"))

columns_to_long = ["author_steamid", "timestamp_created", "timestamp_updated"]
for column in columns_to_long: 
    df_cleaned = df_cleaned.withColumn(column, F.col(column).cast("long"))


In [15]:
# Confirm changed schema
df_cleaned.printSchema()
df_cleaned.count()

root
 |-- recommendationid: integer (nullable = true)
 |-- appid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: long (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- voted_up: integer (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: float (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: integer (nullable = true)
 |-- received_for_free: integer (nullable = true)
 |-- written_during_early_access: integer (

38597045

In [16]:
# Remove rows caused by incorrect data type
df_cleaned = df_cleaned.dropna(subset = ["recommendationid", "appid", "author_steamid", "timestamp_created", "timestamp_updated", "voted_up", "votes_up", "votes_funny", "comment_count", "steam_purchase", "received_for_free", "written_during_early_access"])
df_cleaned.count()

37708777

In [17]:
# df_cleaned.summary().show()

In [18]:
# Converting author playtime to hours
df_cleaned = (
    df_cleaned
    .withColumn("author_playtime_forever", F.col("author_playtime_forever").cast("float") / 60)
    .withColumn("author_playtime_at_review", F.col("author_playtime_at_review").cast("float") / 60)
)

df_cleaned.show(10)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played| language|timestamp_created|timestamp_updated|voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+-----------------+-----------------+--------+----

In [19]:
### Not necessary just thought it would make playtime cleaner ###

# Round playtime to whole number

df_cleaned = (
    df_cleaned
    .withColumn("author_playtime_forever", F.round(F.col("author_playtime_forever"), 0))
    .withColumn("author_playtime_at_review", F.round(F.col("author_playtime_at_review"), 0))
)

df_cleaned.show(10)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played| language|timestamp_created|timestamp_updated|voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+-----------------+-----------------+--------+----

In [20]:
# Removing any duplicates (and counting) - long/expensive operation

before = df_cleaned.count()
df_cleaned = df_cleaned.dropDuplicates()
after =df_cleaned.count()

print(f"Count of rows before: {before}\nCount of rows after: {after}")

Count of rows before: 37708777
Count of rows after: 37708777


In [21]:
# Save cleaned dataframe
df_cleaned.write.csv("/home/kphan/kphan/data/cleaned_data", header=True)

In [22]:
spark.stop()

## Visualizations for Playtime

Moved to separate notebook

In [23]:
# # Bar graph which games have the most play time (of reviews)

# # Looking to represent playtime statistics for each game

# total_playtimes = (
#     df_cleaned.groupBy("game")
#     .agg(F.sum("author_playtime_at_review").alias("Total Reviewer Playtime (At time of review)"), F.count("*").alias("Review Count"))
#     .withColumn("Average Playtime Per Review", F.round(F.col("Total Reviewer Playtime (At time of review)") / F.col("Review Count")))
# )

# total_playtimes.show()

In [24]:
# # Conversion of aggregated table to Pandas for visualization
# top_10_reviewer_playtimes_df = total_playtimes.orderBy(F.col("Total Reviewer Playtime (At time of review)").desc()).limit(10).toPandas()
# top_10_review_count_df = total_playtimes.orderBy(F.col("Review Count").desc()).limit(10).toPandas()

In [25]:
# top_10_reviewer_playtimes_df

In [26]:
# top_10_review_count_df

In [27]:
# # Bargraph of top 10 games with the most total playtime per reviewer, top 10 games with most reviews
# # Omitting stats on average playtime per review as would be skewed towards games with fewer reviews

# fig, axes = plt.subplots(2,1,figsize = (6,12))

# # Plot 1: Top 10 games with the most total playtime per reviewer
# sns.barplot(
#     y = top_10_reviewer_playtimes_df["game"],
#     x = top_10_reviewer_playtimes_df["Total Reviewer Playtime (At time of review)"],
#     ax = axes[0],
#     palette = "rocket",
#     orient = "h"
# )
# axes[0].set_title("Top 10 Games with Highest Playtime of Reviewers")
# axes[0].set_ylabel("Game Title")
# axes[0].set_xlabel("Playtime (Hours)")
# for container in axes[0].containers:
#   axes[0].bar_label(container, fmt='%.1f')

# # Plot 2: Top 10 games with most reviews
# sns.barplot(
#     y = top_10_review_count_df["game"],
#     x = top_10_review_count_df["Review Count"],
#     ax = axes[1],
#     palette = "rocket", 
#     orient = "h"
# )
# axes[1].set_title("Top 10 Games with the Most Reviews")
# axes[1].set_ylabel("Game Title")
# axes[1].set_xlabel("Review Count")
# for container in axes[1].containers:
#   axes[1].bar_label(container, fmt='%.1f')

# plt.show()

## Visualizations - Scatter Plots

In [28]:
# # Curious to see if there is a pattern regarding playtime and weighted vote score

# playtime_votescore_df = df_cleaned.select("weighted_vote_score", "author_playtime_at_review").toPandas()

In [29]:
# sns.scatterplot(
#     data = playtime_votescore_df,
#     y = "weighted_vote_score",
#     x = "author_playtime_at_review"
# )

# plt.title("Scatterplot of Author Playtime at Review vs Weighted Vote Score")
# plt.ylabel("Weighted Vote Score (Steam metric)")
# plt.xlabel("Author Playtime At Review (hours)")

# plt.show()