#### STREAM `CUSTOMERS_STREAMING/*.JSON` FILES FROM VOLUME (ONE FILE PER MICRO-BATCH)

In [0]:
# Incrementally ingest the customer JSON files with Auto Loader ("cloudFiles").
# - cloudFiles.schemaLocation must be a concrete directory (no glob characters):
#   Auto Loader persists the inferred schema there between runs. It is kept next
#   to this stream's own `_checkpoints` directory rather than inside another
#   dataset's landing folder. The `*.json` load pattern below does not match the
#   `_schema` / `_checkpoints` directories, so they are never read as input.
# - cloudFiles.maxFilesPerTrigger = 1 limits each micro-batch to a single file.
# - cloudFiles.inferColumnTypes infers typed columns instead of all-strings;
#   cloudFiles.schemaHints then fixes the types of the known customer columns.
cust_streaming_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox_dev/landing/operational_data/customers_streaming/_schema")
    .option("cloudFiles.maxFilesPerTrigger", 1)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", "customer_id int, customer_name string, date_of_birth date, telephone string, email string, member_since date, created_timestamp timestamp")
    .load('/Volumes/gizmobox_dev/landing/operational_data/customers_streaming/*.json')
)

#### ADD AUDIT COLUMNS TO RECORD `FILE_PATH` & `LOAD_TIMESTAMP`

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Add lineage columns to every streamed row:
#   file_path      - source file path, read from the hidden `_metadata` column
#   load_timestamp - current timestamp at processing time
cust_audit_df = cust_streaming_df.withColumns({
    'file_path': col('_metadata.file_path'),
    'load_timestamp': current_timestamp(),
})

#### WRITE TRANSFORMED DATA INTO DELTA TABLE

In [0]:
# Reset the stream's checkpoint so the next write starts from a clean state.
# NOTE(review): once the checkpoint is removed, Auto Loader treats every file in
# the landing folder as new again, and the streaming write appends to the bronze
# table (default output mode). Re-running this notebook therefore re-inserts rows
# that were already loaded unless the table is also cleared — confirm this reset
# is meant for development only.
checkpoint_path = "/Volumes/gizmobox_dev/landing/operational_data/customers_streaming/_checkpoints"
dbutils.fs.rm(checkpoint_path, recurse=True)
print("Checkpoint directory", checkpoint_path, "deleted.")

In [0]:
# Append the audited customer stream to the bronze Delta table.
# - The checkpoint location reuses `checkpoint_path` from the reset cell, so the
#   reset and the write can never point at different directories.
# - trigger(availableNow=True) processes every file currently in the landing
#   folder (still one file per micro-batch, honouring
#   cloudFiles.maxFilesPerTrigger) and then stops the query.
# - awaitTermination() blocks until that happens, so the notebook's exit message
#   is only emitted after the data has actually been committed. Without it the
#   query keeps running in the background and the notebook could exit first.
cust_bronze_query = (
    cust_audit_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable("gizmobox_dev.bronze.customers_streaming")
)
cust_bronze_query.awaitTermination()

In [0]:
# End the notebook run and hand this status string back to the caller
# (e.g. a parent notebook or job).
dbutils.notebook.exit("CUSTOMERS STREAMING HAS BEEN LOADED INTO BRONZE SCHEMA....")