In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
# Kafka topic consumed by the ingestion table.
topic = "market_data"

# --- Kafka connection settings (SASL/PLAIN over TLS) -------------------------
kafka_bootstrap_servers_tls = "pkc-ldvj1.ap-southeast-2.aws.confluent.cloud:9092"
kafka_security_protocol = "SASL_SSL"
kafka_sasl_mechanism = "PLAIN"

# Credentials are read from the "kafka-secrets" secret scope so they never
# appear in the notebook source.
kafka_username = dbutils.secrets.get(scope="kafka-secrets", key="kafka-username")
kafka_password = dbutils.secrets.get(scope="kafka-secrets", key="kafka-password")

# JAAS login-module line handed to the Kafka client via `kafka.sasl.jaas.config`.
kafka_sasl_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_username}" password="{kafka_password}";'
)

# Schema applied to the JSON text carried in each Kafka message value.
# Every field is nullable.
input_schema = (
    StructType()
    .add("symbol", StringType(), True)
    .add("price", DoubleType(), True)
    .add("volume", IntegerType(), True)
    .add("timestamp", LongType(), True)
)

@dlt.table
def market_data_l():
    """Ingest raw messages from the Kafka topic as a streaming DataFrame.

    Subscribes to ``topic`` starting from the earliest available offset.
    Connection and authentication settings are taken from the module-level
    ``kafka_*`` variables so that they are defined in exactly one place.

    Returns:
        A streaming DataFrame with two columns:

        * ``id``   -- the Kafka message key cast to a string.
        * ``json`` -- the message value cast to a string and parsed as JSON
          against ``input_schema``.
    """
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .option("subscribe", topic)
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
        .option("kafka.security.protocol", kafka_security_protocol)
        .option("kafka.sasl.mechanism", kafka_sasl_mechanism)
        .option("kafka.sasl.jaas.config", kafka_sasl_jaas_config)
        .option("startingOffsets", "earliest")
        .load()
    )

    # The Kafka source exposes key and value as binary columns; cast both to
    # string before aliasing / JSON parsing.
    return kafka_stream.select(
        col("key").cast("string").alias("id"),
        from_json(col("value").cast("string"), input_schema).alias("json"),
    )

In [0]:
@dlt.table
def market_data_silver():
    """Flatten the parsed payload of ``market_data_l`` into top-level columns.

    Returns:
        A DataFrame with columns ``id``, ``symbol``, ``price``, ``volume`` and
        ``timestamp``, in that order.
    """
    # NOTE(review): dlt.read performs a complete (non-incremental) read of the
    # upstream dataset; confirm that is intended for a streaming source.
    parsed = dlt.read("market_data_l")

    # Promote the nested struct fields that are passed through unchanged.
    payload_columns = [
        col(f"json.{field}").alias(field)
        for field in ("symbol", "price", "volume")
    ]

    # from_unixtime interprets its input as epoch *seconds* and yields a
    # formatted string column, not a TimestampType.
    # NOTE(review): confirm the producer emits seconds rather than milliseconds.
    event_time = from_unixtime(col("json.timestamp")).alias("timestamp")

    return parsed.select(col("id"), *payload_columns, event_time)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

@dlt.table
def market_data_gold():
    """Highest and lowest price per symbol over its most recent records.

    For every ``symbol`` in ``market_data_silver`` the rows are ranked by
    ``timestamp`` (newest first), the 10 newest are kept, and the maximum and
    minimum ``price`` among them are reported.

    Returns:
        A DataFrame with columns ``symbol``, ``highest_price`` and
        ``lowest_price``.
    """
    # Bind the Spark functions explicitly: a bare max()/min() only resolves to
    # the Spark aggregates when a wildcard import has shadowed the Python
    # builtins, which is easy to break by accident.
    from pyspark.sql import functions as F

    latest_n = 10  # number of most recent rows considered per symbol

    silver = dlt.read("market_data_silver")

    # row_number() only needs a partitioned, ordered window; no explicit frame
    # is required.
    # NOTE(review): rows sharing the same timestamp value are ranked in an
    # arbitrary order, so the cut-off at `latest_n` may not be deterministic.
    newest_first = Window.partitionBy("symbol").orderBy(F.col("timestamp").desc())

    latest_rows = (
        silver
        .withColumn("row_number", F.row_number().over(newest_first))
        .filter(F.col("row_number") <= latest_n)
    )

    return latest_rows.groupBy("symbol").agg(
        F.max("price").alias("highest_price"),
        F.min("price").alias("lowest_price"),
    )