In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("SteamSparkProject")
    .master("local[*]")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")


In [10]:
from pyspark.sql.functions import *

REVIEWS_PATH = "data/reviews_sample.csv" # Smaller Reviews for GitHub
# REVIEWS_PATH = "data/reviews.csv"      # Use for full data

APPLICATION_PATH = "data/applications_sample.csv" # Smaller Applications Data for GitHub
# APPLICATION_PATH = "data/applications.csv"      # Full application data

reviews = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(REVIEWS_PATH)
)

apps = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(APPLICATION_PATH)
)

df = apps.join(reviews, on="appid", how="inner")

df_clean = (
    df
    .withColumn("final_price", expr("try_cast(mat_final_price as double)"))
    .withColumn("release_year", year(to_date(col("release_date"), "yyyy-MM-dd")))
    .withColumn("voted_up_real", expr("try_cast(voted_up as boolean)"))
)

df_filter = df_clean.filter(
    (col("type") == "game") &
    col("final_price").isNotNull() &
    (col("final_price") >= 5000) &
    (col("release_year") >= 2010)
)

df_not_null = df_filter.filter(col("voted_up_real").isNotNull())

df_answer = df_not_null.agg(
    (sum(when(col("voted_up_real") == True, 1).otherwise(0)) / count("*"))
    .alias("positive_review_ratio")
)

df_answer.show()


+---------------------+
|positive_review_ratio|
+---------------------+
|                 0.64|
+---------------------+



In [11]:
df_not_null.groupBy("voted_up_real").count().show()

+-------------+-----+
|voted_up_real|count|
+-------------+-----+
|         true|   16|
|        false|    9|
+-------------+-----+



In [12]:
from pyspark.sql.functions import *

reviews = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(REVIEWS_PATH)
)

apps = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(APPLICATION_PATH)
)

plat = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("data/platforms.csv")
)

plat_help = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("data/application_platforms.csv")
)

plat_clean = (
    plat.withColumnRenamed("id", "platform_id")
        .withColumnRenamed("name", "platform_name")
)

df_plat = plat_clean.join(plat_help, on="platform_id", how="inner")

df = apps.join(reviews, on="appid", how="inner")

df_with_plat = df.join(df_plat, on="appid", how="inner")

df_clean = (
    df_with_plat
    .withColumn("helpfulness_score", expr("try_cast(weighted_vote_score as double)"))
    .withColumn(
        "steam_achievements",
        floor(expr("try_cast(mat_achievement_count as double)")).cast("int")
    )
)

df_not_null = df_clean.filter(
    col("steam_achievements").isNotNull() &
    (col("helpfulness_score") <= 1)
)

achievement_groups = df_not_null.withColumn(
    "ach_group",
    when(col("steam_achievements") <= 15, "low (1-15)")
    .when(col("steam_achievements") <= 45, "medium (16-45)")
    .otherwise("high (45+)")
)

ach_plat_score = (
    achievement_groups
    .groupBy("platform_name", "ach_group")
    .agg(avg("helpfulness_score").alias("avg_helpfulness"))
    .orderBy("platform_name", "avg_helpfulness")
)

ach_plat_score.show()



+-------------+--------------+-------------------+
|platform_name|     ach_group|    avg_helpfulness|
+-------------+--------------+-------------------+
|        linux|    high (45+)|0.48017899302752287|
|        linux|    low (1-15)|        0.491050592|
|        linux|medium (16-45)|0.49426709289156634|
|          mac|    high (45+)|0.47936313406249986|
|          mac|    low (1-15)|0.49545939140845086|
|          mac|medium (16-45)|0.49692533463320465|
|      windows|    high (45+)|0.49075825597864775|
|      windows|    low (1-15)| 0.4931064187777776|
|      windows|medium (16-45)| 0.4988270143589746|
+-------------+--------------+-------------------+



In [13]:
from pyspark.sql.functions import *

reviews = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(REVIEWS_PATH)
)

apps = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(APPLICATION_PATH)
)

df = apps.join(reviews, on="appid", how="inner")

free = df.filter(col("received_for_free") == "True")

free_clean = (
    free
    .withColumn("voted_up_real", expr("try_cast(voted_up as boolean)"))
    .withColumn("playtime_at_review",
                floor(expr("try_cast(author_playtime_at_review as double)")).cast("int"))
)

free_clean_no_null = (
    free_clean
    .filter(col("voted_up_real").isNotNull())
    .filter(col("playtime_at_review").isNotNull())
)

games = (
    free_clean_no_null
    .groupBy("appid", "name")
    .agg(
        count("*").alias("total_free_reviews"),
        sum(when(col("voted_up_real") == True, 1).otherwise(0)).alias("number_positive"),
        avg("playtime_at_review").alias("avg_playtime (minutes)")
    )
)

ratio = (
    games
    .withColumn("positive_ratio", col("number_positive") / col("total_free_reviews"))
    .orderBy(col("total_free_reviews").desc())
)

overall_free = free_clean_no_null.agg(
    count("*").alias("total_free_reviews"),
    sum(when(col("voted_up_real") == True, 1).otherwise(0)).alias("number_positive"),
    avg("playtime_at_review").alias("avg_playtime (minutes)")
)

overall_free_ratio = overall_free.withColumn(
    "overall_positive_ratio",
    col("number_positive") / col("total_free_reviews")
)

not_free = df.filter(col("received_for_free") == "False")

not_free_clean = (
    not_free
    .withColumn("voted_up_real", expr("try_cast(voted_up as boolean)"))
    .withColumn("playtime_at_review",
                floor(expr("try_cast(author_playtime_at_review as double)")).cast("int"))
)

not_free_clean_no_null = (
    not_free_clean
    .filter(col("voted_up_real").isNotNull())
    .filter(col("playtime_at_review").isNotNull())
)

not_overall_free = not_free_clean_no_null.agg(
    count("*").alias("total_not_free_reviews"),
    sum(when(col("voted_up_real") == True, 1).otherwise(0)).alias("number_positive"),
    avg("playtime_at_review").alias("avg_playtime (minutes)")
)

not_overall_free_ratio = not_overall_free.withColumn(
    "overall_positive_ratio",
    col("number_positive") / col("total_not_free_reviews")
)

not_overall_free_ratio.show()

overall_free_ratio.show()

ratio.show(5)


+----------------------+---------------+----------------------+----------------------+
|total_not_free_reviews|number_positive|avg_playtime (minutes)|overall_positive_ratio|
+----------------------+---------------+----------------------+----------------------+
|                  2415|           1630|    1415.9590062111802|    0.6749482401656315|
+----------------------+---------------+----------------------+----------------------+

+------------------+---------------+----------------------+----------------------+
|total_free_reviews|number_positive|avg_playtime (minutes)|overall_positive_ratio|
+------------------+---------------+----------------------+----------------------+
|                16|             14|             1679.9375|                 0.875|
+------------------+---------------+----------------------+----------------------+

+------+--------------------+------------------+---------------+----------------------+--------------+
| appid|                name|total_free_revie

In [14]:
from pyspark.sql.functions import *

reviews = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(REVIEWS_PATH)
)

apps = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(APPLICATION_PATH)
)

apps_clean = apps.filter(col("type") == "game").filter(col("release_date").isNotNull())

df = apps_clean.join(reviews, on="appid", how="inner")

df_clean = (
    df
    .withColumn("playtime", floor(expr("try_cast(author_playtime_at_review as double)")).cast("bigint"))
    .withColumn("votes_up_real", floor(expr("try_cast(votes_up as double)")).cast("bigint"))
    .withColumn("votes_funny_real", floor(expr("try_cast(votes_funny as double)")).cast("bigint"))
    .withColumn("release_year", year(to_date(col("release_date"), "yyyy-MM-dd")))
)

filtered = (
    df_clean
    .filter(col("playtime").isNotNull())
    .filter(col("votes_up_real").isNotNull())
    .filter(col("votes_funny_real").isNotNull())
    .filter(col("votes_up_real") < 10000)
    .filter(col("votes_funny_real") < 10000)
)

playtime_groups = filtered.withColumn(
    "playtime_group",
    when(col("playtime") < 60, "0-1 hours")
    .when(col("playtime") < 300, "1-5 hours")
    .when(col("playtime") < 600, "5-10 hours")
    .when(col("playtime") < 1200, "10-20 hours")
    .otherwise("over 20 hours")
)

year_groups = playtime_groups.withColumn(
    "year_group",
    when(col("release_year") < 2005, "Pre 2005")
    .when(col("release_year") <= 2015, "2005-2015")
    .otherwise("2016 and after")
)

df_agg = (
    year_groups
    .groupBy("playtime_group", "year_group")
    .agg(
        round(avg("votes_up_real"), 2).alias("avg_votes_up"),
        round(avg("votes_funny_real"), 2).alias("avg_funny_votes"),
        count("*").alias("num_reviews")
    )
    .orderBy("year_group", "playtime_group")
)

df_agg.show()


+--------------+--------------+------------+---------------+-----------+
|playtime_group|    year_group|avg_votes_up|avg_funny_votes|num_reviews|
+--------------+--------------+------------+---------------+-----------+
|     0-1 hours|     2005-2015|        7.38|           0.43|        484|
|     1-5 hours|     2005-2015|        4.28|           0.25|        565|
|   10-20 hours|     2005-2015|         5.8|           0.17|        132|
|    5-10 hours|     2005-2015|        3.81|           0.16|        207|
| over 20 hours|     2005-2015|        8.19|           1.31|        135|
|     0-1 hours|2016 and after|         1.0|            0.2|        225|
|     1-5 hours|2016 and after|        1.58|            0.2|        245|
|   10-20 hours|2016 and after|        1.06|           0.16|         68|
|    5-10 hours|2016 and after|        1.31|           0.16|         99|
| over 20 hours|2016 and after|         1.3|           0.03|         86|
|   10-20 hours|      Pre 2005|         0.0|       