Create Staging Delta Table

In [None]:
%sql

-- Staging Delta table that receives the raw, parsed Kafka records.
-- IF NOT EXISTS keeps this cell idempotent, so the notebook survives
-- "Restart & Run All" after the table has already been created.
-- CURRENT_TIME is back-quoted because it is a reserved word under ANSI SQL
-- mode; it is kept as STRING here and converted downstream.
CREATE TABLE IF NOT EXISTS kafka_data_stg (
    STOCK_NAME STRING,
    CURR_VALUE FLOAT,
    CLOSE_VALUE FLOAT,
    `CURRENT_TIME` STRING
) USING DELTA

Create PySpark Streaming Job to load data into Delta table

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, from_unixtime

# Create (or reuse) the Spark session for this job
spark = SparkSession.builder.appName("Kafka").getOrCreate()

# Every streaming query needs its own checkpoint directory: the bare "/tmp"
# root can be shared with other queries, which would mix their offset state.
# NOTE(review): this is still under /tmp — move it to durable storage if the
# stream must resume after a cluster restart.
CHECKPOINT_LOCATION = "/tmp/checkpoints/kafka_data_stg"

# Read data from Kafka
# NOTE(review): "local_host" contains an underscore, which is not valid in a
# hostname — presumably a placeholder for the real broker address; confirm.
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "local_host:9092")
  .option("subscribe", "kafka-topic")
  .option("startingOffsets", "latest")
  .load())

# The Kafka source delivers `value` as binary, so cast it to a string
# explicitly before splitting the comma-separated payload.
split_df = df.select(
    split(col("value").cast("string"), ",").alias("split_values")
)

# Extract individual columns from the split array; the field order is
# STOCK_NAME, CURR_VALUE, CLOSE_VALUE, CURRENT_TIME.
parsed_df = split_df.select(
    col("split_values")[0].alias("STOCK_NAME"),
    col("split_values")[1].cast("FLOAT").alias("CURR_VALUE"),
    col("split_values")[2].cast("FLOAT").alias("CLOSE_VALUE"),
    col("split_values")[3].alias("CURRENT_TIME")
)

# Append the parsed records to the staging Delta table.
# toTable() is the documented DataStreamWriter method that starts the query
# and writes to a named table.
query = (parsed_df.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .toTable("kafka_data_stg"))

# Blocks this cell until the stream stops or fails; later cells will not run
# while the stream is active.
query.awaitTermination()

Create Warehouse Table for Complex Transformation and Analytics

In [None]:
%sql

-- Warehouse Delta table with derived metrics.
-- DIFF_VALUE and PER_VALUE are Delta generated columns: they are computed
-- from CURR_VALUE / CLOSE_VALUE on write and must not be supplied by the
-- loader. Each expression is cast back to FLOAT so its type matches the
-- declared column type (float division yields DOUBLE).
CREATE TABLE IF NOT EXISTS kafka_data (
    STOCK_NAME STRING,
    CURR_VALUE FLOAT,
    CLOSE_VALUE FLOAT,
    DIFF_VALUE FLOAT GENERATED ALWAYS AS (CAST(ROUND(CURR_VALUE - CLOSE_VALUE, 2) AS FLOAT)),
    PER_VALUE FLOAT GENERATED ALWAYS AS (CAST((ROUND(CURR_VALUE - CLOSE_VALUE, 2) / CLOSE_VALUE) * 100 AS FLOAT)),
    `CURRENT_TIME` TIMESTAMP
) USING DELTA;

-- Load the staged rows into the warehouse table. Only the base columns are
-- listed; the generated columns are filled in by Delta.
-- NOTE(review): assumes the staged CURRENT_TIME string is in a format that
-- CAST(... AS TIMESTAMP) understands — confirm against the producer.
-- NOTE(review): re-running this statement appends the staged rows again.
INSERT INTO kafka_data (STOCK_NAME, CURR_VALUE, CLOSE_VALUE, `CURRENT_TIME`)
SELECT
    STOCK_NAME,
    CURR_VALUE,
    CLOSE_VALUE,
    CAST(`CURRENT_TIME` AS TIMESTAMP)
FROM kafka_data_stg;