# Exploring JSON options in Spark

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("demo")
    .getOrCreate()
)
# Eager evaluation: a DataFrame left as a cell's last expression is rendered
# directly (in front-ends that support it).
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

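## Comparing an auto-inferred schema with an explicitly specified schema

In `clickstream.json` the `item_skus` field is a comma-separated string in one record and a JSON array in the other. Schema inference falls back to `string` for the whole column. An explicit `array<string>` schema parses the array record but returns `null` for the string record.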

In [2]:
# Let Spark infer the schema from the data itself.
inferred_df = spark.read.format("json").load("clickstream.json")
inferred_df.printSchema()

root
 |-- browser_id: string (nullable = true)
 |-- item_skus: string (nullable = true)



In [3]:
inferred_df.show(n=10, truncate=False)

+-----------------+----------------------------+
|browser_id       |item_skus                   |
+-----------------+----------------------------+
|e0849a8e34f825496|item_1,item_2,item_3        |
|82f7694c1b1afbb28|["item_1","item_2","item_3"]|
+-----------------+----------------------------+



In [4]:
# Supply the schema explicitly, declaring item_skus as an array of strings.
manual_df = spark.read.json(
    "clickstream.json",
    schema=T.StructType([
        T.StructField("browser_id", T.StringType()),
        T.StructField("item_skus", T.ArrayType(T.StringType())),
    ]),
)

manual_df.printSchema()

root
 |-- browser_id: string (nullable = true)
 |-- item_skus: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [5]:
manual_df.show(n=10, truncate=False)

+-----------------+------------------------+
|browser_id       |item_skus               |
+-----------------+------------------------+
|e0849a8e34f825496|null                    |
|82f7694c1b1afbb28|[item_1, item_2, item_3]|
+-----------------+------------------------+



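## A UDF for profiling JSON

Instead of asking Spark to infer a schema, read each line as raw text. A Python UDF then records every field path together with the JSON type(s) found there. In the path notation, `{}.key` descends into an object key and `[]` descends into array elements.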

In [6]:
import json

import pyspark.sql.types as T
import pyspark.sql.functions as F

def append_type(out, prefix, type):
    """Record the type name ``type`` under the path ``prefix`` in ``out``.

    ``out`` maps a field path to the list of type names seen at that path,
    in first-seen order; each name is stored at most once. ``out`` is
    mutated in place and nothing is returned.
    """
    # NOTE: the parameter name ``type`` shadows the builtin inside this
    # function only; it is kept so keyword callers keep working.
    seen = out.get(prefix)
    if not seen:
        # Missing (or empty) entry: start a fresh list for this path.
        out[prefix] = [type]
    elif type not in seen:
        seen.append(type)

def create_types(x, prefix, out):
    """Recursively record the JSON type of every value reachable from ``x``.

    ``x`` is a value as returned by ``json.loads`` (dict, list, str, int,
    float, bool or None). Paths are built by appending ``"{}." + key`` for
    each object key and ``"[]"`` for each array level, giving paths such as
    ``json_data{}.device{}.browser_id`` or ``json_data{}.item_skus[]``.

    Leaf values are recorded as "str", "int", "float", "bool" or "null", and
    anything else as "unknown type: <python type>". Empty objects and arrays
    have no leaves, so they are recorded at their own path as "empty_dict" /
    "empty_list". Without this, a field that is only ever ``{}`` or ``[]``
    would be missing from the profile altogether.

    Args:
        x: the (sub)value to profile.
        prefix: path of ``x`` within the document.
        out: dict mapping path -> list of type names, updated in place via
            ``append_type``.

    Returns:
        ``out``, so the top-level call can be used as an expression.
    """
    # Exact type checks (``is``) are deliberate: type(True) is bool, not int,
    # so booleans are not misreported as "int".
    value_type = type(x)
    if value_type is dict:
        if not x:
            append_type(out, prefix, "empty_dict")
        for key, child in x.items():
            create_types(child, prefix + "{}." + str(key), out)
    elif value_type is list:
        if not x:
            append_type(out, prefix, "empty_list")
        for item in x:
            create_types(item, prefix + "[]", out)
    elif value_type is str:
        append_type(out, prefix, "str")
    elif value_type is int:
        append_type(out, prefix, "int")
    elif value_type is float:
        append_type(out, prefix, "float")
    elif x is True or x is False:
        append_type(out, prefix, "bool")
    elif x is None:
        append_type(out, prefix, "null")
    else:
        append_type(out, prefix, "unknown type: " + str(type(x)))
    return out

@F.udf(returnType=T.ArrayType(
    T.StructType([
        T.StructField("field_path", T.StringType()),
        T.StructField("field_type", T.StringType())
    ])))
def profile_json(obj_str, prefix):
    """Spark UDF that profiles one JSON document into (field_path, field_type) pairs.

    Args:
        obj_str: raw JSON text of one record (e.g. the ``value`` column
            produced by ``spark.read.text``).
        prefix: root label prepended to every path (this notebook passes
            ``"json_data"``).

    Returns:
        A list of ``(field_path, field_type)`` tuples, with one entry per
        distinct type seen at each path. Returns None when ``obj_str`` is
        null, empty or whitespace-only; ``F.explode`` then emits no rows for
        that record. A record that is not valid JSON yields the single pair
        ``(prefix, "invalid_json")`` instead of raising, so one malformed
        line shows up in the profile rather than aborting the Spark job.
    """
    if obj_str is None or not obj_str.strip():
        return None
    try:
        parsed = json.loads(obj_str)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        return [(prefix, "invalid_json")]
    type_map = create_types(parsed, prefix, {})
    return [
        (field_path, field_type)
        for field_path, field_types in type_map.items()
        for field_type in field_types
    ]

def json_profile(df, json_col):
    """Summarise which JSON field paths occur in ``json_col`` and with which types.

    Args:
        df: DataFrame holding one JSON document per row.
        json_col: the column containing the JSON text (Column or column name).

    Returns:
        A DataFrame with one row per ``field_path`` and these columns:

        * ``field_types``: the set of type names observed at that path.
        * ``count``: the number of exploded (record, type) observations for
          the path. This equals the number of records containing the path,
          unless a single record carries several types at the same path
          (possible inside arrays); such a record counts once per type.
    """
    root_label = F.lit("json_data")
    exploded = df.withColumn("profile", F.explode(profile_json(json_col, root_label)))
    flattened = exploded.select(
        F.col("profile.field_path").alias("field_path"),
        F.col("profile.field_type").alias("field_type"),
    )
    return flattened.groupBy("field_path").agg(
        F.collect_set("field_type").alias("field_types"),
        F.count("*").alias("count"),
    )
        

In [7]:
# Read each line as a raw string (column "value") so the UDF sees the original JSON text.
plain_line_df = spark.read.text("clickstream.json")
json_profile(plain_line_df, plain_line_df["value"]).show(n=10, truncate=False)

+-----------------------+-----------+-----+
|field_path             |field_types|count|
+-----------------------+-----------+-----+
|json_data{}.browser_id |[str]      |2    |
|json_data{}.item_skus  |[str]      |1    |
|json_data{}.item_skus[]|[str]      |1    |
+-----------------------+-----------+-----+



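## A slightly more comprehensive example

Here each `(field_path, field_type)` pair also carries the earliest and latest `event_ts` at which it was seen. The result shows, for example, that `device.browser_id` switched from `int` to `str` around the beginning of June 2020.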

In [8]:
def json_profile_w_ts(df, json_col, ts_path="$.event_ts"):
    """Profile JSON field paths/types together with the time span each was observed.

    Args:
        df: DataFrame holding one JSON document per row.
        json_col: the column containing the JSON text (Column or column name).
        ts_path: JSONPath (as accepted by ``F.get_json_object``) to each
            record's timestamp. Defaults to ``"$.event_ts"``.

    Returns:
        A DataFrame with one row per distinct ``(field_path, field_type)`` and
        these columns:

        * ``first_seen`` / ``last_seen``: the min and max record timestamp.
          Records whose timestamp is missing or cannot be cast are ignored
          by min/max but still counted.
        * ``count``: the number of records in which the path was seen with
          that type.
    """
    # Extract the timestamp from json_col itself. A hard-coded "value" column
    # would ignore the json_col argument and break for any other column name.
    event_ts = F.get_json_object(json_col, ts_path).cast(T.TimestampType())
    return df \
        .withColumn("profile", F.explode(profile_json(json_col, F.lit("json_data")))) \
        .select(
            F.col("profile.field_path").alias("field_path"),
            F.col("profile.field_type").alias("field_type"),
            event_ts.alias("event_ts")
        ) \
        .groupBy("field_path", "field_type") \
        .agg(
            F.min("event_ts").alias("first_seen"),
            F.max("event_ts").alias("last_seen"),
            F.count("*").alias("count"),
        )

In [9]:
plain_line_df = spark.read.text("clickstream_full.json")
(
    json_profile_w_ts(plain_line_df, plain_line_df["value"])
    .orderBy("field_path")
    .show(n=10, truncate=False)
)

+-------------------------------+----------+-------------------+-------------------+-----+
|field_path                     |field_type|first_seen         |last_seen          |count|
+-------------------------------+----------+-------------------+-------------------+-----+
|json_data{}.device{}.browser_id|int       |2020-01-01 08:30:39|2020-06-01 20:46:04|317  |
|json_data{}.device{}.browser_id|str       |2020-06-02 07:45:29|2021-05-26 09:15:58|683  |
|json_data{}.device{}.ip        |str       |2020-01-01 08:30:39|2020-12-20 22:22:58|700  |
|json_data{}.device{}.user_agent|str       |2020-01-01 08:30:39|2021-05-26 09:15:58|1000 |
|json_data{}.event              |str       |2020-01-01 08:30:39|2021-05-26 09:15:58|1000 |
|json_data{}.event_ts           |str       |2020-01-01 08:30:39|2021-05-26 09:15:58|1000 |
+-------------------------------+----------+-------------------+-------------------+-----+

