In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### READ JSON DATA (BATCH)

In [0]:
# Batch (non-streaming) version of the order flatten, kept for reference and disabled.
# NOTE(review): if re-enabled, the first select below contains two generators
# (explode_outer on 'items' and on 'metadata'). Spark allows only one generator
# per select, so the two arrays must be exploded in separate steps.
# NOTE(review): 'inferSchema' is a CSV reader option; the JSON reader infers a
# schema without it, so the option is presumably ignored here.
# df = spark.read.format('json')\
#     .option('inferSchema', True)\
#     .option('multiLine', True)\
#     .load('/Volumes/catalog_deltalake/stream/streaming/jsonsource/')
# # df.display()

# df = df.select(
#     'order_id',
#     'timestamp',
#     col('customer.customer_id').alias('customer_id'),
#     col('customer.name').alias('name'),
#     col('customer.email').alias('email'),
#     col('customer.address.city').alias('city'),
#     col('customer.address.country').alias('country'),
#     col('customer.address.postal_code').alias('postal_code'),
#     col("payment.method").alias('method'),
#     col("payment.transaction_id").alias("payment_transaction_id"),
#     explode_outer("items").alias('item'),
#     explode_outer('metadata').alias('metadata')
# )
# df = df.select(
#     'order_id',
#     'timestamp',
#     'customer_id',
#     'name',
#     'email',
#     'city',
#     'country',
#     'postal_code',
#     'method',
#     'payment_transaction_id',
#     col('item.item_id').alias('item_id'),
#     col('item.price').alias('price'),
#     col('item.quantity').alias('quantity'),
#     'metadata.key',    'metadata.value' 
# )
# display(df)

In [0]:
# Disabled helper cell: reads the source JSON in batch mode and displays the
# schema Spark infers for it. Presumably this is how my_json_schema in the
# streaming section was obtained — verify before relying on it.
# df = spark.read.format('json')\
#     .option('inferSchema', True)\
#     .option('multiLine', True)\
#     .load('/Volumes/catalog_deltalake/stream/streaming/jsonsource/')
# df.schema

### READ JSON DATA (STREAMING)

In [0]:
# Explicit schema for the order JSON documents, passed to .schema(...) by the
# streaming readers below. Every field and array element is nullable.
my_json_schema = StructType([
    StructField('customer', StructType([
        StructField('address', StructType([
            StructField('city', StringType(), nullable=True),
            StructField('country', StringType(), nullable=True),
            StructField('postal_code', StringType(), nullable=True),
        ]), nullable=True),
        StructField('customer_id', LongType(), nullable=True),
        StructField('email', StringType(), nullable=True),
        StructField('name', StringType(), nullable=True),
    ]), nullable=True),
    StructField('items', ArrayType(StructType([
        StructField('item_id', StringType(), nullable=True),
        StructField('price', DoubleType(), nullable=True),
        StructField('product_name', StringType(), nullable=True),
        StructField('quantity', LongType(), nullable=True),
    ]), containsNull=True), nullable=True),
    StructField('metadata', ArrayType(StructType([
        StructField('key', StringType(), nullable=True),
        StructField('value', StringType(), nullable=True),
    ]), containsNull=True), nullable=True),
    StructField('order_id', StringType(), nullable=True),
    StructField('payment', StructType([
        StructField('method', StringType(), nullable=True),
        StructField('transaction_id', StringType(), nullable=True),
    ]), nullable=True),
    # Kept as a string; no timestamp parsing is applied here.
    StructField('timestamp', StringType(), nullable=True),
])

In [0]:
# Incrementally read the raw order JSON files with the explicit schema
# (my_json_schema); 'multiLine' parses JSON records that span several lines.
orders_raw_stream = (
    spark.readStream.format('json')
    .schema(my_json_schema)
    .option('multiLine', True)
    .load('/Volumes/catalog_deltalake/stream/streaming/jsonsource/')
)

# Flatten to one row per (item, metadata entry) combination of each order.
# Spark allows only one generator (explode/explode_outer) per select, so each
# array is exploded in its own step. explode_outer keeps orders whose array is
# null or empty, yielding nulls for the exploded fields.
streaming_df = (
    orders_raw_stream
    .withColumn('item', explode_outer('items'))
    .withColumn('meta', explode_outer('metadata'))
    .select(
        'order_id',
        'timestamp',
        col('customer.customer_id').alias('customer_id'),
        col('customer.name').alias('name'),
        col('customer.email').alias('email'),
        col('customer.address.city').alias('city'),
        col('customer.address.country').alias('country'),
        col('customer.address.postal_code').alias('postal_code'),
        col('payment.method').alias('method'),
        col('payment.transaction_id').alias('payment_transaction_id'),
        col('item.item_id').alias('item_id'),
        col('item.price').alias('price'),
        col('item.quantity').alias('quantity'),
        col('meta.key').alias('key'),
        col('meta.value').alias('value'),
    )
)


In [0]:
# Append the flattened stream to a Delta table at an explicit path; the
# checkpoint records which source files have already been processed.
# availableNow processes all currently available input and then stops (the
# replacement for the deprecated once=True trigger).
jsonsink_query = (
    streaming_df.writeStream.format('delta')
    .outputMode('append')
    .option('checkpointLocation', '/Volumes/catalog_deltalake/stream/streaming/jsonsink/checkpoint')
    .option('path', '/Volumes/catalog_deltalake/stream/streaming/jsonsink/delta')
    .trigger(availableNow=True)
    .start()
)
# start() returns immediately; block until the run finishes so the next cell
# does not query the sink before it has been written (matters for Run All).
jsonsink_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows written to the jsonsink Delta path by the streaming query above.
SELECT *
FROM delta.`/Volumes/catalog_deltalake/stream/streaming/jsonsink/delta`

### ARCHIVING
Process only newly arrived source files, then move each processed file to an archive directory (`cleanSource = 'archive'` together with `sourceArchiveDir`).

In [0]:
# One-time setup (disabled): create the new source folder, the folder used as
# sourceArchiveDir, and the sink folder for the archiving pipeline.
# NOTE(review): 'json_sorce_file_archives' is misspelled, but it must stay
# identical to the sourceArchiveDir path used by the archiving reader.
# dbutils.fs.mkdirs('/Volumes/catalog_deltalake/stream/streaming/json_source_new')
# dbutils.fs.mkdirs('/Volumes/catalog_deltalake/stream/streaming/json_sorce_file_archives')
# dbutils.fs.mkdirs('/Volumes/catalog_deltalake/stream/streaming/json_sink_archive')

In [0]:
# Incrementally read new order JSON files from json_source_new. With
# cleanSource='archive', files that have been fully processed are moved under
# sourceArchiveDir instead of staying in the source folder.
orders_new_raw_stream = (
    spark.readStream.format('json')
    .schema(my_json_schema)
    .option('multiLine', True)
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Volumes/catalog_deltalake/stream/streaming/json_sorce_file_archives')
    .load('/Volumes/catalog_deltalake/stream/streaming/json_source_new/')
)

# Flatten to one row per (item, metadata entry) combination of each order.
# Spark allows only one generator (explode/explode_outer) per select, so each
# array is exploded in its own step. explode_outer keeps orders whose array is
# null or empty, yielding nulls for the exploded fields.
streaming_df = (
    orders_new_raw_stream
    .withColumn('item', explode_outer('items'))
    .withColumn('meta', explode_outer('metadata'))
    .select(
        'order_id',
        'timestamp',
        col('customer.customer_id').alias('customer_id'),
        col('customer.name').alias('name'),
        col('customer.email').alias('email'),
        col('customer.address.city').alias('city'),
        col('customer.address.country').alias('country'),
        col('customer.address.postal_code').alias('postal_code'),
        col('payment.method').alias('method'),
        col('payment.transaction_id').alias('payment_transaction_id'),
        col('item.item_id').alias('item_id'),
        col('item.price').alias('price'),
        col('item.quantity').alias('quantity'),
        col('meta.key').alias('key'),
        col('meta.value').alias('value'),
    )
)


In [0]:
# Append the flattened archiving stream to its own Delta table and checkpoint.
# availableNow processes all currently available input and then stops (the
# replacement for the deprecated once=True trigger).
# NOTE(review): Spark archives a batch's source files only after a later batch
# is planned. Files from the final batch of a run may therefore remain in
# json_source_new until a subsequent run processes new data. Confirm on this
# runtime.
archive_sink_query = (
    streaming_df.writeStream.format('delta')
    .outputMode('append')
    .option('checkpointLocation', '/Volumes/catalog_deltalake/stream/streaming/json_sink_archive/checkpoint')
    .option('path', '/Volumes/catalog_deltalake/stream/streaming/json_sink_archive/delta')
    .trigger(availableNow=True)
    .start()
)
# start() returns immediately; block until the run finishes so the next cell
# does not query the sink before it has been written (matters for Run All).
archive_sink_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows written to the json_sink_archive Delta path by the archiving stream above.
SELECT *
FROM delta.`/Volumes/catalog_deltalake/stream/streaming/json_sink_archive/delta`