In [0]:
%python
# Ad-hoc look at the raw bronze customers Delta table before the silver load.
df = (
    spark.read
    .format("delta")
    .load("abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/bronze/customers/")
)
df.display()

In [0]:
%python
# Print the column names and types of the DataFrame currently bound to `df`.
df.printSchema()

In [0]:
%python
from pyspark.sql import SparkSession
from utils.logger import get_logger
from utils.audit_logger import log_audit
from utils.schema_validator import validate_schema
from utils.scd1 import apply_scd1
from utils.data_quality import run_data_quality_checks
from utils.schema_metadata import silver_table_metadata


# Wiring for the customers silver load: a named logger plus the table's entry
# from the shared metadata registry, unpacked into the values later cells use.
logger = get_logger("customers_silver")
table_conf = silver_table_metadata["customers"]
source_path, target_path, schema, primary_keys = (
    table_conf[field]
    for field in ("source_path", "target_path", "schema", "primary_keys")
)

In [0]:
%python
# Read the customers source as a Delta table from the configured path.
df = spark.read.load(source_path, format="delta")
logger.info("Loaded customers data")

In [0]:
%python 
# Render `df` in the notebook for a visual check of the loaded rows.
df.display()

In [0]:
%python
# Gate the load on the schema check. A mismatch is audited and then raised so
# that a "Run All" stops here instead of continuing on to the SCD1 merge and
# recording a success for data that failed validation.
if not validate_schema(df, schema):
    log_audit(spark, "customers", "failed", "Schema validation failed")
    logger.error("Schema validation failed")
    raise ValueError("Schema validation failed")

In [0]:
%python
# Run the shared data-quality helper over `df`, keyed on the table's primary
# keys. The result is kept in `dq_metrics`; the audit call in the SCD1 cell
# reads its "row_count" entry.
logger.info("Running data quality checks")
dq_metrics = run_data_quality_checks(df, primary_keys)
logger.info("Data Quality checks done")

In [0]:
%python
# Apply the SCD1 helper to the target and audit the outcome. A failure is
# audited as "failed" (same status used by the schema gate) and re-raised, so
# the audit trail never lacks a record for a run that died during the merge.
logger.info("Applying SCD1 Logic")
try:
    apply_scd1(spark, df, target_path, primary_keys)
except Exception as exc:
    log_audit(spark, "customers", "failed", f"SCD1 load failed: {exc}")
    logger.error(f"SCD1 load failed: {exc}")
    raise
log_audit(spark, "customers", "success", "Loaded with SCD1", dq_metrics.get("row_count", 0))
logger.info("SCD1 checked")

In [0]:
=== Confluent Cloud API key ===

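API key:
<redacted - keep in a Databricks secret scope, not in the notebook; rotate the key that was pasted here>

API secret:
<redacted - keep in a Databricks secret scope, not in the notebook; rotate the secret that was pasted here>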

Resource:
lkc-x8d1pk

Bootstrap server:
pkc-56d1g.eastus.azure.confluent.cloud:9092


In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, StructType

# Template: stream JSON messages from a Confluent Cloud topic into a Delta path.
# Every "<...>" value below is a placeholder that must be filled in before the
# cell can connect to anything.

# Create Spark session with Delta and Kafka support
spark = (
    SparkSession.builder
    .appName("Kafka to Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Kafka topic and credentials setup
kafka_bootstrap_servers = "<your-confluent-bootstrap-server>:9092"
kafka_topic = "<your-topic>"
kafka_api_key = "<your-api-key>"
kafka_api_secret = "<your-api-secret>"

# Build SASL_SSL options for Confluent Cloud.
# The SASL username must be the API key itself; the previous revision sent the
# literal string "apikey" and left kafka_api_key unused.
# NOTE(review): the other cells in this notebook use the "kafkashaded." class
# prefix for the login module - confirm which one this runtime needs.
kafka_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_api_key}" password="{kafka_api_secret}";'
    ),
    "startingOffsets": "latest",
}

# Define your schema based on expected Kafka message format
message_schema = StructType().add("id", StringType()).add("value", StringType())

# Read data from Kafka
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Deserialize JSON Kafka value: cast the payload to a string, parse it with
# message_schema, then flatten the parsed struct into top-level columns.
parsed_df = (
    df.selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), message_schema).alias("data"))
    .select("data.*")
)

# Write to Delta format
query = (
    parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/delta/checkpoints/kafka_stream")
    .start("/delta/output/kafka_data")
)

# No trigger is set, so this call blocks the cell until the stream is stopped.
query.awaitTermination()


In [0]:
%python

from pyspark.sql.functions import col, from_json

# Continuous stream: Confluent Cloud "orders" topic -> retail_catalog.silver.orders.

KAFKA_BROKER = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
KAFKA_TOPIC = "orders"
checkpoint_path = "abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/checkpoints"

# Credentials are read from a Databricks secret scope instead of being embedded
# in the notebook. The key pairs that used to be hardcoded here must be treated
# as leaked and rotated.
# TODO: confirm the scope and key names match the workspace's secret scope.
SECRET_SCOPE = "retailstream360"
kafka_api_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-secret")
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

# Expected shape of one JSON order message.
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product_id", StringType()),
    StructField("order_date", DateType()),
    StructField("status", StringType()),
])

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .load()
)

# Cast the Kafka payload to a string, parse the JSON with `schema`, and
# flatten the parsed struct into top-level columns.
processed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Write to Delta Lake. Only sink options are set here: the Kafka connection
# settings (and the JAAS secret) belong to the source and were previously being
# handed to the Delta writer as well.
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", True)
    .toTable("retail_catalog.silver.orders")
)

# No trigger is set, so this call blocks the cell until the stream is stopped.
query.awaitTermination()


In [0]:
%python
from pyspark.sql.functions import col, from_json

KAFKA_BROKER = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
KAFKA_TOPIC = "orders"
checkpoint_path = "abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/checkpoints"

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

# Expected shape of one JSON order message.
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product_id", StringType()),
    StructField("order_date", DateType()),
    StructField("status", StringType()),
])

# Credentials are read from a Databricks secret scope instead of being embedded
# in the notebook. The key pair that used to be hardcoded here must be treated
# as leaked and rotated.
# TODO: confirm the scope and key names match the workspace's secret scope.
SECRET_SCOPE = "retailstream360"
kafka_api_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-secret")

# This dict is unpacked into both the Kafka reader and the Delta writer by the
# next cell, which is why it also carries the sink-side keys
# ("checkpointLocation", "mergeSchema") next to the Kafka connection settings.
kafka_options = {
    "kafka.bootstrap.servers": KAFKA_BROKER,
    "topic": KAFKA_TOPIC,
    "subscribe": KAFKA_TOPIC,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "checkpointLocation": checkpoint_path,
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_api_key}" password="{kafka_api_secret}";'
    ),
    "mergeSchema": True,
    "startingOffsets": "earliest",
}

In [0]:
%python

# Read from Kafka using the connection options built in the config cell.
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Cast the Kafka payload to a string, parse the JSON with `schema`, and
# flatten the parsed struct into top-level columns.
processed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Only sink options go to the Delta writer. Unpacking kafka_options here would
# also hand the Kafka connection settings, including the JAAS credentials, to a
# sink that has no use for them.
# availableNow=True drains everything currently in the topic and then stops; it
# matches the transactions stream in this notebook and replaces once=True.
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", True)
    .trigger(availableNow=True)
    .toTable("retail_catalog.silver.orders")
)

query.awaitTermination()


In [0]:
%python
# Render `df` (the DataFrame read from Kafka in the previous cell) in the notebook.
df.display()

In [0]:
-- Inspect the rows currently in the silver orders table.
SELECT *
FROM retail_catalog.silver.orders

In [0]:
-- Make sure the silver schema exists before registering tables in it.
CREATE SCHEMA IF NOT EXISTS retail_catalog.silver;

-- Register the orders Delta table at its storage location.
-- IF NOT EXISTS makes this a no-op when the table is already defined.
CREATE TABLE IF NOT EXISTS retail_catalog.silver.orders
USING DELTA
LOCATION 'abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/orders';

In [0]:
-- Register the transactions Delta table at its storage location.
-- IF NOT EXISTS makes this a no-op when the table is already defined.
CREATE TABLE IF NOT EXISTS retail_catalog.silver.transactions
USING DELTA
LOCATION 'abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/transactions';

In [0]:
%python

from pyspark.sql.functions import col, from_json

# Incremental load: Confluent Cloud "transactions" topic ->
# retail_catalog.silver.transactions.

KAFKA_BROKER = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
KAFKA_TOPIC = "transactions"
checkpoint_path = "abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/checkpoints_transactions"

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

# Expected shape of one JSON transaction message.
schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("order_id", StringType()),
    StructField("product_id", StringType()),
    StructField("quantity", IntegerType()),
    StructField("price", DoubleType()),
    StructField("transaction_date", DateType()),
])

# Credentials are read from a Databricks secret scope instead of being embedded
# in the notebook. The key pair that used to be hardcoded here must be treated
# as leaked and rotated.
# TODO: confirm the scope and key names match the workspace's secret scope.
SECRET_SCOPE = "retailstream360"
kafka_api_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-secret")

# Source-side options only. The sink-side keys that used to sit in this dict
# ("topic", "checkpointLocation", "mergeSchema") were never passed to the
# writer in this cell; the writer sets its checkpoint explicitly below.
kafka_options = {
    "kafka.bootstrap.servers": KAFKA_BROKER,
    "subscribe": KAFKA_TOPIC,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_api_key}" password="{kafka_api_secret}";'
    ),
    "startingOffsets": "earliest",
}

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Cast the Kafka payload to a string, parse the JSON with `schema`, and
# flatten the parsed struct into top-level columns.
processed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# availableNow=True drains what is currently in the topic and then stops, so
# awaitTermination() returns once the backlog has been written.
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable("retail_catalog.silver.transactions")
)

query.awaitTermination()


In [0]:
-- Inspect the rows currently in the silver transactions table.
SELECT *
FROM retail_catalog.silver.transactions

In [0]:
%python

# Exploratory read of the raw "orders" topic.
#
# Changes from the previous revision of this cell:
#   * stray ")" and "}" tokens that made the whole cell a syntax error are gone;
#   * the unfilled template sink (placeholder checkpoint path and table name,
#     followed by a blocking awaitTermination()) is removed - it would have
#     prevented the display below from ever running;
#   * the duplicate second read of the same stream is removed;
#   * credentials come from a secret scope instead of being hardcoded.

# The key pair that used to be hardcoded here must be treated as leaked and rotated.
# TODO: confirm the scope and key names match the workspace's secret scope.
SECRET_SCOPE = "retailstream360"
kafka_api_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-secret")

kafka_options = {
    # Kafka bootstrap servers (Confluent Cloud)
    "kafka.bootstrap.servers": "pkc-56d1g.eastus.azure.confluent.cloud:9092",

    # Topic to subscribe to
    "subscribe": "orders",

    # Starting offset (earliest, latest, or specific)
    "startingOffsets": "earliest",

    # Security protocol
    "kafka.security.protocol": "SASL_SSL",

    # Authentication mechanism
    "kafka.sasl.mechanism": "PLAIN",

    # SASL username/password are the Confluent Cloud API key and secret. The
    # "kafkashaded." class prefix matches the other streaming cells in this notebook.
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_api_key}" password="{kafka_api_secret}";'
    ),
}

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Deserialize and process (example for JSON data).
# NOTE(review): `schema` is not defined in this cell; it is whatever an earlier
# cell last bound. Make sure it is the orders schema before relying on this.
processed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

df.display()


In [0]:
{
  "order_id": "O000004",
  "customer_id": "C0001",
  "product_id": "P0001",
  "order_date": "2025-07-17",
  "status": "DELIVERED"
}

In [0]:
{
  "transaction_id": "T000016",
  "order_id": "O000016",
  "product_id": "P0002",
  "quantity": 5,
  "price": 740.78,
  "transaction_date": "2025-03-10"
}

In [0]:
%python
from pyspark.sql import SparkSession
from utils.logger import get_logger
from utils.audit_logger import log_audit
from utils.schema_validator import validate_schema
from utils.data_quality import run_data_quality_checks
from delta.tables import DeltaTable
from utils.schema_metadata import silver_table_metadata
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType


KAFKA_BROKER = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
KAFKA_TOPIC = "orders"
checkpoint_path = "abfss://retailstream360@storageaccountsudip1.dfs.core.windows.net/silver/checkpoints"

# Expected shape of one JSON order message.
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product_id", StringType()),
    StructField("order_date", DateType()),
    StructField("status", StringType()),
])

# Credentials are read from a Databricks secret scope instead of being embedded
# in the notebook. The key pair that used to be hardcoded here must be treated
# as leaked and rotated.
# TODO: confirm the scope and key names match the workspace's secret scope.
SECRET_SCOPE = "retailstream360"
kafka_api_key = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="confluent-api-secret")

# This dict is also unpacked into the Delta writer by the final cell of the
# notebook, which is why it carries the sink-side keys ("checkpointLocation",
# "mergeSchema") next to the Kafka connection settings.
kafka_options = {
    "kafka.bootstrap.servers": KAFKA_BROKER,
    "topic": KAFKA_TOPIC,
    "subscribe": KAFKA_TOPIC,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "checkpointLocation": checkpoint_path,
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_api_key}" password="{kafka_api_secret}";'
    ),
    "mergeSchema": True,
    "startingOffsets": "earliest",
}

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Cast the Kafka payload to a string, parse the JSON with `schema`, and
# flatten the parsed struct into top-level columns.
processed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)






In [0]:
%python
# Print the column names and types of the parsed orders frame.
processed_df.printSchema()

In [0]:
%python
# Gate the orders load on the schema check. A mismatch is audited (same call
# shape the customers flow uses) and raised so that a "Run All" stops here
# instead of going on to write the stream.
if not validate_schema(processed_df, schema):
    log_audit(spark, "orders", "failed", "Schema validation failed")
    raise ValueError("Schema Validation Failed")

In [0]:
%python
# Look up the orders table's primary keys and run the shared data-quality
# helper over the parsed orders frame. The previous revision passed `query`,
# which is a streaming-query handle left over from another cell (or undefined
# on a fresh kernel), not a DataFrame.
# NOTE(review): processed_df is a streaming DataFrame - confirm that
# run_data_quality_checks supports streaming input; if it does not, run it
# against the silver table after the write instead.
table_conf = silver_table_metadata["orders"]
primary_keys = table_conf["primary_keys"]
dq_metrics = run_data_quality_checks(processed_df, primary_keys)

In [0]:
%python

# Write the parsed orders to the silver table.
# Only sink options go to the Delta writer. Unpacking kafka_options here would
# also hand the Kafka connection settings, including the JAAS credentials, to a
# sink that has no use for them.
# availableNow=True drains what is currently in the topic and then stops; it
# matches the transactions stream in this notebook and replaces once=True.
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", True)
    .trigger(availableNow=True)
    .toTable("retail_catalog.silver.orders")
)

query.awaitTermination()