In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### **Read Streaming Data**

In [0]:
# DDL schema for one order record in the JSON source files.
# By default Spark's streaming file sources require an explicit schema rather than
# inferring one, which is why it is declared here and passed to `.schema(...)`.
# Nested layout: a `customer` struct (with a nested `address` struct), an `items`
# array of line-item structs, a `payment` struct and a `metadata` array of
# key/value structs.
# NOTE(review): `timestamp` is read as STRING; cast it downstream if event-time
# semantics (watermarks, time windows) are needed.
my_schema = """order_id STRING,
        timestamp STRING,
        customer STRUCT<
          customer_id: INT,
          name: STRING,
          email: STRING,
          address: STRUCT<
            city: STRING,
            postal_code: STRING,
            country: STRING
          >
        >,
        items ARRAY<STRUCT<
          item_id: STRING,
          product_name: STRING,
          quantity: INT,
          price: DOUBLE
        >>,
        payment STRUCT<
          method: STRING,
          transaction_id: STRING
        >,
        metadata ARRAY<STRUCT<
          key: STRING,
          value: STRING
        >>"""

In [0]:
# Stream multi-line JSON order files from the source volume, parsed with the
# explicit schema declared above.
orders_stream = (
    spark.readStream
    .format("json")
    .option("multiLine", True)
    .schema(my_schema)
    .load("/Volumes/workspace/stream/streaming/jsonsource")
)

# Transformations
# Step 1: lift the customer and address fields out of their nested structs.
customer_columns = [
    "customer.customer_id",
    "customer.name",
    "customer.email",
    "customer.address.city",
    "customer.address.country",
    "customer.address.postal_code",
]
df = orders_stream.select("items", "order_id", "timestamp", *customer_columns, "payment", "metadata")

# Step 2: one row per line item; explode_outer keeps orders whose items array is null or empty.
df = df.withColumn("items", explode_outer("items"))

# Step 3: flatten the exploded item struct and the payment struct into plain columns.
item_columns = [f"items.{field}" for field in ("item_id", "price", "product_name", "quantity")]
order_columns = ["order_id", "timestamp", "customer_id", "name", "email", "city", "country", "postal_code"]
payment_columns = ["payment.method", "payment.transaction_id"]
df = df.select(*item_columns, *order_columns, *payment_columns, "metadata")

# Step 4: one row per metadata key/value pair (this multiplies each order's item rows).
df = (
    df.withColumn("metadata", explode_outer("metadata"))
    .select("*", "metadata.key", "metadata.value")
    .drop("metadata")
)


In [0]:
# Write the flattened stream to a Delta table as an incremental batch run.
# - trigger(availableNow=True) processes every file currently in the source and
#   then stops; it replaces the deprecated trigger(once=True).
# - start() returns immediately, so awaitTermination() blocks this cell until the
#   run has finished. Without it, the SQL cell below can read the sink path before
#   the first commit exists. awaitTermination() also re-raises any streaming
#   failure instead of letting it die silently in the background.
jsonsink_query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("path", "/Volumes/workspace/stream/streaming/jsonsink/Data")
    .option("checkpointLocation", "/Volumes/workspace/stream/streaming/jsonsink/checkpoint")
    .start()
)
jsonsink_query.awaitTermination()


In [0]:
%sql
-- Preview the Delta table at the path the jsonsink streaming query writes to.
SELECT * FROM delta.`/Volumes/workspace/stream/streaming/jsonsink/Data`

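### **Archiving Processed Source Files**

Same pipeline as above, but the reader sets `cleanSource="archive"` so that each JSON file is moved to `sourceArchiveDir` after it has been processed.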

In [0]:
# Pre-create the root folder that holds the archiving stream's Delta data and checkpoint.
archive_sink_root = "/Volumes/workspace/stream/streaming/jsonsinknew"
dbutils.fs.mkdirs(archive_sink_root)


In [0]:
# Same flattening as the first stream, applied to a reader that archives its input:
# with cleanSource="archive", completed source files are moved under
# sourceArchiveDir instead of being left in jsonsourcenew.
df = (
    spark.readStream
    .format("json")
    .option("multiLine", True)
    .schema(my_schema)
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "/Volumes/workspace/stream/streaming/jsonsourcearchive")
    .load("/Volumes/workspace/stream/streaming/jsonsourcenew")
    # Transformations: lift customer/address fields out of their nested structs.
    .select(
        "items", "order_id", "timestamp",
        "customer.customer_id", "customer.name", "customer.email",
        "customer.address.city", "customer.address.country", "customer.address.postal_code",
        "payment", "metadata",
    )
    # One row per line item (null/empty items arrays still yield one row).
    .withColumn("items", explode_outer("items"))
    .select(
        "items.item_id", "items.price", "items.product_name", "items.quantity",
        "order_id", "timestamp",
        "customer_id", "name", "email", "city", "country", "postal_code",
        "payment.method", "payment.transaction_id",
        "metadata",
    )
    # One row per metadata key/value pair, crossed with the item rows above.
    .withColumn("metadata", explode_outer("metadata"))
    .select("*", "metadata.key", "metadata.value")
    .drop("metadata")
)


In [0]:
# Write the archiving stream to its own Delta path and checkpoint as an incremental batch run.
# - trigger(availableNow=True) drains everything currently in jsonsourcenew and then
#   stops; it replaces the deprecated trigger(once=True).
# - awaitTermination() blocks until that run finishes, so later cells see committed
#   data, and any streaming failure is raised here rather than lost in the background.
jsonsinknew_query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("path", "/Volumes/workspace/stream/streaming/jsonsinknew/Data")
    .option("checkpointLocation", "/Volumes/workspace/stream/streaming/jsonsinknew/checkpoint")
    .start()
)
jsonsinknew_query.awaitTermination()



### **Read JSON Data (Batch)**

Exploratory batch version of the same flattening steps, kept commented out for reference.

In [0]:
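# Batch JSON reads infer the schema automatically; "inferSchema" is a CSV option and has no effect here.
# df = spark.read.format("json")\
#           .option("inferSchema",True)\
#           .option("multiLine",True)\
#           .load("/Volumes/workspace/stream/streaming/jsonsource")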

# display(df)

In [0]:
# df = df.select("items","order_id","timestamp","customer.customer_id","customer.name","customer.email","customer.address.city","customer.address.country","customer.address.postal_code","payment","metadata")

# df = df.withColumn("items",explode_outer("items"))

# display(df)

In [0]:
# df = df.select("items.item_id","items.price","items.product_name","items.quantity","order_id","timestamp","customer_id","name","email","city","country","postal_code","payment.method","payment.transaction_id","metadata")
# display(df)

In [0]:
# df = df.withColumn("metadata",explode_outer("metadata"))
# df = df.select("*","metadata.key","metadata.value").drop("metadata")
# display(df)