In [0]:
# Load the masterdata export as CSV, using the first row of each file as the
# column names. No schema or inferSchema option is set on the reader.
masterdata_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/Volumes/workspace/default/rawdata/steam/processed/masterdata_exports/masterdata/")
)

In [0]:
from pyspark.sql.functions import col

# Keep only rows whose type is "game" and whose release_date is present.
filtered_master_df = masterdata_df.where(
    (col("type") == "game") & col("release_date").isNotNull()
)


In [0]:
# Report row counts before and after the type/release_date filter.
# Each count() call triggers its own Spark job.
for label, frame in (
    ("Original masterdata rows:", masterdata_df),
    ("After filtering (games + release_date):", filtered_master_df),
):
    print(label, frame.count())


Original masterdata rows: 239664
After filtering (games + release_date): 115796


In [0]:
# Sanity check: list the distinct values remaining in the "type" column.
(
    filtered_master_df
    .select("type")
    .distinct()
    .show()
)


+----+
|type|
+----+
|game|
+----+



In [0]:
# Show the smallest and largest release_date values to spot outliers.
(
    filtered_master_df
    .select("release_date")
    .summary("min", "max")
    .show()
)


+-------+------------+
|summary|release_date|
+-------+------------+
|    min|  1969-12-31|
|    max|  9998-12-31|
+-------+------------+



In [0]:
from pyspark.sql.functions import year

# Add a release_year column extracted from release_date.
with_year_df = filtered_master_df.withColumn("release_year", year("release_date"))


In [0]:
# Show the smallest and largest derived release_year values.
(
    with_year_df
    .select("release_year")
    .summary("min", "max")
    .show()
)


+-------+------------+
|summary|release_year|
+-------+------------+
|    min|        1969|
|    max|        9998|
+-------+------------+



In [0]:
from pyspark.sql.functions import current_date

# Column expression for the year of the date on which the query runs.
current_year = year(current_date())

# Keep releases from 1995 through the current year (both bounds inclusive);
# this drops the placeholder-looking years seen in the min/max check.
clean_year_df = with_year_df.where(
    col("release_year").between(1995, current_year)
)


In [0]:
# Report how many rows the release-year range filter removed.
for label, frame in (
    ("Rows before year cleanup:", with_year_df),
    ("Rows after year cleanup:", clean_year_df),
):
    print(label, frame.count())


Rows before year cleanup: 115796
Rows after year cleanup: 115745


In [0]:
# Confirm the release_year range after the cleanup filter.
(
    clean_year_df
    .select("release_year")
    .summary("min", "max")
    .show()
)


+-------+------------+
|summary|release_year|
+-------+------------+
|    min|        1997|
|    max|        2026|
+-------+------------+



In [0]:
# Number of rows per release year, earliest years first (first 10 shown).
(
    clean_year_df
    .groupBy("release_year")
    .count()
    .orderBy("release_year")
    .show(10)
)


+------------+-----+
|release_year|count|
+------------+-----+
|        1997|    2|
|        1998|    1|
|        1999|    2|
|        2000|    2|
|        2001|    4|
|        2002|    1|
|        2003|    3|
|        2004|    5|
|        2005|    7|
|        2006|   63|
+------------+-----+
only showing top 10 rows


In [0]:
# Number of rows per (release_year, is_free) combination (first 20 shown).
(
    clean_year_df
    .groupBy("release_year", "is_free")
    .count()
    .orderBy("release_year", "is_free")
    .show(20)
)


+------------+-------+-----+
|release_year|is_free|count|
+------------+-------+-----+
|        1997|  false|    1|
|        1997|   true|    1|
|        1998|  false|    1|
|        1999|  false|    2|
|        2000|  false|    2|
|        2001|  false|    4|
|        2002|  false|    1|
|        2003|  false|    3|
|        2004|  false|    5|
|        2005|  false|    7|
|        2006|  false|   62|
|        2006|   true|    1|
|        2007|  false|   82|
|        2007|   true|    2|
|        2008|  false|  148|
|        2008|   true|    3|
|        2009|  false|  300|
|        2009|   true|    7|
|        2010|  false|  233|
|        2010|   true|    7|
+------------+-------+-----+
only showing top 20 rows


In [0]:
from pyspark.sql.functions import count

# Size of each (release_year, is_free) stratum, one row per stratum.
stratum_counts_df = clean_year_df.groupBy("release_year", "is_free").agg(
    count("*").alias("stratum_count")
)


In [0]:
# Attach each row's stratum size by left-joining the per-stratum counts back
# onto the cleaned rows on (release_year, is_free).
with_stratum_df = clean_year_df.join(
    stratum_counts_df, ["release_year", "is_free"], "left"
)


In [0]:
# Distribution of stratum sizes as seen per row: min, median, 90th percentile, max.
(
    with_stratum_df
    .select("stratum_count")
    .summary("min", "50%", "90%", "max")
    .show()
)


+-------+-------------+
|summary|stratum_count|
+-------+-------------+
|    min|            1|
|    50%|         9520|
|    90%|        17081|
|    max|        17081|
+-------+-------------+



In [0]:
# Preview ten rows with their stratum keys and the joined stratum size.
display(
    with_stratum_df.select(
        "appid", "release_year", "is_free", "stratum_count"
    ).limit(10)
)


appid,release_year,is_free,stratum_count
10,2000,False,2
20,1999,False,2
30,2003,False,3
40,2001,False,4
50,1999,False,2
60,2000,False,2
70,1998,False,1
80,2004,False,5
130,2001,False,4
220,2004,False,5


In [0]:
from pyspark.sql.functions import rand

# Stratified down-sampling over (release_year, is_free) strata:
#   * strata with fewer than SMALL_STRATUM_THRESHOLD rows are kept whole;
#   * rows in larger strata are each kept with probability SAMPLE_FRACTION.
SMALL_STRATUM_THRESHOLD = 200
SAMPLE_FRACTION = 0.20
# Fixed seed so that re-running the notebook does not draw a different sample
# every time (the original called rand() without a seed).
# NOTE(review): Spark documents rand() as non-deterministic in the general
# case, so repeatability assumes the same input data and partitioning.
SAMPLE_SEED = 42

is_small_stratum = with_stratum_df["stratum_count"] < SMALL_STRATUM_THRESHOLD
is_randomly_selected = rand(seed=SAMPLE_SEED) < SAMPLE_FRACTION

sampled_df = (
    with_stratum_df
    # Rows for which the condition is false or NULL are dropped, exactly as
    # with the previous temporary keep_row column and "keep_row = true" filter.
    .filter(is_small_stratum | is_randomly_selected)
    # stratum_count was only needed to drive the sampling decision.
    .drop("stratum_count")
)


In [0]:
# Total number of rows that survived sampling (runs a Spark job).
sampled_row_total = sampled_df.count()
print("Final sampled rows:", sampled_row_total)


Final sampled rows: 23582


In [0]:
# Sampled rows per release year, earliest years first (first 20 shown).
(
    sampled_df
    .groupBy("release_year")
    .count()
    .orderBy("release_year")
    .show(20)
)


+------------+-----+
|release_year|count|
+------------+-----+
|        1997|    2|
|        1998|    1|
|        1999|    2|
|        2000|    2|
|        2001|    4|
|        2002|    1|
|        2003|    3|
|        2004|    5|
|        2005|    7|
|        2006|   63|
|        2007|   84|
|        2008|  151|
|        2009|   62|
|        2010|   55|
|        2011|   59|
|        2012|   93|
|        2013|  124|
|        2014|  364|
|        2015|  523|
|        2016|  832|
+------------+-----+
only showing top 20 rows


In [0]:
# Sampled rows per is_free value.
(
    sampled_df
    .groupBy("is_free")
    .count()
    .show()
)


+-------+-----+
|is_free|count|
+-------+-----+
|  false|20217|
|   true| 3365|
+-------+-----+



In [0]:
# Preview ten sampled rows with the columns of interest.
display(
    sampled_df
    .select("appid", "name", "release_year", "is_free", "genres", "platforms")
    .limit(10)
)


appid,name,release_year,is_free,genres,platforms
10,Counter-Strike,2000,False,Action,"windows, linux, mac"
20,Team Fortress Classic,1999,False,Action,"linux, windows, mac"
30,Day of Defeat,2003,False,Action,"linux, windows, mac"
40,Deathmatch Classic,2001,False,Action,"linux, windows, mac"
50,Half-Life: Opposing Force,1999,False,Action,"windows, linux, mac"
60,Ricochet,2000,False,Action,"mac, linux, windows"
70,Half-Life,1998,False,Action,"linux, windows, mac"
80,Counter-Strike: Condition Zero,2004,False,Action,"mac, windows, linux"
130,Half-Life: Blue Shift,2001,False,Action,"windows, mac, linux"
220,Half-Life 2,2004,False,Action,"linux, windows"


In [0]:
# Export the sample as CSV with a header row. The frame is coalesced to a
# single partition before writing, and anything already at the target path
# is replaced (mode "overwrite").
(
    sampled_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/processed/visualization_sample/")
)
