### Read JSON Data (Batch)

In [0]:
from pyspark.sql.functions import col, explode, explode_outer

In [0]:
# Batch-read the day-1 order file. No schema is supplied, so the JSON source
# infers one from the data itself. The former `inferSchema` option was dropped:
# it is a CSV reader option and is not among the JSON source's options.
# multiline=True lets a single JSON document span several physical lines.
df = (
  spark.read
  .format("json")
  .option("multiline", True)
  .load("/Volumes/workspace/stream/json_files/input/day1.json")
)

In [0]:
# Preview the batch DataFrame; 20 rows is show()'s default, spelled out here,
# and long cell values are truncated.
df.show(n=20, truncate=True)

In [0]:
# Print the schema of the batch DataFrame read above (no explicit schema was
# passed to the reader, so this is whatever the JSON source inferred).
df.printSchema()

In [0]:
# Flatten each order document into one row per (item, metadata entry) pair.
#
# Spark allows only one generator (explode / explode_outer) per select clause,
# so the `items` and `metadata` arrays are exploded in two consecutive selects
# instead of side by side. Chaining the explodes still yields every
# item x metadata combination for an order.

# Scalar columns that are carried unchanged through every step below.
batch_order_cols = [
    "customer_id", "email", "name", "city", "country", "postal_code",
    "order_id", "payment_method", "payment_txn_id", "event_ts",
]

flatten_df = (
    # Step 1: lift nested customer/payment fields to top-level columns and
    # carry both arrays forward untouched.
    df.select(
        col("customer.customer_id").alias("customer_id"),
        col("customer.email").alias("email"),
        col("customer.name").alias("name"),
        col("customer.address.city").alias("city"),
        col("customer.address.country").alias("country"),
        col("customer.address.postal_code").alias("postal_code"),
        col("order_id"),
        col("payment.method").alias("payment_method"),
        col("payment.transaction_id").alias("payment_txn_id"),
        col("timestamp").alias("event_ts"),
        col("items"),
        col("metadata"),
    )
    # Step 2: one row per item. explode_outer (rather than explode) keeps the
    # order row, with a null item, when the array is null or empty.
    .select(
        *batch_order_cols,
        explode_outer(col("items")).alias("item"),
        col("metadata"),
    )
    # Step 3: one row per metadata entry for each item row.
    .select(
        *batch_order_cols,
        col("item"),
        explode_outer(col("metadata")).alias("metadata"),
    )
    # Step 4: unpack the exploded structs into flat columns.
    .select(
        *batch_order_cols,
        col("item.item_id").alias("item_id"),
        col("item.price").alias("price"),
        col("item.product_name").alias("product_name"),
        col("metadata.key").alias("meta_key"),
        col("metadata.value").alias("meta_value"),
    )
)

flatten_df.printSchema()

flatten_df.display()

### Read Streaming Data

In [0]:
# Explicit schema for the streaming read, written as a DDL string and passed to
# `readStream.schema(...)` in the next cell. It declares the nested `customer`
# and `payment` structs, the `items` and `metadata` arrays of structs, and the
# top-level `order_id` / `timestamp` strings that the flattening cells select.
# NOTE(review): `timestamp` is declared as STRING, so no timestamp parsing
# happens on read — confirm this is intended.
schema = """
    customer STRUCT<
        address: STRUCT<
            city: STRING,
            country: STRING,
            postal_code: STRING
        >,
        customer_id: LONG,
        email: STRING,
        name: STRING
    >,
    items ARRAY<STRUCT<
            item_id: STRING,
            price: DOUBLE,
            product_name: STRING,
            quantity: LONG
    >>,
    metadata ARRAY<STRUCT<
        key: STRING,
        value: STRING
    >>,
    order_id STRING,
    payment STRUCT<
        method: STRING,
        transaction_id: STRING
    >,
    timestamp STRING
"""

In [0]:

# Build the streaming JSON reader step by step: attach the explicit schema
# first, then enable multiline parsing.
json_stream_reader = spark.readStream.format("json").schema(schema)
json_stream_reader = json_stream_reader.option("multiline", "true")

# NOTE(review): the source path names a single file rather than the input
# directory — confirm whether later files are meant to be picked up too.
stream_df = json_stream_reader.load("/Volumes/workspace/stream/json_files/input/day1.json")

stream_df.printSchema()

In [0]:
# Streaming counterpart of the batch flattening: one output row per
# (item, metadata entry) pair of each order.
#
# Spark allows only one generator (explode / explode_outer) per select clause,
# so the `items` and `metadata` arrays are exploded in two consecutive selects
# instead of side by side. Chaining the explodes still yields every
# item x metadata combination for an order.

# Scalar columns that are carried unchanged through every step below.
stream_order_cols = [
    "customer_id", "email", "name", "city", "country", "postal_code",
    "order_id", "payment_method", "payment_txn_id", "event_ts",
]

flatten_stream_df = (
    # Step 1: lift nested customer/payment fields to top-level columns and
    # carry both arrays forward untouched.
    stream_df.select(
        col("customer.customer_id").alias("customer_id"),
        col("customer.email").alias("email"),
        col("customer.name").alias("name"),
        col("customer.address.city").alias("city"),
        col("customer.address.country").alias("country"),
        col("customer.address.postal_code").alias("postal_code"),
        col("order_id"),
        col("payment.method").alias("payment_method"),
        col("payment.transaction_id").alias("payment_txn_id"),
        col("timestamp").alias("event_ts"),
        col("items"),
        col("metadata"),
    )
    # Step 2: one row per item. explode_outer (rather than explode) keeps the
    # order row, with a null item, when the array is null or empty.
    .select(
        *stream_order_cols,
        explode_outer(col("items")).alias("item"),
        col("metadata"),
    )
    # Step 3: one row per metadata entry for each item row.
    .select(
        *stream_order_cols,
        col("item"),
        explode_outer(col("metadata")).alias("metadata"),
    )
    # Step 4: unpack the exploded structs into flat columns.
    .select(
        *stream_order_cols,
        col("item.item_id").alias("item_id"),
        col("item.price").alias("price"),
        col("item.product_name").alias("product_name"),
        col("metadata.key").alias("meta_key"),
        col("metadata.value").alias("meta_value"),
    )
)

flatten_stream_df.printSchema()

In [0]:
# Write the flattened stream to a Delta location in append mode.
# The availableNow trigger processes all data available at start and then
# stops the query; it replaces the deprecated once=True trigger.
# The checkpoint location records progress so a re-run resumes instead of
# re-writing files that were already processed.
(
    flatten_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("path", "/Volumes/workspace/stream/json_files/output/data")
    .option("checkpointLocation", "/Volumes/workspace/stream/json_files/output/checkpoint")
    .start()
)

In [0]:
""