# Part 0: Raw Data & Environment Set-up

## Raw Data

The following code loads in and unzips the raw data. Raw data has been downloaded from Kaggle placed into public Google Cloud Storage for ease of access.

In [None]:
# Loads in steam-reviews.zip (game reviews)
url1 = ("https://storage.googleapis.com/dsc232r-group-project-data/steam-reviews.zip")
!wget "{url1}"

In [None]:
# Extracts steam-reviews.zip into specified directory and deletes .zip file
!unzip steam-reviews.zip -d /home/joneel/joneel/Group_Project/raw_data/steam-reviews && rm steam-reviews.zip

In [None]:
# Loads in games.csv (games metadata)
url2 = ("https://storage.googleapis.com/dsc232r-group-project-data/games.csv")
!wget "{url2}"

In [None]:
# Moves games.csv into specified directory
!mv games.csv /home/joneel/joneel/Group_Project/raw_data

## Environment Set-up

Set-up on the cluster included 30 cores with 60GB memory in order to load and process this dataset (total ~45GB).

In [1]:
# Import required libraries
import os, pickle, glob
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark import StorageLevel

In [3]:
# sc.stop() # To stop a currently running SparkSession, if needed for troubleshooting/development

In [4]:
# Establishes Spark Session
sc = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
	.config("spark.executor.memory", "8g") \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.cores", "4") \
	.appName("MySparkJob") \
	.getOrCreate()

In [5]:
# Loads all_reviews.csv file into a spark dataframe
reviews_df = sc.read.csv("/home/joneel/joneel/Group_Project/raw_data/steam-reviews/all_reviews/all_reviews.csv", header=True, inferSchema=True)

In [6]:
# Displays reviews_df schema and counts total # of reviews
reviews_df.printSchema()
print(f"Number of reviews: {reviews_df.count()}")

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- writt

In [7]:
# Removes two columns related to Chinese gaming market & one column that was not well defined (not relevant for this analysis)
reviews_df = reviews_df.drop("hidden_in_steam_china", "steam_china_location", "voted_up")
reviews_df.printSchema()

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- written_during_early_access: string (nullable

In [8]:
# Filters dataframe to include only reviews in English & counts new number of reviews
reviews_df_processed = reviews_df.filter(reviews_df.language == 'english')
reviews_df_processed.select("language").distinct().show()
print(f"Number of reviews: {reviews_df_processed.count()}")

+--------+
|language|
+--------+
| english|
+--------+

Number of reviews: 51544179


In [9]:
# Drops rows that contain null values in the id or reviews column & drop duplicate recommendation ids
reviews_df_processed = reviews_df_processed.na.drop(subset=["recommendationid", "appid", "author_steamid", "review"])
print(f"Number of reviews: {reviews_df_processed.count()}")
reviews_df_processed = reviews_df_processed.dropDuplicates(subset=["recommendationid"])
print(f"Number of reviews: {reviews_df_processed.count()}")

Number of reviews: 51544179
Number of reviews: 51351970


In [10]:
# Cast columns to appropriate datatypes based on definitions on Kaggle
reviews_df_processed = reviews_df_processed.withColumn("author_num_games_owned", f.col("author_num_games_owned").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("author_num_reviews", f.col("author_num_reviews").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("author_playtime_forever", f.col("author_playtime_forever").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("author_playtime_last_two_weeks", f.col("author_playtime_last_two_weeks").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("author_playtime_at_review", f.col("author_playtime_at_review").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("author_last_played", f.from_unixtime(f.col("author_last_played")).cast("timestamp"))
reviews_df_processed = reviews_df_processed.withColumn("timestamp_created", f.from_unixtime(f.col("timestamp_created")).cast("timestamp"))
reviews_df_processed = reviews_df_processed.withColumn("timestamp_updated", f.from_unixtime(f.col("timestamp_updated")).cast("timestamp"))
reviews_df_processed = reviews_df_processed.withColumn("votes_up", f.col("votes_up").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("votes_funny", f.col("votes_funny").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("weighted_vote_score", f.col("weighted_vote_score").cast("double"))
reviews_df_processed = reviews_df_processed.withColumn("comment_count", f.col("comment_count").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("steam_purchase", f.col("steam_purchase").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("received_for_free", f.col("received_for_free").cast("integer"))
reviews_df_processed = reviews_df_processed.withColumn("written_during_early_access", f.col("written_during_early_access").cast("integer"))
reviews_df_processed.printSchema()

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: double (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: integer (nullable = true)
 |-- received_for_free: integer (nullable = true)
 |-- written_during_early_acces

In [11]:
# Checking columns that should be "0" or "1"
reviews_df_processed = reviews_df_processed.filter((reviews_df_processed["steam_purchase"] == 0) | (reviews_df_processed["steam_purchase"] == 1))
reviews_df_processed.select("steam_purchase").show(5) # Should be only 0's or 1's
print(reviews_df_processed.select("steam_purchase").distinct().count()) # Should be "2"

reviews_df_processed = reviews_df_processed.filter((reviews_df_processed["received_for_free"] == 0) | (reviews_df_processed["received_for_free"] == 1))
reviews_df_processed.select("received_for_free").show(5) # Should be only 0's or 1's
print(reviews_df_processed.select("received_for_free").distinct().count()) # Should be "2"

reviews_df_processed = reviews_df_processed.filter((reviews_df_processed["written_during_early_access"] == 0) | 
                                                   (reviews_df_processed["written_during_early_access"] == 1))
reviews_df_processed.select("written_during_early_access").show(5) # Should be only 0's or 1's
print(reviews_df_processed.select("written_during_early_access").distinct().count()) # Should be "2"

# Cast to boolean
reviews_df_processed = reviews_df_processed.withColumn("steam_purchase", f.col("steam_purchase").cast("boolean"))
reviews_df_processed = reviews_df_processed.withColumn("received_for_free", f.col("received_for_free").cast("boolean"))
reviews_df_processed = reviews_df_processed.withColumn("written_during_early_access", f.col("written_during_early_access").cast("boolean"))

# Double check schema and print new review count
reviews_df_processed.printSchema()
print(f"Number of reviews: {reviews_df_processed.count()}")

+--------------+
|steam_purchase|
+--------------+
|             0|
|             1|
|             1|
|             0|
|             1|
+--------------+
only showing top 5 rows

2
+-----------------+
|received_for_free|
+-----------------+
|                0|
|                0|
|                0|
|                0|
|                0|
+-----------------+
only showing top 5 rows

2
+---------------------------+
|written_during_early_access|
+---------------------------+
|                          0|
|                          0|
|                          0|
|                          0|
|                          0|
+---------------------------+
only showing top 5 rows

2
root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer 

In [23]:
# Splits data into two dataframes, 1 with all the review metadata and 1 with only the recommendationid + appid + review
reviews_df_processed_metadata = reviews_df_processed.drop("review")
reviews_df_processed_reviews = reviews_df_processed.select("recommendationid", "appid", "review")
reviews_df_processed_metadata.printSchema()
reviews_df_processed_reviews.printSchema()

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: double (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)

root
 |-

In [32]:
# Loads games.csv file into a spark dataframe
games_df = sc.read.csv("/home/joneel/joneel/Group_Project/raw_data/games.csv", header=True, inferSchema=True)

In [33]:
# Displays games_df schema and counts total # of games
games_df.printSchema()
print(f"Number of games: {games_df.count()}")

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- required_age: string (nullable = true)
 |-- price: string (nullable = true)
 |-- dlc_count: double (nullable = true)
 |-- detailed_description: string (nullable = true)
 |-- about_the_game: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- header_image: string (nullable = true)
 |-- website: string (nullable = true)
 |-- support_url: string (nullable = true)
 |-- support_email: string (nullable = true)
 |-- windows: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- linux: string (nullable = true)
 |-- metacritic_score: string (nullable = true)
 |-- metacritic_url: string (nullable = true)
 |-- achievements: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- supported_languages: string (nullable = true)
 |-- full_audi

In [34]:
# Removes columns not relevant for this analysis
games_df_processed = games_df.drop("reviews", "header_image", "website", "support_url", "support_email", "full_audio_languages", "screenshots", "movies")
games_df_processed.printSchema()

root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- required_age: string (nullable = true)
 |-- price: string (nullable = true)
 |-- dlc_count: double (nullable = true)
 |-- detailed_description: string (nullable = true)
 |-- about_the_game: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- windows: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- linux: string (nullable = true)
 |-- metacritic_score: string (nullable = true)
 |-- metacritic_url: string (nullable = true)
 |-- achievements: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- supported_languages: string (nullable = true)
 |-- packages: string (nullable = true)
 |-- developers: string (nullable = true)
 |-- publishers: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- user_score: str

In [35]:
# Drops rows that contain null values or duplicates in the appid column
print(f"Number of games: {games_df_processed.count()}")
games_df_processed = games_df_processed.na.drop(subset=["appid"])
print(f"Number of games: {games_df_processed.count()}")
games_df_processed = games_df_processed.dropDuplicates(subset=["appid"])
print(f"Number of games: {games_df_processed.count()}")

Number of games: 89618
Number of games: 89618
Number of games: 89618


In [None]:
# Cast columns to appropriate data types based ond descriptions on Kaggle
games_df_processed = games_df_processed.withColumn("release_date", f.to_timestamp("release_date", "yyyy-MM-dd"))
games_df_processed.printSchema()

In [37]:
# Cast columns to appropriate data types based ond descriptions on Kaggle
games_df_processed.select("release_date").show(5)
games_df_processed = games_df_processed.withColumn("release_date", f.to_timestamp("release_date", "yyyy-MM-dd"))
games_df_processed.select("release_date").show(5)

+------------+
|release_date|
+------------+
|  1999-04-01|
|  2010-07-12|
|  2006-05-01|
|  2007-10-10|
|  2013-07-09|
+------------+
only showing top 5 rows

+-------------------+
|       release_date|
+-------------------+
|1999-04-01 00:00:00|
|2010-07-12 00:00:00|
|2006-05-01 00:00:00|
|2007-10-10 00:00:00|
|2013-07-09 00:00:00|
+-------------------+
only showing top 5 rows



In [28]:
# Verify that appid's match between the two datasets
joined = reviews_df_processed_metadata.select("appid", "game").alias("df1").join(
    games_df_processed.select("appid", "name").alias("df2"), on="appid", how="inner").filter(
    "df1.game != df2.name")
joined.show()

+-------+--------------------+--------------------+
|  appid|                game|                name|
+-------+--------------------+--------------------+
|1172380|STAR WARS Jedi: F...|STAR WARS Jedi: F...|
|1172620|      Sea of Thieves|Sea of Thieves: 2...|
|1172470|        Apex Legends|       Apex Legends™|
|1481400|               Dagon|Dagon: by H. P. L...|
| 289070|Sid Meier's Civil...|Sid Meier’s Civil...|
| 271590|  Grand Theft Auto V|Grand Theft Auto ...|
|1172470|        Apex Legends|       Apex Legends™|
| 359550|Tom Clancy's Rain...|Tom Clancy's Rain...|
| 391220|Rise of the Tomb ...|Rise of the Tomb ...|
| 311210|Call of Duty: Bla...|Call of Duty®: Bl...|
|1238860|     Battlefield 4™ |      Battlefield 4™|
| 252950|       Rocket League|      Rocket League®|
|1313860| EA SPORTS™ FIFA 21 |  EA SPORTS™ FIFA 21|
|1222680|Need for Speed™ H...|Need for Speed™ Heat|
|1481400|               Dagon|Dagon: by H. P. L...|
| 698780|Doki Doki Literat...|Doki Doki Literat...|
| 563840|   

In [29]:
# Dropping the game name from the first dataset (100+ Million Reviews), will use the game name from the second dataset (Steam Games)
reviews_df_processed_metadata = reviews_df_processed_metadata.drop("game")
reviews_df_processed_metadata.printSchema()

root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: double (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)



In [None]:
# Save processed dataframes as parquet files
# reviews_df_processed.write.mode("overwrite").parquet("home/joneel/joneel/Group_Project/reviews_processed")
# games_df_processed.write.mode("overwrite").parquet("home/joneel/joneel/Group_Project/games_processed")

In [None]:
"Tokenize reviews, see # of reviews per author, break down tags and genres"
"Make some plots to see data distributions (# of reviews, reviews per author, dates, best and worst reviewed games?)"