In [0]:
import logging
import sys

def get_logger(name=__name__):
    """Return the logger called ``name``, configuring it on first use.

    A logger that already has at least one handler is returned untouched.
    Otherwise it is set up to write INFO-and-above records to ``sys.stdout``
    and propagation to ancestor loggers is switched off.

    Args:
        name: Logger name; defaults to this module's ``__name__``.

    Returns:
        logging.Logger: The (possibly newly configured) logger.
    """
    log = logging.getLogger(name)

    # Already configured (by an earlier call or by someone else): leave as is,
    # which also keeps repeated cell runs from stacking duplicate handlers.
    if log.handlers:
        return log

    # Records are handled here only and are not passed on to ancestor loggers.
    log.propagate = False

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter("[%(asctime)s] %(levelname)s - %(name)s: %(message)s")
    )
    log.addHandler(stdout_handler)
    log.setLevel(logging.INFO)
    return log

In [0]:
def readKafkaStream(spark, serversString:str, topics:str):
    """Create a streaming read from Kafka.

    Args:
        spark: Object exposing ``readStream`` (presumably the active
            SparkSession; confirm against the caller).
        serversString: Value for the ``kafka.bootstrap.servers`` option.
        topics: Value for the ``subscribe`` option.

    Returns:
        Whatever ``.load()`` returns for the configured Kafka source, with
        ``startingOffsets`` set to ``"latest"``.

    Raises:
        Exception: Any error raised while building or loading the stream is
            logged and re-raised unchanged.
    """
    # Bound before the try so the except handler can always reference it.
    logger = get_logger(__name__)
    try:
        logger.info("starting stream...")
        stream = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", serversString)
            .option("subscribe", topics)
            .option("startingOffsets", "latest")
            .load()
        )
        logger.info("loaded stream")
        return stream
    except Exception as e:
        # The exception is a %-format argument, so the message needs a
        # placeholder; without one, logging reports a formatting error
        # instead of emitting this message.
        logger.error("In readKafkaStream() method -> %s", e)
        raise

In [0]:
from datetime import datetime
from pyspark.sql.streaming import DataStreamWriter, StreamingQuery
from pyspark.sql import DataFrame
# from ims.utils import get_logger

# Timestamp taken once, when this cell runs; it is baked into the default
# sink, checkpoint and query-name arguments of writeStream.
now = f"{datetime.now():%Y%m%d_%H%M%S}"
def writeStream(stream:DataFrame,
                file_format:str = "parquet",
                sink:str = f"/tmp/stream_output_{now}",
                checkpoint_location:str = f"/tmp/checkpoint_{now}",
                mode:str = "append",
                query_name:str = f"test_query{now}") -> str:
    """Start a streaming write of ``stream`` and return the query's name.

    The default ``sink``, ``checkpoint_location`` and ``query_name`` are
    built from the module-level ``now`` when this function is defined, so
    every call that relies on the defaults reuses the same three values.

    Args:
        stream: Streaming DataFrame to write.
        file_format: Value passed to ``.format()``.
        sink: Value for the ``path`` option.
        checkpoint_location: Value for the ``checkpointLocation`` option.
        mode: Value passed to ``.outputMode()``.
        query_name: Value passed to ``.queryName()``.

    Returns:
        str: ``query_name``, unchanged. The object returned by ``.start()``
        is not handed back; the caller in this notebook prints the name.

    Raises:
        Exception: Any error raised while configuring or starting the query
            is logged and re-raised unchanged.
    """
    # Bound before the try so the except handler can always reference it.
    logger = get_logger(__name__)
    try:
        logger.info(f"starting streaming({query_name}) write to {sink} in {file_format} format with {mode} mode...")
        # awaitTermination() is deliberately not called, so the caller can
        # start further queries after this one.
        (
            stream.writeStream
            .format(file_format)
            .option("checkpointLocation", checkpoint_location)
            .option("path", sink)
            .outputMode(mode)
            .queryName(query_name)
            .start()
        )
        logger.info(f"writing stream ({query_name}) has started")
        return query_name
    except Exception as e:
        # The exception is a %-format argument, so the message needs a
        # placeholder; without one, logging reports a formatting error
        # instead of emitting this message.
        logger.error("Failed to start streaming query %s: %s", query_name, e)
        raise

In [0]:
# Kafka bootstrap-servers string handed to readKafkaStream by writeToBronze.
# NOTE(review): "kafka-brokers-ip:9092" looks like a placeholder host; confirm
# the real broker address before running.
BOOTSTRAP_SERVERS = "kafka-brokers-ip:9092"
# Base path under which writeToBronze places each topic's sink directory and
# its chkp/<topic>/ checkpoint directory.
schema_path = "/FileStore/inventory_analytics/bronze"

def writeToBronze(spark):
    """Start one Kafka-to-parquet streaming write per product event type.

    For each of ``cart``, ``click`` and ``purchase`` (started in that order)
    the ``product_<event>`` topic is read with ``readKafkaStream`` and
    written with ``writeStream`` to ``{schema_path}/product_<event>``, using
    ``{schema_path}/chkp/product_<event>/`` as the checkpoint location and
    ``write_ims_<event>_rate_bronze`` as the query name.

    Args:
        spark: Passed through to ``readKafkaStream``.

    Returns:
        dict: Keys ``"click"``, ``"purchase"``, ``"cart"`` (in that order),
        each mapped to the value ``writeStream`` returned for that event.
    """
    started = {}
    for event in ("cart", "click", "purchase"):
        topic = f"product_{event}"
        source = readKafkaStream(spark, BOOTSTRAP_SERVERS, topics=topic)
        started[event] = writeStream(
            source,
            file_format="parquet",
            sink=f"{schema_path}/{topic}",
            checkpoint_location=f"{schema_path}/chkp/{topic}/",
            query_name=f"write_ims_{event}_rate_bronze",
        )

    # The returned key order differs from the start order; keep it stable
    # for callers that iterate over the mapping.
    return {event: started[event] for event in ("click", "purchase", "cart")}


In [0]:
# Start the cart, click and purchase bronze ingestion queries.
# NOTE(review): `spark` is not defined in the visible cells; presumably it is
# the session injected by the notebook runtime. Confirm.
streams = writeToBronze(spark)

[2025-04-26 08:30:24,018] INFO - __main__: starting stream...
[2025-04-26 08:30:26,058] INFO - __main__: loaded stream
[2025-04-26 08:30:26,060] INFO - __main__: starting streaming(write_ims_cart_rate_bronze) write to /FileStore/inventory_analytics/bronze/product_cart in parquet format with append mode...
[2025-04-26 08:30:29,512] INFO - __main__: writing stream (write_ims_cart_rate_bronze) has started
[2025-04-26 08:30:29,515] INFO - __main__: starting stream...
[2025-04-26 08:30:29,597] INFO - __main__: loaded stream
[2025-04-26 08:30:29,607] INFO - __main__: starting streaming(write_ims_click_rate_bronze) write to /FileStore/inventory_analytics/bronze/product_click in parquet format with append mode...
[2025-04-26 08:30:30,754] INFO - __main__: writing stream (write_ims_click_rate_bronze) has started
[2025-04-26 08:30:30,757] INFO - __main__: starting stream...
[2025-04-26 08:30:30,879] INFO - __main__: loaded stream
[2025-04-26 08:30:30,884] INFO - __main__: starting streaming(writ

In [0]:
# Keys are the event types; values are whatever writeStream returned for them
# (the query name, as the output below shows).
for event_type, query_name in streams.items():
    print(f"started streaming {event_type} with id {query_name}")

started streaming click with id write_ims_click_rate_bronze
started streaming purchase with id write_ims_purchase_rate_bronze
started streaming cart with id write_ims_cart_rate_bronze
