# Customers table

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read streaming JSON data from the raw customers directory using Auto Loader.
# Every row is tagged with a processing timestamp and the path of the file it
# came from; the `_rescued_data` column is dropped (no-op if it is absent).
df_users = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/customers_schema")  # Path to store schema
        .option("mergeSchema", "true")  # Ask for schema merging on read
        .load("/Volumes/stream1/stream1/poc_streaming/customers/")  # Source directory
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))  # Originating file path
        .drop("_rescued_data")
)

# Write the streaming DataFrame to a Delta table in append mode.
# No partitionBy() is applied: an earlier note on this cell reported it as
# unsupported together with toTable() — TODO confirm before adding partitioning.
#
# toTable() only *starts* the query and returns immediately, so we block on
# awaitTermination(). With trigger(availableNow=True) the query stops by itself
# once the files currently available are processed; waiting here guarantees
# that later cells see the finished table and that stream failures are raised
# in this cell instead of being lost in the background.
(
    df_users.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/customers_stream")
        .option("mergeSchema", "true")  # Let the Delta sink accept newly added columns
        .trigger(availableNow=True)
        .toTable("stream1.stream1.customers_bronze")  # Target Delta table
        .awaitTermination()
)

# Orders table

In [0]:
# Read streaming JSON data from the raw orders directory using Auto Loader.
# NOTE: the variable is still called `df_products` so that any other cell
# referring to it keeps working, but it holds the *orders* feed.
# Every row is tagged with a processing timestamp and the path of the file it
# came from; the `_rescued_data` column is dropped (no-op if it is absent).
df_products = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/orders_schema")  # Path to store schema
        .option("mergeSchema", "true")  # Ask for schema merging on read
        .load("/Volumes/stream1/stream1/poc_streaming/orders/")  # Source directory
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))  # Originating file path
        .drop("_rescued_data")
)

# Write the streaming DataFrame to a Delta table in append mode.
# toTable() only *starts* the query and returns immediately, so we block on
# awaitTermination(). With trigger(availableNow=True) the query stops by itself
# once the files currently available are processed; waiting here guarantees
# that later cells see the finished table and that stream failures are raised
# in this cell instead of being lost in the background.
(
    df_products.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/orders_stream")
        .option("mergeSchema", "true")  # Let the Delta sink accept newly added columns
        .trigger(availableNow=True)
        .toTable("stream1.stream1.orders_bronze")  # Target Delta table
        .awaitTermination()
)


# Prices table

In [0]:
# Read streaming JSON data from the raw prices directory using Auto Loader.
# Every row is tagged with a processing timestamp and the path of the file it
# came from; the `_rescued_data` column is dropped (no-op if it is absent).
df_price = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/prices_schema")  # Path to store schema
        .option("mergeSchema", "true")  # Ask for schema merging on read
        .load("/Volumes/stream1/stream1/poc_streaming/prices/")  # Source directory
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))  # Originating file path
        .drop("_rescued_data")
)

# Write the streaming DataFrame to a Delta table in append mode.
# toTable() only *starts* the query and returns immediately, so we block on
# awaitTermination(). With trigger(availableNow=True) the query stops by itself
# once the files currently available are processed; waiting here guarantees
# that later cells see the finished table and that stream failures are raised
# in this cell instead of being lost in the background.
(
    df_price.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/prices_stream")
        .option("mergeSchema", "true")  # Let the Delta sink accept newly added columns
        .trigger(availableNow=True)
        .toTable("stream1.stream1.prices_bronze")  # Target Delta table
        .awaitTermination()
)


# Trades table

In [0]:
# Read streaming JSON data from the raw trades directory using Auto Loader.
# Every row is tagged with a processing timestamp and the path of the file it
# came from; the `_rescued_data` column is dropped (no-op if it is absent).
df_trades = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/trades_schema")  # Path to store schema
        .option("mergeSchema", "true")  # Ask for schema merging on read
        .option("cloudFiles.inferColumnTypes", "true")  # Infer concrete column types during schema inference
        .load("/Volumes/stream1/stream1/poc_streaming/trades/")  # Source directory
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))  # Originating file path
        .drop("_rescued_data")
)

# Write the streaming DataFrame to a Delta table in append mode.
# toTable() only *starts* the query and returns immediately, so we block on
# awaitTermination(). With trigger(availableNow=True) the query stops by itself
# once the files currently available are processed; waiting here guarantees
# that later cells see the finished table and that stream failures are raised
# in this cell instead of being lost in the background.
(
    df_trades.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/trades_stream")
        .option("mergeSchema", "true")  # Let the Delta sink accept newly added columns
        .trigger(availableNow=True)
        .toTable("stream1.stream1.trades_bronze")  # Target Delta table
        .awaitTermination()
)


# Wallets table

In [0]:
# Read streaming JSON data from the raw wallets directory using Auto Loader.
# Every row is tagged with a processing timestamp and the path of the file it
# came from; the `_rescued_data` column is dropped (no-op if it is absent).
df_wallets = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/wallets_schema")  # Path to store schema
        .option("mergeSchema", "true")  # Ask for schema merging on read
        .load("/Volumes/stream1/stream1/poc_streaming/wallets/")  # Source directory
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))  # Originating file path
        .drop("_rescued_data")
)

# Write the streaming DataFrame to a Delta table in append mode.
# toTable() only *starts* the query and returns immediately, so we block on
# awaitTermination(). With trigger(availableNow=True) the query stops by itself
# once the files currently available are processed; waiting here guarantees
# that later cells see the finished table and that stream failures are raised
# in this cell instead of being lost in the background.
(
    df_wallets.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/stream1/stream1/poc_streaming/checkpoints/wallets_stream")
        .option("mergeSchema", "true")  # Let the Delta sink accept newly added columns
        .trigger(availableNow=True)
        .toTable("stream1.stream1.wallets_bronze")  # Target Delta table
        .awaitTermination()
)


In [0]:
# Sanity check: show the current row count of every bronze table.
# NOTE(review): the numbers are only meaningful once the ingestion streams
# have finished writing — confirm they have terminated before relying on them.
tables = [
    "stream1.stream1.customers_bronze",
    "stream1.stream1.orders_bronze",
    "stream1.stream1.prices_bronze",
    "stream1.stream1.trades_bronze",
    "stream1.stream1.wallets_bronze",
]

# Build (table_name, record_count) pairs. No variable named `count` is
# assigned on purpose: this notebook does `from pyspark.sql.functions import *`,
# and binding `count` to an int here would overwrite that imported function
# for the rest of the kernel session.
counts = [
    (table_name, spark.table(table_name).count())
    for table_name in tables
]

df_counts = spark.createDataFrame(counts, ["table_name", "record_count"])
display(df_counts)