In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType, DateType
import pyspark.sql.functions as F
import json
from pyspark.sql.window import Window

In [0]:
# Session-level tuning for this notebook's reads and Delta writes.
# NOTE(review): a value of "auto" for shuffle partitions assumes a Databricks runtime
# (auto-optimized shuffle); open-source Spark expects an integer here.
spark.conf.set("spark.sql.shuffle.partitions", "auto")
# Adaptive query execution: coalesce small shuffle partitions and split skewed joins at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Optimized writes for Delta tables. The setting lives under the `spark.databricks.delta`
# namespace; the previous key `spark.delta.optimizeWrite.enabled` is not one Delta reads,
# and spark.conf.set accepts unknown keys, so it silently had no effect.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

## 1. Read files using the DataStreamReader API

In [0]:
# Explicit schema applied to the incoming customer JSON files; every field is nullable.
# NOTE(review): `Customer_name` is capitalised unlike the other fields, and `member_since`
# is read as a plain string — confirm both match the source files before relying on them.
customers_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("customer_id", IntegerType()),
        ("Customer_name", StringType()),
        ("date_of_birth", DateType()),
        ("telephone", StringType()),
        ("email", StringType()),
        ("member_since", StringType()),
        ("created_timestamp", TimestampType()),
    ]
])

In [0]:
# Landing folder that new customer JSON files are dropped into.
customers_landing_path = "/Volumes/gizmobox/landing/operational_data/customers_stream/"

# Streaming DataFrame over the landing folder, parsed with the explicit schema above.
customers_df = (
    spark.readStream
    .schema(customers_schema)
    .format("json")
    .load(customers_landing_path)
)

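## 2. Transform the dataframe to add the following columns
- `file_path`: the path of the source file each record was read from
- `ingestion_date`: the timestamp at which the record was ingested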

In [0]:
# Add lineage columns to every streamed record:
#   file_path      - source file path, taken from the hidden `_metadata` column
#   ingestion_date - current timestamp when the row is processed
# NOTE(review): despite its name, `ingestion_date` holds a full timestamp, not a date.
lineage_columns = {
    "file_path": F.col("_metadata.file_path"),
    "ingestion_date": F.current_timestamp(),
}
customers_transformed_df = customers_df.withColumns(lineage_columns)

## 3. Write the transformed data stream to a Delta table

In [0]:
# Checkpoint directory that tracks this query's progress across restarts.
# NOTE(review): it sits inside the landing folder the stream reads from. Consider a separate
# location; moving it now would discard the recorded progress and reprocess every file.
customers_checkpoint_path = "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoints"

# Start the streaming write into the bronze Delta table; toTable returns the running query handle.
streaming_query = (
    customers_transformed_df
    .writeStream
    .option("checkpointLocation", customers_checkpoint_path)
    .format("delta")
    .toTable("gizmobox.bronze.customers_stream")
)

In [0]:
# Stop the stream once it has caught up. Calling stop() right after the query starts
# (as happens under "Run All") can end it before any file is written, so first block
# until all currently available input has been processed. `finally` guarantees the
# query is stopped even if processing raises.
try:
    streaming_query.processAllAvailable()
finally:
    streaming_query.stop()

In [0]:
%sql
-- Inspect the rows the streaming query wrote to the bronze table.
SELECT * FROM gizmobox.bronze.customers_stream