In [0]:
from pyspark.sql.functions import col, from_json, to_timestamp, avg, count, window, date_format
from pyspark.sql.types import StructType, StringType, DoubleType

import dlt

# Expected layout of each JSON message on the Kafka topic. "Timestamp" arrives
# as a plain string here; the silver layer parses it into a timestamp column.
schema = (
    StructType()
    .add("DeviceID", StringType())
    .add("CustomerID", StringType())
    .add("Timestamp", StringType())
    .add("Region", StringType())
    .add("SignalStrength", DoubleType())
    .add("CallDropRate", DoubleType())
    .add("DataTransferSpeed", DoubleType())
    .add("ComplaintType", StringType())
    .add("CustomerSatisfaction", DoubleType())
)

# BRONZE LAYER
@dlt.table(
  comment="Raw data ingested from Kafka topic: 'projecttest'.",
  table_properties={
    "quality": "bronze",
    "pipelines.autoOptimize.managed": "true"
  }
)
def bronze_kafka_data():
  """Stream messages from the Kafka topic 'projecttest' and parse their JSON payload.

  Each record's ``value`` bytes are cast to a string and decoded with the
  module-level ``schema``. The parsed struct is then flattened so the table
  has one column per schema field. Kafka metadata columns (key, topic,
  partition, offset, timestamp) are not kept.

  The broker address is read from the pipeline configuration key
  ``telecom.kafka.bootstrap_servers``. When that key is unset, it falls back
  to ``65.0.75.84:9092``, so existing deployments behave exactly as before.
  """
  # Keep the environment-specific broker address out of the code; pipeline
  # settings (exposed through spark.conf) can override it per deployment.
  bootstrap_servers = spark.conf.get(
      "telecom.kafka.bootstrap_servers", "65.0.75.84:9092")

  kafka_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", "projecttest")
    # Applies only when the query starts without a checkpoint: begin at the
    # newest offsets instead of replaying the topic's retained history.
    .option("startingOffsets", "latest")
    # Don't fail the query when data may have been lost (e.g. topic deleted or
    # offsets out of range); the missing records are skipped instead.
    .option("failOnDataLoss", "false")
    .load())

  # Kafka delivers `value` as binary: cast it to a string, then parse the JSON.
  # NOTE(review): payloads that don't match `schema` produce rows whose fields
  # are all NULL rather than an error. Confirm that this is acceptable for bronze.
  return (kafka_df
      .selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).alias("data"))
      .select("data.*"))

# SILVER LAYER
@dlt.table(
  comment="Cleaned and filtered data from Bronze, partitioned by 'Region'.",
  # Declare the partitioning the table comment promises.
  # NOTE(review): changing the partitioning of an already-materialized table
  # likely requires a full refresh of this table. Confirm before deploying.
  partition_cols=["Region"],
  table_properties={
    "quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  }
)
@dlt.expect_or_drop("valid_signal_strength", "SignalStrength BETWEEN 0 AND 100")
@dlt.expect_or_drop("valid_call_drop_rate", "CallDropRate BETWEEN 0 AND 10")
@dlt.expect_or_drop("valid_data_transfer_speed", "DataTransferSpeed BETWEEN 5 AND 100")
@dlt.expect_or_drop("valid_timestamp", "Timestamp IS NOT NULL")
def silver_cleaned_data():
  """Parse event timestamps and keep only records that pass the quality checks.

  Reads the bronze table incrementally and converts ``Timestamp`` from a
  ``yyyy-MM-dd HH:mm:ss`` string into a timestamp column. Rows are dropped
  when they fail any expectation above: signal strength in [0, 100], call
  drop rate in [0, 10], data transfer speed in [5, 100], or a timestamp that
  could not be parsed. The table is physically partitioned by ``Region``.
  """
  bronze_data = dlt.read_stream("bronze_kafka_data")

  # When ANSI mode is off, strings that don't match the pattern become NULL.
  # The `valid_timestamp` expectation drops those rows so the time-bucketed
  # gold tables don't accumulate a NULL window/month group.
  return bronze_data.withColumn(
      "Timestamp", to_timestamp("Timestamp", "yyyy-MM-dd HH:mm:ss"))

# GOLD LAYER: Signal Strength Trend
@dlt.table(
  comment="Signal Strength trend over time for each customer.",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def signal_strength_trend():
  """Average SignalStrength per customer for each 1-day event-time window.

  The table is defined over a batch read of the silver table, so DLT maintains
  it as a materialized view. A streaming read cannot be used here for two
  reasons: the windowed aggregation has no watermark, and sorting a streaming
  DataFrame is only allowed in complete output mode.

  Output columns: ``window`` (struct with ``start``/``end``), ``CustomerID``,
  ``AvgSignalStrength``.
  """
  silver_data = dlt.read("silver_cleaned_data")

  # No orderBy: readers cannot rely on a table's stored row order, so sort by
  # window.start in the query that consumes this table.
  return (silver_data
      .groupBy(window("Timestamp", "1 day"), "CustomerID")
      .agg(avg("SignalStrength").alias("AvgSignalStrength")))

# GOLD LAYER: Call Drop Rate Analysis
@dlt.table(
  comment="Average Call Drop Rate for each customer.",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def call_drop_rate_analysis():
  """Average CallDropRate per CustomerID across all silver rows.

  The table is built from a batch read of silver, so DLT maintains it as a
  materialized view. A global (non-windowed) aggregate over a streaming read
  has no watermark, so Spark cannot emit it in append output mode.

  Output columns: ``CustomerID``, ``AvgCallDropRate``.
  """
  silver_data = dlt.read("silver_cleaned_data")

  return (silver_data
      .groupBy("CustomerID")
      .agg(avg("CallDropRate").alias("AvgCallDropRate")))

# GOLD LAYER: Data Transfer Speed Comparison
@dlt.table(
  comment="Average Data Transfer Speed per month for each customer.",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def data_transfer_speed_comparison():
  """Average DataTransferSpeed per customer per calendar month.

  ``Month`` is the event timestamp formatted as ``yyyy-MM``. The table is
  built from a batch read of silver, so DLT maintains it as a materialized
  view. Grouping by a derived string means no watermark can bound the
  aggregation state, so it cannot be emitted in append output mode.

  Output columns: ``Month``, ``CustomerID``, ``AvgDataTransferSpeed``.
  """
  silver_data = dlt.read("silver_cleaned_data")

  return (silver_data
      .withColumn("Month", date_format("Timestamp", "yyyy-MM"))
      .groupBy("Month", "CustomerID")
      .agg(avg("DataTransferSpeed").alias("AvgDataTransferSpeed")))

# GOLD LAYER: Geographical Heatmap
@dlt.table(
  comment="Average Signal Strength by region.",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def geographical_heatmap():
  """Average SignalStrength per Region across all silver rows.

  The table is built from a batch read of silver, so DLT maintains it as a
  materialized view. A global aggregate over a streaming read has no
  watermark, so Spark cannot emit it in append output mode.

  Output columns: ``Region``, ``AvgSignalStrength``.
  """
  silver_data = dlt.read("silver_cleaned_data")

  return (silver_data
      .groupBy("Region")
      .agg(avg("SignalStrength").alias("AvgSignalStrength")))

# GOLD LAYER: Customer Complaints Analysis
@dlt.table(
  comment="Complaint type distribution.",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def complaints_analysis():
  """Number of silver rows per ComplaintType.

  The table is built from a batch read of silver, so DLT maintains it as a
  materialized view. A global count over a streaming read has no watermark,
  so Spark cannot emit it in append output mode.

  Output columns: ``ComplaintType``, ``ComplaintCount``.
  """
  silver_data = dlt.read("silver_cleaned_data")

  # count("*") counts rows, including those whose ComplaintType is NULL;
  # those rows land in a single NULL group.
  return (silver_data
      .groupBy("ComplaintType")
      .agg(count("*").alias("ComplaintCount")))

# Uncomment to implement Customer Satisfaction Analysis
# GOLD LAYER: Satisfaction vs Call Drop Rate
# @dlt.table(
#   comment="Correlation between Customer Satisfaction and Call Drop Rates.",
#   table_properties={
#     "quality": "gold",
#     "pipelines.autoOptimize.managed": "true"
#   }
# )
# def satisfaction_vs_call_drop():
#   silver_data = dlt.read_stream("silver_cleaned_data")
#   return silver_data.groupBy("CustomerID")\
#       .agg(avg("CallDropRate").alias("AvgCallDropRate"), avg("CustomerSatisfaction").alias("AvgCustomerSatisfaction"))
