In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# **Read stream** — incrementally load the raw JSON order files

In [0]:
# DDL-format schema for the incoming order JSON, passed to spark.readStream.schema(...).
# Supplying it explicitly fixes column names/types for the streaming file source.
# Layout: scalar order fields, a nested `customer` struct (with nested `address`),
# an `items` array of line-item structs, a `payment` struct, and a `metadata`
# array of key/value structs. The arrays are exploded later in the notebook.
# NOTE(review): `timestamp` is declared STRING and is never cast in this notebook —
# confirm the source format and consider to_timestamp() if time-based queries are needed.
my_schema = """order_id STRING,
        timestamp STRING,
        customer STRUCT<
          customer_id: INT,
          name: STRING,
          email: STRING,
          address: STRUCT<
            city: STRING,
            postal_code: STRING,
            country: STRING
          >
        >,
        items ARRAY<STRUCT<
          item_id: STRING,
          product_name: STRING,
          quantity: INT,
          price: DOUBLE
        >>,
        payment STRUCT<
          method: STRING,
          transaction_id: STRING
        >,
        metadata ARRAY<STRUCT<
          key: STRING,
          value: STRING
        >>"""

In [0]:
# Streaming source: JSON order documents landing in the datasource volume.
# The explicit DDL schema (my_schema) pins column names/types, and 'multiline'
# lets a single JSON record span several lines of a file.
df = (
    spark.readStream
    .schema(my_schema)
    .format('json')
    .option('multiline', 'true')
    .load('/Volumes/parent_cata/streaming/stream/datasource/')
)

In [0]:
# Flatten the nested order payload into one row per (order, line item, metadata entry).
#
# explode_outer is used instead of explode: with explode, an order whose `items`
# array is null/empty — or an item row whose `metadata` array is null/empty — would
# be silently dropped and never reach the sink. explode_outer keeps those rows and
# fills the exploded columns with NULL.
#
# NOTE: exploding `metadata` after `items` yields a cross product, so each item's
# price/quantity is repeated once per metadata entry. Deduplicate on
# (order_id, item_id) before summing those columns.

# Step 1: promote nested struct fields to top-level columns; arrays stay nested.
df_flat = df.select(
    'order_id',
    'timestamp',
    'customer.customer_id',
    'customer.name',
    'customer.email',
    'customer.address.city',
    'customer.address.country',
    'customer.address.postal_code',
    'payment.method',
    'payment.transaction_id',
    'items',
    'metadata'
)

# Step 2: one row per line item, with the item struct's fields promoted.
df_items = (
    df_flat
    .withColumn('items', explode_outer('items'))
    .select(
        'order_id',
        'timestamp',
        'customer_id',
        'name',
        'email',
        'city',
        'country',
        'postal_code',
        'method',
        'transaction_id',
        'items.item_id',
        'items.price',
        'items.product_name',
        'items.quantity',
        'metadata'
    )
)

# Step 3: one row per metadata key/value pair on each item row; the raw array and
# the temporary struct column are dropped so only scalar columns are written out.
df_final = (
    df_items
    .withColumn('meta', explode_outer(col('metadata')))
    .select(
        '*',
        col('meta.key').alias('meta_key'),
        col('meta.value').alias('meta_value')
    )
    .drop('meta', 'metadata')
)

In [0]:
# Append the flattened rows to a Delta table at the streamsink path.
# trigger(availableNow=True) processes all data not yet recorded in the checkpoint
# and then stops. It is the supported replacement for the deprecated trigger(once=True).
# The checkpoint tracks what has already been ingested, so re-running this cell only
# appends newly arrived files.
query = (
    df_final.writeStream
    .format('delta')
    .outputMode('append')
    .trigger(availableNow=True)
    .option('checkpointLocation', '/Volumes/parent_cata/streaming/stream/checkpoint')
    .option('path', '/Volumes/parent_cata/streaming/stream/streamsink/')
    .start()
)

# start() returns immediately. Block until the run finishes so the SQL cells that
# read the sink see committed data (on a first run the Delta table may not exist yet).
# This also re-raises here if the stream fails.
query.awaitTermination()


In [0]:
%sql
-- Inspect the rows written to the Delta sink by the streaming query.
SELECT *
FROM delta.`/Volumes/parent_cata/streaming/stream/streamsink/`

In [0]:
%sql
-- Sanity-check the sink instead of repeating the previous SELECT * verbatim:
-- total rows (one per order x item x metadata entry) versus distinct orders ingested.
SELECT
  COUNT(*)                 AS row_count,
  COUNT(DISTINCT order_id) AS order_count
FROM delta.`/Volumes/parent_cata/streaming/stream/streamsink/`