In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, when

# spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder \
    .master("local[1]") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()


In [5]:
import pyspark.sql.functions as f

In [6]:
df = spark.read.json([
    "/home/jovyan/data/test.jsonl",
    "/home/jovyan/data/test2.jsonl"
])
df.select('steam_appid').distinct().count()

223874

In [7]:
df = df.distinct()
df.select('steam_appid').distinct().count()

223874

In [8]:
df.show(5)

+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+------------+--------------------+--------------------+--------------------+----------+-----------------------+--------------------+--------------------+--------------------+-------+--------------------+------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----+--------------------+
|      about_the_game|        achievements|alternate_appid|          background|      background_raw|       capsule_image|     capsule_imagev5|          categories|co

In [9]:
df.printSchema()

root
 |-- about_the_game: string (nullable = true)
 |-- achievements: struct (nullable = true)
 |    |-- highlighted: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- path: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- alternate_appid: string (nullable = true)
 |-- background: string (nullable = true)
 |-- background_raw: string (nullable = true)
 |-- capsule_image: string (nullable = true)
 |-- capsule_imagev5: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- content_descriptors: struct (nullable = true)
 |    |-- ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- notes: string (nullable = true)
 |-- controller_support: string (nullable = true)
 |-- demos: array (

In [10]:
df.describe().show()


+-------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------------+--------------------------+--------------------+---------------------+--------------------+--------------------+--------+--------------------+--------------------+------------------+------------------------+--------------------+------------------+--------------------+-----------+--------------------+
|summary|      about_the_game|   alternate_appid|          background|      background_raw|       capsule_image|     capsule_imagev5|controller_support|detailed_description|              drm_notice|   ext_user_account_notice|        header_image|         legal_notice|  linux_requirements|    mac_requirements|    name|     pc_requirements|             ratings|      required_age|                 reviews|   short_description|       steam_appid| supported_languages|       type|             website|


# Remove duplicates

In [11]:
df.groupBy("steam_appid").agg(f.count("*")).filter(col('count(1)')>1).show()

+-----------+--------+
|steam_appid|count(1)|
+-----------+--------+
|     412020|       2|
|        400|       2|
|      10180|       2|
|      33230|       2|
|      32460|       2|
|     244450|       2|
|      38480|       2|
|     311210|       3|
|    2195250|       3|
|      22330|       2|
|    2669320|       2|
|      34330|       2|
|     238210|       2|
|     359550|       2|
|        620|       2|
|      42680|       2|
|    1426210|       2|
+-----------+--------+



In [12]:
def find_differences_by_id(df, id_col):
    dup_ids = df.groupBy(id_col).count().filter(col("count") > 1).select(id_col)
    dup_df = df.join(dup_ids, on=id_col, how="inner")

    cols_to_check = [c for c in df.columns if c != id_col]

    diff_exprs = [
        f.collect_set(col(c)).alias(c) for c in cols_to_check
    ]

    grouped = dup_df.groupBy(id_col).agg(*diff_exprs)

    mismatch_exprs = [
        (f.size(col(c)) > 1).alias(c) for c in cols_to_check
    ]

    mismatches = grouped.select(id_col, *mismatch_exprs)

    mismatch_long = mismatches.selectExpr(
        f"`{id_col}`", 
        "stack(" + str(len(cols_to_check)) + ", " +
        ", ".join([f"'{c}', {c}" for c in cols_to_check]) +
        ") as (column, has_difference)"
    ).filter("has_difference = true")

    return mismatch_long


In [13]:
result = find_differences_by_id(df, "steam_appid")
result.show(100)

+-----------+--------------------+--------------+
|steam_appid|              column|has_difference|
+-----------+--------------------+--------------+
|      32460|      about_the_game|          true|
|      32460|          background|          true|
|      32460|      background_raw|          true|
|      32460|       capsule_image|          true|
|      32460|     capsule_imagev5|          true|
|      32460|          categories|          true|
|      32460|detailed_description|          true|
|      32460|        header_image|          true|
|      32460|                name|          true|
|      32460|      package_groups|          true|
|      32460|     pc_requirements|          true|
|      32460|             ratings|          true|
|      32460|         screenshots|          true|
|      32460|   short_description|          true|
|      32460|        support_info|          true|
|      32460| supported_languages|          true|
|      32460|                type|          true|


In [14]:
df.createOrReplaceTempView("df")


In [15]:
spark.sql("""
SELECT recommendations, ROW_NUMBER() OVER (PARTITION BY steam_appid ORDER BY recommendations DESC) AS rank FROM df WHERE steam_appid = '34330'
""").show()

+---------------+----+
|recommendations|rank|
+---------------+----+
|        {35065}|   1|
|        {35062}|   2|
+---------------+----+



In [16]:
df = spark.sql("""
SELECT *
FROM
(SELECT *, ROW_NUMBER() OVER (PARTITION BY steam_appid ORDER BY recommendations DESC) AS rank FROM df)
WHERE rank = 1
""").drop("rank")

In [17]:
result = find_differences_by_id(df, "steam_appid")
result.show(5)

+-----------+------+--------------+
|steam_appid|column|has_difference|
+-----------+------+--------------+
+-----------+------+--------------+



In [18]:
df.count()

223874

In [30]:
df.groupBy("steam_appid").agg(f.count("*")).filter(col('count(1)')>1).show()

+-----------+--------+
|steam_appid|count(1)|
+-----------+--------+
+-----------+--------+



# Clean columns

In [19]:
def get_null_counts(df):
    null_counts = df.select([
        _sum(f.when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ])
    
    result = null_counts.selectExpr(
        "stack({0}, {1}) as (column_name, null_count)".format(
            len(df.columns),
            ", ".join([f"'{c}', `{c}`" for c in df.columns])
        )
    )
    
    return result.orderBy(f.desc("null_count"))


In [20]:
null_summary = get_null_counts(df)
null_summary = null_summary.withColumn("percent", col("null_count")/df.count())
null_summary.show(100)

+--------------------+----------+--------------------+
|         column_name|null_count|             percent|
+--------------------+----------+--------------------+
|     alternate_appid|    223860|  0.9999374648239635|
|          drm_notice|    222669|  0.9946175080625709|
|ext_user_account_...|    221832|  0.9908787978952447|
|          metacritic|    218735|  0.9770451235963087|
|             reviews|    210986|  0.9424319036600945|
|                 dlc|    207705|  0.9277763384761071|
|               demos|    204100|  0.9116735306467031|
|     recommendations|    202386|  0.9040174383805176|
|  controller_support|    168197|  0.7513020717010461|
|        achievements|    166183|  0.7423059399483638|
|        legal_notice|    153354|  0.6850013847074694|
|            fullgame|    141271|  0.6310290609896638|
|             website|    121069|  0.5407908019689647|
|              movies|     86407|  0.3859626396991165|
|      price_overview|     86339|  0.3856588974155105|
|         

In [28]:
null_columns = null_summary.filter('percent >=0.8').select('column_name').rdd.flatMap(lambda row:row).collect()

In [29]:
df = df.drop(*null_columns)

In [31]:
df.printSchema()

root
 |-- about_the_game: string (nullable = true)
 |-- achievements: struct (nullable = true)
 |    |-- highlighted: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- path: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- background: string (nullable = true)
 |-- background_raw: string (nullable = true)
 |-- capsule_image: string (nullable = true)
 |-- capsule_imagev5: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- content_descriptors: struct (nullable = true)
 |    |-- ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- notes: string (nullable = true)
 |-- controller_support: string (nullable = true)
 |-- detailed_description: string (nullable = true)
 |-- developer

In [None]:
df.select('achievements.total').show()

In [None]:
df.groupBy("type").agg(f.count("*")).filter(col('count(1)')>1).show()

In [22]:
df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (13)
+- Project (12)
   +- Filter (11)
      +- Window (10)
         +- WindowGroupLimit (9)
            +- Sort (8)
               +- Exchange (7)
                  +- WindowGroupLimit (6)
                     +- Sort (5)
                        +- HashAggregate (4)
                           +- Exchange (3)
                              +- HashAggregate (2)
                                 +- Scan json  (1)


(1) Scan json 
Output [44]: [about_the_game#8, achievements#9, alternate_appid#10, background#11, background_raw#12, capsule_image#13, capsule_imagev5#14, categories#15, content_descriptors#16, controller_support#17, demos#18, detailed_description#19, developers#20, dlc#21, drm_notice#22, ext_user_account_notice#23, fullgame#24, genres#25, header_image#26, is_free#27, legal_notice#28, linux_requirements#29, mac_requirements#30, metacritic#31, movies#32, name#33, package_groups#34, packages#35, pc_requirements#36, platforms#37, price_overview