# Read data from Apache Kafka

### Import dependencies

In [None]:
import base64
import binascii

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, from_json, col, udf, window, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType, LongType, BinaryType, BooleanType, TimestampNTZType

### Variables

In [None]:
# Base S3 location; every Delta table path and checkpoint path in this
# notebook is built by appending to it.
# NOTE(review): the bucket name looks like a placeholder — replace before running.
storage_path = "s3://yout_bucket/"
# Time zone configuration: set the Spark SQL session time zone to UTC.
# NOTE(review): this relies on a `spark` session already being in scope when
# the cell runs (the session is only created explicitly in a later cell) —
# confirm the runtime provides it.
spark.conf.set("spark.sql.session.timeZone", "UTC")

### Functions

In [None]:
# Decode the binary-encoded decimal price field
def decode_base64_decimal(base64_bytes, scale=2):
    """Convert a big-endian, signed unscaled integer into a float.

    The input bytes are read as a two's-complement big-endian integer and
    divided by ``10 ** scale``.

    NOTE(review): despite the name, no base64 decoding happens here; the
    value is presumably already raw bytes by the time the UDF receives it
    (the column is declared as BinaryType) — confirm against the producer.

    Args:
        base64_bytes: bytes-like object holding the unscaled integer, or
            ``None``.
        scale: number of decimal digits to shift by. Defaults to 2, which
            reproduces the original fixed division by 100.

    Returns:
        The decoded value as a ``float``, or ``None`` when the input is
        ``None`` or cannot be interpreted as bytes / converted to a float.
    """
    # Null column values are expected; handle them explicitly instead of
    # relying on an exception.
    if base64_bytes is None:
        return None
    try:
        unscaled = int.from_bytes(base64_bytes, byteorder='big', signed=True)
        return unscaled / (10.0 ** scale)
    except (TypeError, ValueError, ArithmeticError):
        # TypeError/ValueError: input is not convertible to bytes.
        # ArithmeticError: value too large for a float (or degenerate scale).
        return None
      

# Register the decoder as a Spark UDF that produces a double column
decode_base64_decimal_udf = udf(decode_base64_decimal, returnType=DoubleType())

### Read from Kafka

In [None]:


# Obtain the Spark session for this demo (an existing session is reused).
spark = (
    SparkSession.builder
    .appName("Kafka-Spark-Databricks-Demo-sales")
    .getOrCreate()
)

# Kafka bootstrap address (placeholder host) and the three change-event
# topics read below.
kafka_brokers = "<public-ip>:9092"
sales_topic = "dbserver1.public.sales"
products_topic = "dbserver1.public.products"
customers_topic = "dbserver1.public.customers"

# Debezium Kafka change-event schemas
#
# The sales, products and customers topics share one envelope layout
# (before / after / source / op / ts_ms / transaction) and differ only in the
# columns of the captured row, so the envelope is built once by a helper
# instead of being repeated per topic.
def _debezium_envelope_schema(row_fields):
    """Return the change-event schema for a table whose row has ``row_fields``.

    Args:
        row_fields: sequence of ``(column_name, data_type)`` pairs describing
            the captured table row. The same layout is used for both the
            ``before`` and the ``after`` image.

    Returns:
        A ``StructType`` with the fields ``before``, ``after``, ``source``,
        ``op``, ``ts_ms`` and ``transaction``, in that order. Every field is
        left nullable, which is the ``StructField`` default.
    """
    def row_struct():
        # Build a fresh struct per call so ``before`` and ``after`` do not
        # share one object.
        return StructType([
            StructField(column_name, data_type)
            for column_name, data_type in row_fields
        ])

    # Connector metadata block attached to every event.
    source_struct = StructType([
        StructField("version", StringType()),
        StructField("connector", StringType()),
        StructField("name", StringType()),
        StructField("ts_ms", LongType()),
        StructField("snapshot", StringType()),
        StructField("db", StringType()),
        StructField("sequence", StringType()),
        StructField("schema", StringType()),
        StructField("table", StringType()),
        StructField("txId", LongType()),
        StructField("lsn", LongType()),
        StructField("xmin", LongType()),
    ])

    # Optional transaction metadata block.
    transaction_struct = StructType([
        StructField("id", StringType()),
        StructField("total_order", LongType()),
        StructField("data_collection_order", LongType()),
    ])

    return StructType([
        StructField("before", row_struct()),
        StructField("after", row_struct()),
        StructField("source", source_struct),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
        StructField("transaction", transaction_struct),
    ])


# Debezium Kafka sales structure
sales_schema = _debezium_envelope_schema([
    ("sale_id", IntegerType()),
    ("customer_id", IntegerType()),
    ("product_id", IntegerType()),
    ("quantity", IntegerType()),
    ("sale_date", LongType()),
    ("created_at", LongType()),
    ("modified_at", LongType()),
])

# Debezium Kafka products structure
# ``price`` arrives as binary and is decoded to a number in a later step.
products_schema = _debezium_envelope_schema([
    ("product_id", IntegerType()),
    ("product_name", StringType()),
    ("description", StringType()),
    ("price", BinaryType()),
    ("created_at", LongType()),
    ("modified_at", LongType()),
])

# Debezium Kafka customers structure
customers_schema = _debezium_envelope_schema([
    ("customer_id", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("email", StringType()),
    ("phone", StringType()),
    ("created_at", LongType()),
    ("modified_at", LongType()),
])

# Sales topic: consumed as a stream, starting from the earliest offset.
sales_df = (
    spark.readStream.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_brokers,
        "subscribe": sales_topic,
        "startingOffsets": "earliest",
    })
    .load()
)

# Products topic: one bounded batch read, earliest through latest offset.
products_df = (
    spark.read.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_brokers,
        "subscribe": products_topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
)

# Customers topic: one bounded batch read, earliest through latest offset.
customers_df = (
    spark.read.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_brokers,
        "subscribe": customers_topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
)

# Create a temporary view for products.
# NOTE(review): this registers the raw Kafka frame; the "products" view is
# re-registered from the transformed frame before the stream join.
products_df.createOrReplaceTempView("products")

### Parse dataframe

In [None]:
# Parse the Kafka message value of each frame into typed columns
def _parse_change_events(kafka_df, schema):
    """Cast the Kafka ``value`` to a string, parse it as JSON with ``schema``
    and return the parsed top-level fields as columns."""
    as_text = kafka_df.selectExpr("CAST(value AS STRING) as json_str")
    as_struct = as_text.select(from_json("json_str", schema).alias("data"))
    return as_struct.select("data.*")


sales_parsed_df = _parse_change_events(sales_df, sales_schema)
products_parsed_df = _parse_change_events(products_df, products_schema)
customers_parsed_df = _parse_change_events(customers_df, customers_schema)

# Keep only the columns of the "after" row image.
# NOTE(review): events whose "after" is null (presumably deletes/tombstones)
# would surface here as rows with every column null — confirm whether they
# should be filtered out.
sales_after_df = sales_parsed_df.select("after.*")
products_after_df = products_parsed_df.select("after.*")
customers_after_df = customers_parsed_df.select("after.*")

### Apply columns transformation

In [None]:
def _micros_to_timestamp(column_name):
    """Divide the named column by 1,000,000 and cast the result to a timestamp.

    Presumably the source values are microseconds since the epoch — confirm
    against the connector's time precision settings.
    """
    return (col(column_name) / 1000000).cast(TimestampType())


# Sales: convert the three epoch columns to timestamps.
sales_final_df = (
    sales_after_df
    .withColumn("sale_date", _micros_to_timestamp("sale_date"))
    .withColumn("created_at", _micros_to_timestamp("created_at"))
    .withColumn("modified_at", _micros_to_timestamp("modified_at"))
)

# Products: decode the binary price, then convert the audit columns.
products_final_df = (
    products_after_df
    .withColumn("price", decode_base64_decimal_udf(col("price")))
    .withColumn("created_at", _micros_to_timestamp("created_at"))
    .withColumn("modified_at", _micros_to_timestamp("modified_at"))
)

# Customers: convert the audit columns.
customers_final_df = (
    customers_after_df
    .withColumn("created_at", _micros_to_timestamp("created_at"))
    .withColumn("modified_at", _micros_to_timestamp("modified_at"))
)

### Write batch delta table

In [None]:
def _overwrite_delta_table(frame, table_name, location):
    """Declare ``table_name`` as a Delta table at ``location`` if it does not
    exist yet, then overwrite the data at that location with ``frame``."""
    spark.sql(f"""
CREATE TABLE IF NOT EXISTS {table_name}
USING delta
LOCATION '{location}'
""")
    writer = frame.write.format("delta").option("mergeSchema", "true")
    writer.mode("overwrite").save(location)


# Products snapshot
products_delta_path = f"{storage_path}demo_sales/products"
_overwrite_delta_table(products_final_df, "demo_sales.products", products_delta_path)

# Customers snapshot
customers_delta_path = f"{storage_path}demo_sales/customers"
_overwrite_delta_table(customers_final_df, "demo_sales.customers", customers_delta_path)

# Stream Processing data

In [None]:

# Expose the transformed products frame under the "products" view name
products_final_df.createOrReplaceTempView("products")

# Join each sale with its product to pick up the price
joined_df = (
    sales_final_df
    .join(
        spark.table("products"),
        sales_final_df.product_id == col("products.product_id"),
    )
    .select(
        sales_final_df.sale_date,
        sales_final_df.quantity,
        col("products.price"),
    )
)

# Hourly windowed aggregation: sum of quantity * price per window, also
# keyed by the calendar parts extracted from sale_date.
windowed_df = (
    joined_df
    .withWatermark("sale_date", "1 hour")
    .groupBy(
        window(col("sale_date"), "1 hour"),
        *[
            expr(f"EXTRACT({part} FROM sale_date) AS {label}")
            for part, label in (
                ("YEAR", "anio"),
                ("MONTH", "mes"),
                ("WEEK", "semana"),
                ("DAY", "dia"),
                ("HOUR", "hora"),
            )
        ],
    )
    .agg(_sum(col("quantity") * col("price")).alias("venta_neta"))
)



# Write Streaming Delta table

In [None]:
# --- Raw sales stream -> Delta (append) ---
sales_delta_path = f"{storage_path}demo_sales/sales"
checkpoint_sales_delta_path = f"{storage_path}demo_sales/checkpoint/sales"

spark.sql(f"""
CREATE TABLE IF NOT EXISTS demo_sales.sales
USING delta
LOCATION '{sales_delta_path}'
""")

# Each micro-batch of sales rows is appended to the Delta location.
sales_query = (
    sales_final_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_sales_delta_path)
    .option("mergeSchema", "true")
    .start(sales_delta_path)
)

# --- Windowed sales aggregate -> Delta (complete) ---
dwt_sales_delta_path = f"{storage_path}demo_sales/dwt_sales_1"
checkpoint_dwt_sales_delta_path = f"{storage_path}demo_sales/checkpoint/dwt_sales_1"

spark.sql(f"""
CREATE TABLE IF NOT EXISTS demo_sales.dwt_sales_1
USING delta
LOCATION '{dwt_sales_delta_path}'
""")

# The full aggregate result is rewritten on every trigger (complete mode).
dwt_sales_query = (
    windowed_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", checkpoint_dwt_sales_delta_path)
    .option("mergeSchema", "true")
    .start(dwt_sales_delta_path)
)

# Block until the queries end.
# NOTE(review): the first call blocks until the aggregate query terminates,
# so a failure of the append query is not observed until then.
dwt_sales_query.awaitTermination()
sales_query.awaitTermination()