In [None]:
%%configure -f
{
  "conf": {
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
    "spark.jars.packages": "org.apache.kafka:kafka-clients:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0"
  }
}


In [None]:
import os

from pyspark.sql import functions as f
from pyspark.sql.types import (
    DateType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

class kafkaConfiguration:
    """Kafka connection settings and the base directory used for stream checkpoints.

    Each setting comes from the matching constructor argument when one is given,
    otherwise from an environment variable, so credentials never have to be
    written into the notebook:

        argument           attribute            environment variable      default
        base_data_dir      base_data_dir        KAFKA_BASE_DATA_DIR       (required)
        bootstrap_servers  BOOTSTRAP_SERVERS    KAFKA_BOOTSTRAP_SERVERS   (required)
        protocol           protocol             KAFKA_SECURITY_PROTOCOL   "SASL_SSL"
        mechanism          mechanism            KAFKA_SASL_MECHANISM      "PLAIN"
        jaas_module        JAAS_MODULE          KAFKA_JAAS_MODULE         PlainLoginModule
        api_key            CLUSTER_API_KEY      KAFKA_CLUSTER_API_KEY     (required)
        api_secret         CLUSTER_API_SECRETS  KAFKA_CLUSTER_API_SECRET  (required)

    Raises:
        ValueError: if a setting resolves to a missing or empty value.
    """

    DEFAULT_PROTOCOL = "SASL_SSL"
    DEFAULT_MECHANISM = "PLAIN"
    # Unshaded class name, matching the plain kafka-clients package requested in %%configure.
    # NOTE(review): platforms that ship a shaded Kafka client need the shaded class
    # name instead -- override it through KAFKA_JAAS_MODULE.
    DEFAULT_JAAS_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"

    def __init__(
        self,
        base_data_dir=None,
        bootstrap_servers=None,
        protocol=None,
        mechanism=None,
        jaas_module=None,
        api_key=None,
        api_secret=None,
    ):
        self.base_data_dir = self._resolve(base_data_dir, "KAFKA_BASE_DATA_DIR")
        self.BOOTSTRAP_SERVERS = self._resolve(bootstrap_servers, "KAFKA_BOOTSTRAP_SERVERS")
        self.protocol = self._resolve(protocol, "KAFKA_SECURITY_PROTOCOL", self.DEFAULT_PROTOCOL)
        self.mechanism = self._resolve(mechanism, "KAFKA_SASL_MECHANISM", self.DEFAULT_MECHANISM)
        self.JAAS_MODULE = self._resolve(jaas_module, "KAFKA_JAAS_MODULE", self.DEFAULT_JAAS_MODULE)
        self.CLUSTER_API_KEY = self._resolve(api_key, "KAFKA_CLUSTER_API_KEY")
        self.CLUSTER_API_SECRETS = self._resolve(api_secret, "KAFKA_CLUSTER_API_SECRET")

    @staticmethod
    def _resolve(value, env_var, default=None):
        """Return ``value`` if given, else the ``env_var`` environment variable, else ``default``.

        Raises:
            ValueError: if the resolved value is missing or empty.  The message names
                only the variable, never the value, so secrets are not printed.
        """
        if value is None:
            value = os.environ.get(env_var, default)
        if not value:
            raise ValueError(
                f"Missing Kafka setting: pass it to kafkaConfiguration() or set {env_var}"
            )
        return value

In [None]:


class Getschema:
    """Spark schemas describing the JSON payload of each topic / source file.

    Every public method is named after a topic and returns a new ``StructType``.
    ``BronzeStream.getTopicSchema`` finds the method by the lower-cased topic name,
    so supporting a new topic only needs a new method here.

    NOTE(review): the customer key is spelled ``custID``, ``customerid`` and
    ``customerID`` across these schemas, and ``from_json`` presumably matches JSON
    keys to field names exactly -- confirm each spelling against the producer's
    payload, or that field will parse as null.
    NOTE(review): ``OrderID`` is ``LongType`` in ``order`` but ``IntegerType`` in
    ``exchange_order``/``return_order``, and ``TotalAmount`` is ``FloatType`` while
    ``RefundAmount``/``standardCost`` are ``IntegerType`` -- confirm the intended types.
    """

    def address(self):
        """Schema for the 'address' topic."""
        return StructType([
            StructField("City", StringType(), True),
            StructField("Country", StringType(), True),
            StructField("State", StringType(), True),
            StructField("StreetAddress", StringType(), True),
            StructField("ZipCode", StringType(), True),
            StructField("custID", IntegerType(), True),
        ])

    def customer(self):
        """Schema for the 'customer' topic."""
        return StructType([
            StructField("Name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("custID", IntegerType(), True),
            StructField("email", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("phoneNumber", StringType(), True),
        ])

    def cart(self):
        """Schema for the 'cart' topic."""
        return StructType([
            StructField("action", StringType(), True),
            StructField("CartID", IntegerType(), True),
            StructField("ProductID", IntegerType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("customerid", IntegerType(), True),
            StructField("discount", DoubleType(), True),
        ])

    def exchange_order(self):
        """Schema for the 'exchange_order' topic."""
        return StructType([
            StructField("ExchangeDate", TimestampType(), True),
            StructField("ExchangeID", IntegerType(), True),
            StructField("ExchangeItem", StringType(), True),
            StructField("ExchangeReason", StringType(), True),
            StructField("OrderID", IntegerType(), True),
        ])

    def inventory(self):
        """Schema for the 'inventory' topic (``LastStockUpdate`` is a date, not a timestamp)."""
        return StructType([
            StructField("LastStockUpdate", DateType(), True),
            StructField("ProductID", IntegerType(), True),
            StructField("RestockingAlert", StringType(), True),
            StructField("StockLevel", IntegerType(), True),
            StructField("SupplierID", IntegerType(), nullable=False),
        ])

    def order(self):
        """Schema for the 'order' topic."""
        return StructType([
            StructField("customerID", IntegerType(), True),
            StructField("OrderDate", TimestampType(), True),
            StructField("OrderID", LongType(), True),
            StructField("PaymentMethod", StringType(), True),
            StructField("TotalAmount", FloatType(), True),
            StructField("ProductID", IntegerType(), True),
        ])

    def return_order(self):
        """Schema for the 'return_order' topic."""
        return StructType([
            StructField("OrderID", IntegerType(), True),
            StructField("RefundAmount", IntegerType(), True),
            StructField("ReturnDate", TimestampType(), True),
            StructField("ReturnID", IntegerType(), True),
            StructField("ReturnReason", StringType(), True),
            StructField("customerid", IntegerType(), True),
        ])

    def product_cost(self):
        """Schema for the 'product_cost' topic."""
        return StructType([
            StructField("endDate", TimestampType(), nullable=False),
            StructField("productID", IntegerType(), True),
            StructField("standardCost", IntegerType(), nullable=False),
            StructField("startDate", TimestampType(), True),
        ])

    def product(self):
        """Schema for the 'product' topic."""
        return StructType([
            StructField("Category", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("ProductID", IntegerType(), True),
            StructField("ProductName", StringType(), True),
        ])

    def product_location(self):
        """Schema for the 'product_location' topic."""
        return StructType([
            StructField("City", StringType(), True),
            StructField("Country", StringType(), True),
            StructField("LocationID", IntegerType(), True),
            StructField("LocationName", StringType(), True),
            StructField("State", StringType(), True),
            StructField("productid", IntegerType(), True),
        ])

    def shipping(self):
        """Schema for the 'shipping' topic."""
        return StructType([
            StructField("City", StringType(), True),
            StructField("Country", StringType(), True),
            StructField("DeliveryDate", TimestampType(), True),
            StructField("ShipmentDate", TimestampType(), True),
            StructField("ShippingAddress", StringType(), True),
            StructField("ShippingID", IntegerType(), True),
            StructField("State", StringType(), True),
            StructField("ZipCode", StringType(), True),
        ])

    def stock_movement(self):
        """Schema for the 'stock_movement' topic (``MovementDate`` is a date)."""
        return StructType([
            StructField("MovementDate", DateType(), True),
            StructField("MovementType", StringType(), True),
            StructField("ProductID", IntegerType(), True),
            StructField("Quantity", IntegerType(), True),
        ])

    def supplier(self):
        """Schema for the 'supplier' topic."""
        return StructType([
            StructField("ContactPerson", StringType(), True),
            StructField("Email", StringType(), True),
            StructField("Phone", StringType(), True),
            StructField("SupplierID", IntegerType(), True),
            StructField("SupplierName", StringType(), True),
        ])

    def membership(self):
        """Schema for the 'membership' topic (start/end are dates)."""
        return StructType([
            StructField("End_date", DateType(), True),
            StructField("Level", StringType(), True),
            StructField("MembershipID", IntegerType(), True),
            StructField("Start_date", DateType(), True),
            StructField("custID", IntegerType(), True),
        ])


In [None]:


class BronzeStream:
    """Read a Kafka topic as a streaming DataFrame and prepare it for the bronze layer.

    - ingest data from Kafka using the SASL settings held by ``kafkaConfiguration``
    - cast the binary ``key``/``value`` columns to strings
    - look up the ``StructType`` describing a topic's JSON payload

    Parsing the JSON and writing the bronze table are left to the caller.
    """

    def __init__(self):
        self.conf = kafkaConfiguration()
        self.schema_generator = Getschema()

    def getTopicSchema(self, topic: str) -> "StructType":
        """Return the schema for ``topic`` by calling the same-named ``Getschema`` method.

        The lookup lower-cases the topic, so ``"ORDER"`` resolves to ``Getschema.order``.

        Raises:
            ValueError: if no public, callable schema method matches ``topic``.
        """
        method_name = (topic or "").lower()
        # Only public callables count as schemas; without this guard a topic such as
        # "__class__" would resolve to an unrelated attribute instead of failing.
        schema_method = (
            None if method_name.startswith("_")
            else getattr(self.schema_generator, method_name, None)
        )
        if not callable(schema_method):
            raise ValueError(f"No schema defined for topic: {topic}")
        return schema_method()

    def ingestFromKafka(self, topic: str, starting_offsets: str = "earliest"):
        """Return a streaming DataFrame subscribed to ``topic``.

        Args:
            topic: Kafka topic to subscribe to.
            starting_offsets: where a query with no checkpoint starts reading;
                "earliest" (the default) replays the topic from the beginning.
                A query that resumes from an existing checkpoint ignores it.
        """
        jaas_config = (
            f"{self.conf.JAAS_MODULE} required "
            f"username='{self.conf.CLUSTER_API_KEY}' "
            f"password='{self.conf.CLUSTER_API_SECRETS}';"
        )
        return (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", self.conf.BOOTSTRAP_SERVERS)
            # Taken from the config object instead of being hard-coded a second time here.
            .option("kafka.security.protocol", self.conf.protocol)
            .option("kafka.sasl.mechanism", self.conf.mechanism)
            .option("kafka.sasl.jaas.config", jaas_config)
            .option("startingOffsets", starting_offsets)
            .option("subscribe", topic)
            # Do not fail the query when offsets are gone (e.g. aged out or topic deleted).
            .option("failOnDataLoss", "false")
            .load()
        )

    def getReading(self, kafka_df):
        """Project a raw Kafka DataFrame onto string-typed ``key`` and ``value`` columns."""
        return kafka_df.select(
            f.col("key").cast("string"), f.col("value").cast("string")
        )


class ProcessStream:
    """Turn Kafka topics into flat, typed DataFrames and stream them into bronze tables."""

    def __init__(self):
        self.bStream = BronzeStream()

    def _parseTopic(self, topic):
        """Ingest ``topic`` and return it with the JSON ``value`` parsed into a ``json_value`` struct."""
        rawDF = self.bStream.ingestFromKafka(topic)
        readingDF = self.bStream.getReading(rawDF)
        schema = self.bStream.getTopicSchema(topic)
        return readingDF.withColumn("json_value", f.from_json(f.col("value"), schema))

    def processOrder(self, topic):
        """Return the order stream of ``topic`` flattened to one column per order field.

        Output columns: CustomerID, OrderDate, OrderID, TotalAmount, PaymentMethod, ProductID.
        """
        return self._parseTopic(topic).select(
            # The order schema spells this field "customerID"; using the exact name keeps
            # the select valid even when spark.sql.caseSensitive is enabled.
            f.col("json_value.customerID").alias("CustomerID"),
            f.col("json_value.OrderDate").alias("OrderDate"),
            f.col("json_value.OrderID").alias("OrderID"),
            f.col("json_value.TotalAmount").alias("TotalAmount"),
            f.col("json_value.PaymentMethod").alias("PaymentMethod"),
            f.col("json_value.ProductID").alias("ProductID"),
        )

    def processCart(self, topic="cart"):
        """Return the cart stream of ``topic`` flattened to one column per cart field.

        The ``topic`` argument is honoured; it is used both as the Kafka topic and as
        the schema name.

        Output columns: CartID, ProductID, Quantity, customerid, discount, action.
        """
        return self._parseTopic(topic).select(
            f.col("json_value.CartID").alias("CartID"),
            f.col("json_value.ProductID").alias("ProductID"),
            f.col("json_value.Quantity").alias("Quantity"),
            f.col("json_value.customerid").alias("customerid"),
            f.col("json_value.discount").alias("discount"),
            f.col("json_value.action").alias("action"),
        )

    def writeToBronzeTable(self, processDF, topic: str):
        """Start a streaming append of ``processDF`` into the table ``bronze.Streaming_<topic>``.

        The checkpoint is stored under ``<base_data_dir>/checkpoint/<topic>``.

        Returns:
            The started streaming query, or ``None`` if starting it raised (the error is
            printed).  Failures that happen later, while the query runs, are not caught
            here; inspect the returned query (e.g. ``query.exception()``).
        """
        table_name = f"bronze.Streaming_{topic}"
        # rstrip avoids a "//" in the path when base_data_dir ends with a slash.
        checkpoint_location = f"{self.bStream.conf.base_data_dir.rstrip('/')}/checkpoint/{topic}"
        try:
            return (
                processDF.writeStream
                .queryName(f"bronze-ingestion-{topic}")
                .option("checkpointLocation", checkpoint_location)
                .outputMode("append")
                .toTable(table_name)
            )
        except Exception as e:
            # Best effort, as before: report with context and let the caller handle None.
            print(f"Failed to start bronze ingestion for topic '{topic}' into {table_name}: {e}")
            return None


In [None]:

# Build the processor once; it owns the Kafka settings and the topic schemas.
processor = ProcessStream()

# Start one bronze ingestion stream per topic. Both are started before checking,
# so one failure does not prevent the other stream from running.
processOrderDF = processor.processOrder("order")
writeOrder = processor.writeToBronzeTable(processOrderDF, "order")

processCartDF = processor.processCart("cart")
writeCart = processor.writeToBronzeTable(processCartDF, "cart")

# writeToBronzeTable returns None when a query could not start; surface that
# instead of letting the notebook continue as if ingestion were running.
failed_topics = [
    topic for topic, query in (("order", writeOrder), ("cart", writeCart)) if query is None
]
if failed_topics:
    raise RuntimeError(f"Bronze streaming queries failed to start for: {', '.join(failed_topics)}")

# Streaming DataFrames cannot be .show()n; inspect the parsed schema instead.
processOrderDF.printSchema()



<mark>**Optimise the bronze Delta tables**</mark>