In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead,col,broadcast,collect_set,size,array_contains,collect_list,min,datediff, avg,rank,when,count,coalesce,lit,desc,sum,countDistinct,udf,date_format
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import re
from delta.tables import DeltaTable

In [0]:
%run "./Bronze/"

In [0]:
class IncrementalPreProcessor:
    """Streams a Bronze Delta table into a cleaned Silver Delta table.

    The stream drops a fixed set of columns, replaces null ``order_status``
    values with ``"unavailable"`` and appends the result to the Silver path.
    """

    def __init__(self, bronze_table_path, silver_table_path):
        """Remember the source/target paths and derive the checkpoint path.

        Args:
            bronze_table_path: Location of the Delta table to stream from.
            silver_table_path: Location the cleaned stream is written to.
        """
        self.bronze_table_path = bronze_table_path
        self.silver_table_path = silver_table_path
        # The checkpoint directory is nested under the Silver table path.
        self.checkpoint_path = silver_table_path + "/checkpoint/"

    def preprocess_stream(self):
        """Start the Bronze -> Silver streaming query.

        Returns:
            The handle returned by ``writeStream...start()``; the caller decides
            how long to wait on it.
        """
        # Columns removed from every incoming row.
        unused_columns = (
            "order_approved_at",
            "order_delivered_carrier_date",
            "order_delivered_customer_date",
            "order_estimated_delivery_date",
            "_rescued_data",
        )
        # Default values substituted for nulls, keyed by column name.
        null_defaults = {"order_status": "unavailable"}

        # Read the Bronze Delta table as a stream.
        source = spark.readStream.format("delta").load(self.bronze_table_path)

        # Apply the cleaning rules.
        cleaned = source.drop(*unused_columns).fillna(null_defaults)

        # Append the cleaned rows to the Silver Delta table.
        writer = (
            cleaned.writeStream
            .format("delta")
            .outputMode("append")
            .queryName("Preprocessing Stream data")
            .option("checkpointLocation", self.checkpoint_path)
        )
        return writer.start(self.silver_table_path)

# # Paths for Delta tables
# bronze_table_path = "abfss://bronze@ecommerceproject.dfs.core.windows.net/orders/delta_table"
# silver_table_path = "abfss://silver@ecommerceproject.dfs.core.windows.net/orders"

# # Start Streaming Preprocessing
# processor = IncrementalPreProcessor(bronze_table_path, silver_table_path)
# query = processor.preprocess_stream()
# query.awaitTermination(30)


In [0]:
# %sql
# create schema if not exists eco.silver

In [0]:
class PreProcessPipeline:
    """Cleans a DataFrame and persists it as a Delta table under a base path.

    Tables live at ``<storage_base_path>/<table_name>``. ``run_pipeline`` only
    writes when no Delta table exists at that location yet.
    """

    def __init__(self, storage_base_path: str):
        """Store the base path with any trailing slashes stripped."""
        # Stripping here keeps "<base>/<table>" free of a doubled slash.
        self.storage_base_path = storage_base_path.rstrip('/')

    def preprocess_data(self, df: DataFrame, drop_columns: list = None, fill_na_dict: dict = None) -> DataFrame:
        """Drop columns and fill nulls; each step is skipped when its argument is empty.

        Args:
            df: Input DataFrame.
            drop_columns: Column names to drop (``None`` or empty skips the step).
            fill_na_dict: Column -> replacement value mapping passed to ``fillna``
                (``None`` or empty skips the step).

        Returns:
            The resulting DataFrame (``df`` itself when both steps are skipped).
        """
        cleaned = df
        if drop_columns:
            print(f"Dropping columns: {drop_columns}")
            cleaned = cleaned.drop(*drop_columns)
        if fill_na_dict:
            print(f"Filling nulls with: {fill_na_dict}")
            cleaned = cleaned.fillna(fill_na_dict)
        return cleaned

    def store_as_delta(self, df: DataFrame, table_name: str):
        """Write ``df`` in overwrite mode to ``<storage_base_path>/<table_name>``.

        Raises:
            Exception: Any write failure is logged and then re-raised unchanged.
        """
        target_path = f"{self.storage_base_path}/{table_name}"
        try:
            writer = df.write.format("delta").mode("overwrite")
            writer.save(target_path)
            print(f" Data stored as Delta table at {target_path}")
        except Exception as e:
            print(f" Error storing Delta table at {target_path}: {e}")
            raise

    def run_pipeline(self, df: DataFrame, table_name: str, drop_columns: list = None, fill_na_dict: dict = None):
        """Preprocess ``df`` and store it unless the target Delta table already exists.

        Args:
            df: Input DataFrame.
            table_name: Name of the table directory under the base path.
            drop_columns: Forwarded to ``preprocess_data``.
            fill_na_dict: Forwarded to ``preprocess_data``.
        """
        print(f" Starting PreProcess pipeline for table: {table_name}")

        # Preprocessing always runs, even if the result ends up not being stored.
        processed_df = self.preprocess_data(df, drop_columns, fill_na_dict)

        target_path = f"{self.storage_base_path}/{table_name}"

        # An existing Delta table is left untouched; only a missing one is created.
        if DeltaTable.isDeltaTable(spark, target_path):
            print(f"Delta table already exists at {target_path}. Skipping store as delta step.")
        else:
            print(f"Delta table not found at {target_path}. Storing data as Delta table.")
            self.store_as_delta(processed_df, table_name)

        print(f" Pre-Processing pipeline completed for table: {table_name}")

storage_base_path = "abfss://silver@ecommerceproject.dfs.core.windows.net/"

In [0]:
def get_preprocessed_order_items(inputDf: dict) -> DataFrame:
    """Preprocess the order_items table (once) and return it as read back from storage.

    Args:
        inputDf: Mapping that holds the raw DataFrame under the key ``"order_itemsDf"``.

    Returns:
        The Delta table found at ``<base path>/order_items``. Because
        ``PreProcessPipeline.run_pipeline`` skips the write when that table
        already exists, this may be previously stored data rather than a fresh
        transformation of ``inputDf``.
    """
    # Configuration for the order_items table.
    config = {
        "table_name": "order_items",
        "drop_columns": ["seller_id", "shipping_limit_date"],
        "fill_na_dict": {"price": 0.0, "freight_value": 0.0}
    }

    # Initialize the PreProcess pipeline.
    pipeline = PreProcessPipeline(storage_base_path)

    # Stores the table only if it does not exist yet.
    pipeline.run_pipeline(inputDf.get("order_itemsDf"), config["table_name"], config["drop_columns"], config["fill_na_dict"])

    # Build the read path from the pipeline's normalized base path so it is
    # exactly the path the pipeline wrote to (the global base path ends in "/",
    # which previously produced ".../​/order_items" here).
    delta_table_path = f"{pipeline.storage_base_path}/{config['table_name']}"
    print(f"Loading preprocessed table from Delta path: {delta_table_path}")
    return spark.read.format("delta").load(delta_table_path)


In [0]:
def get_preprocessed_products(inputDf: dict) -> DataFrame:
    """Preprocess the products table (once) and return it as read back from storage.

    Args:
        inputDf: Mapping that holds the raw DataFrame under the key ``"products_Df"``.

    Returns:
        The Delta table found at ``<base path>/products``. Because
        ``PreProcessPipeline.run_pipeline`` skips the write when that table
        already exists, this may be previously stored data rather than a fresh
        transformation of ``inputDf``.
    """
    # Configuration for the products table.
    # The "lenght" spelling is kept as-is: these are runtime column names.
    config = {
        "table_name": "products",
        "drop_columns": ["product_name_lenght","product_description_lenght", "product_photos_qty","product_weight_g","product_length_cm", "product_height_cm","product_width_cm"],
        "fill_na_dict": {"product_category_name": "unknown"}
    }

    # Initialize the PreProcess pipeline.
    pipeline = PreProcessPipeline(storage_base_path)

    # Stores the table only if it does not exist yet.
    pipeline.run_pipeline(inputDf.get("products_Df"), config["table_name"], config["drop_columns"], config["fill_na_dict"])

    # Build the read path from the pipeline's normalized base path so it is
    # exactly the path the pipeline wrote to (no doubled slash).
    delta_table_path = f"{pipeline.storage_base_path}/{config['table_name']}"
    print(f"Loading preprocessed table from Delta path: {delta_table_path}")
    return spark.read.format("delta").load(delta_table_path)


In [0]:

def get_preprocessed_customers(inputDf: dict) -> DataFrame:
    """Preprocess the customers table (once) and return it as read back from storage.

    Args:
        inputDf: Mapping that holds the raw DataFrame under the key ``"customerDf"``.

    Returns:
        The Delta table found at ``<base path>/customers``. Because
        ``PreProcessPipeline.run_pipeline`` skips the write when that table
        already exists, this may be previously stored data rather than a fresh
        transformation of ``inputDf``.
    """
    # Configuration for the customers table (no null filling is applied).
    config = {
        "table_name": "customers",
        "drop_columns": ["customer_unique_id"],
        "fill_na_dict": {}
    }

    # Initialize the PreProcess pipeline.
    pipeline = PreProcessPipeline(storage_base_path)

    # Stores the table only if it does not exist yet.
    pipeline.run_pipeline(inputDf.get("customerDf"), config["table_name"], config["drop_columns"], config["fill_na_dict"])

    # Build the read path from the pipeline's normalized base path so it is
    # exactly the path the pipeline wrote to (no doubled slash).
    delta_table_path = f"{pipeline.storage_base_path}/{config['table_name']}"
    print(f"Loading preprocessed table from Delta path: {delta_table_path}")
    return spark.read.format("delta").load(delta_table_path)

In [0]:
def get_preprocessed_product_category(inputDf: dict) -> DataFrame:
    """Store the product category table (once) and return it as read back from storage.

    No columns are dropped and no nulls are filled for this table; the pipeline
    is used only to persist it.

    Args:
        inputDf: Mapping that holds the raw DataFrame under the key
            ``"categroy_translation_Df"`` (spelling is the runtime key, kept as-is).

    Returns:
        The Delta table found at ``<base path>/product_category``. Because
        ``PreProcessPipeline.run_pipeline`` skips the write when that table
        already exists, this may be previously stored data rather than a fresh
        copy of ``inputDf``.
    """
    # Configuration for the product_category table.
    config = {
        "table_name": "product_category",
        "drop_columns": [],
        "fill_na_dict": {}
    }

    # Initialize the PreProcess pipeline.
    pipeline = PreProcessPipeline(storage_base_path)

    # Stores the table only if it does not exist yet.
    pipeline.run_pipeline(inputDf.get("categroy_translation_Df"), config["table_name"], config["drop_columns"], config["fill_na_dict"])

    # Build the read path from the pipeline's normalized base path so it is
    # exactly the path the pipeline wrote to (no doubled slash).
    delta_table_path = f"{pipeline.storage_base_path}/{config['table_name']}"
    print(f"Loading preprocessed table from Delta path: {delta_table_path}")
    return spark.read.format("delta").load(delta_table_path)

In [0]:
def get_preprocessed_geolocation(inputDf: dict) -> DataFrame:
    """Deduplicate and clean the geolocation table (once) and return it from storage.

    Steps:
      1. Keep a single row per ``geolocation_zip_code_prefix`` (which of the
         duplicate rows survives is not controlled here).
      2. Add ``cleaned_geolocation_city``: ``geolocation_city`` with the
         lowercase accented characters listed below replaced by their plain
         equivalents. Other characters, including uppercase accented ones,
         are left unchanged.
      3. Drop latitude, longitude and the original city column, then store
         the result as the ``geolocation_cleaned`` Delta table.

    Args:
        inputDf: Mapping that holds the raw DataFrame under the key ``"geolocationDf"``.

    Returns:
        The Delta table found at ``<base path>/geolocation_cleaned``. Because
        ``PreProcessPipeline.run_pipeline`` skips the write when that table
        already exists, this may be previously stored data.
    """
    # Local import: `translate` is not among the notebook's top-level imports.
    from pyspark.sql.functions import translate

    # Accented character -> plain replacement (one character each).
    replacements = {
        'á': 'a', 'à': 'a', 'ã': 'a', 'â': 'a', 'ä': 'a',
        'é': 'e', 'è': 'e', 'ê': 'e',
        'í': 'i', 'ì': 'i', 'î': 'i',
        'ó': 'o', 'ò': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
        'ú': 'u', 'ù': 'u', 'ü': 'u',
        'ç': 'c', 'ñ': 'n'
    }
    # `translate` maps the i-th character of the first string to the i-th
    # character of the second; dict order keeps both strings aligned.
    accented_chars = "".join(replacements.keys())
    plain_chars = "".join(replacements.values())

    # Read geolocation dataset
    df = inputDf.get("geolocationDf")

    # Drop duplicate zip codes (keeping only one row per zip code)
    df_unique = df.dropDuplicates(["geolocation_zip_code_prefix"])

    # Standardize city names with the built-in column function instead of a
    # Python UDF, so rows are not round-tripped through the Python worker.
    # Null city values stay null, as they did with the UDF.
    df_cleaned = df_unique.withColumn(
        "cleaned_geolocation_city",
        translate(col("geolocation_city"), accented_chars, plain_chars),
    )

    # Define preprocessing configuration
    config = {
        "table_name": "geolocation_cleaned",
        "drop_columns": ["geolocation_lat","geolocation_lng","geolocation_city"],
        "fill_na_dict": {}
    }

    # Initialize preprocessing pipeline
    pipeline = PreProcessPipeline(storage_base_path)

    # Stores the table only if it does not exist yet.
    pipeline.run_pipeline(df_cleaned, config["table_name"], config["drop_columns"], config["fill_na_dict"])

    # Build the read path from the pipeline's normalized base path so it is
    # exactly the path the pipeline wrote to (no doubled slash).
    delta_table_path = f"{pipeline.storage_base_path}/{config['table_name']}"
    print(f"Loading preprocessed table from Delta path: {delta_table_path}")
    return spark.read.format("delta").load(delta_table_path)


In [0]:
class Transformer:
    """Base class for the notebook's transformers; subclasses override ``transform``."""

    def __init__(self):
        """No state is kept on the base class."""

    def transform(self, inputDf):
        """Transform ``inputDf``; the base implementation always raises.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError("transform() not implemented")

class TotalRevenueTransformer(Transformer):
    """Revenue per order and the overall revenue total."""

    def transform(self, preprocessed_df: DataFrame):
        """Display per-order revenue, then display and return the grand total.

        Args:
            preprocessed_df: DataFrame with ``order_id``, ``price`` and
                ``freight_value`` columns.

        Returns:
            A DataFrame with the single column ``total_revenue``.
        """
        # Revenue of one item = price + freight. `sum` below is the Spark
        # aggregate imported at the top of the notebook, not Python's builtin.
        item_total = col("price") + col("freight_value")
        per_order = (
            preprocessed_df
            .withColumn("Item_Total", item_total)
            .groupby("order_id")
            .agg(sum("Item_Total").alias("Total_price"))
        )

        print("Total revenue per order:")
        per_order.orderBy("Total_price", ascending=False).display()

        # Collapse the per-order totals into one grand total.
        overall = per_order.select(sum("Total_price").alias("total_revenue"))
        overall.display()
        return overall
    
class RevenueByProductCategoryTransformer(Transformer):
    """Revenue per (English) product category."""

    def transform(self, joined_df: DataFrame):
        """Display and return category revenue, highest first.

        Args:
            joined_df: DataFrame with ``price``, ``freight_value`` and
                ``product_category_name_english`` columns.

        Returns:
            A DataFrame with ``product_category_name_english`` and
            ``Category_Revenue``, ordered by ``Category_Revenue`` descending.
        """
        # Revenue of one item = price + freight. `sum` is the Spark aggregate.
        item_total = col("price") + col("freight_value")
        per_category = (
            joined_df
            .withColumn("Item_Total", item_total)
            .groupby("product_category_name_english")
            .agg(sum("Item_Total").alias("Category_Revenue"))
        )

        print("Total revenue per category:")
        ranked = per_category.orderBy("Category_Revenue", ascending=False)
        ranked.display()
        return ranked
    
class TopSellingProduct(Transformer):
    """Products ranked by how many order items reference them."""

    def transform(self, preprocessed_df: DataFrame):
        """Display and return the number of order items per product, highest first.

        The previously computed ``Item_Total`` column was never used by the
        aggregation, so it is no longer derived; the input therefore only needs
        the two columns listed below.

        Args:
            preprocessed_df: DataFrame with ``product_id`` and ``order_item_id`` columns.

        Returns:
            A DataFrame with ``product_id`` and ``order_count`` (the count of
            non-null ``order_item_id`` values), ordered by ``order_count`` descending.
        """
        result_df = preprocessed_df.groupby("product_id").agg(count("order_item_id").alias("order_count"))
        print("Top Selling Products (by order count):")
        rdf = result_df.orderBy(desc("order_count"))
        rdf.display()
        return rdf

class RevenueTrendOverTimeTransformer(Transformer):
    """Revenue per calendar day of purchase."""

    def transform(self, joined_df: DataFrame):
        """Display and return daily revenue, highest-revenue day first.

        Args:
            joined_df: DataFrame with ``order_purchase_timestamp``, ``price``
                and ``freight_value`` columns.

        Returns:
            A DataFrame with ``date`` (formatted ``yyyy-MM-dd``) and
            ``Daily_Revenue``, ordered by ``Daily_Revenue`` descending
            (i.e. by revenue, not chronologically).
        """
        # Revenue of one item = price + freight. `sum` is the Spark aggregate.
        item_total = col("price") + col("freight_value")
        purchase_day = date_format("order_purchase_timestamp", "yyyy-MM-dd").alias("date")

        per_day = (
            joined_df
            .withColumn("Item_Total", item_total)
            .groupby(purchase_day)
            .agg(sum("Item_Total").alias("Daily_Revenue"))
        )

        print("Daily revenue :")
        ranked = per_day.orderBy("Daily_Revenue", ascending=False)
        ranked.display()
        return ranked
    
class GeographicRevenueTransformer(Transformer):
    """Revenue per customer state."""

    def transform(self, joined_df: DataFrame):
        """Display and return revenue per state, highest first.

        Args:
            joined_df: DataFrame with ``customer_state``, ``price`` and
                ``freight_value`` columns.

        Returns:
            A DataFrame with ``customer_state`` and ``State_Revenue``, ordered
            by ``State_Revenue`` descending.
        """
        # Revenue of one item = price + freight. `sum` is the Spark aggregate.
        item_total = col("price") + col("freight_value")
        per_state = (
            joined_df
            .withColumn("Item_Total", item_total)
            .groupby("customer_state")
            .agg(sum("Item_Total").alias("State_Revenue"))
        )

        print("State wise revenue :")
        ranked = per_state.orderBy("State_Revenue", ascending=False)
        ranked.display()
        return ranked
    
class CustomerValueTransformer(Transformer):
    """Total spend per customer, bucketed into High / Mid / Low tiers."""

    def transform(self, joined_df: DataFrame):
        """Display and return per-customer spend with a tier label, highest spend first.

        Tiers (evaluated top to bottom, first match wins):
          * ``High``    -- spend >= 5001
          * ``Mid``     -- spend >= 2001
          * ``Low``     -- spend >= 0
          * ``Unknown`` -- anything else (negative or null spend)

        The tiers previously carried explicit upper bounds (``<= 5000``,
        ``<= 2000``). Spend is a sum of prices and so can be fractional; totals
        such as 2000.50 or 5000.75 matched no tier and were labelled
        ``Unknown``. Relying on the ordered ``when`` chain closes those gaps:
        such values now fall into the tier below the threshold they have not
        reached. Every value that matched a tier before still gets the same tier.

        Args:
            joined_df: DataFrame with ``customer_id``, ``price`` and
                ``freight_value`` columns.

        Returns:
            A DataFrame with ``customer_id``, ``Customer_Spent`` and
            ``Customer_Category``, ordered by ``Customer_Spent`` descending.
        """
        # Revenue of one item = price + freight. `sum` is the Spark aggregate.
        transformed_df = joined_df.withColumn("Item_Total", col("price") + col("freight_value"))
        revenue_df = transformed_df.groupby("customer_id").agg(sum("Item_Total").alias("Customer_Spent"))

        spent = col("Customer_Spent")
        revenue_df = revenue_df.withColumn(
            "Customer_Category",
            # Order matters: each branch is only reached if the ones above failed.
            when(spent >= 5001, "High")
            .when(spent >= 2001, "Mid")
            .when(spent >= 0, "Low")
            .otherwise("Unknown")
        )
        print("Customer Revenue and Category:")
        rdf = revenue_df.orderBy(spent.desc())
        rdf.display()
        return rdf

In [0]:
# spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")

In [0]:
from pyspark.sql.streaming import StreamingQuery


In [0]:
def create_stream_static_join(stream_table_name: DataFrame, static_table_name: DataFrame, stream_join_column: str, static_join_column: str,output_table_name: str, output_checkpoint_path: str) -> 'StreamingQuery':
    """Left-join two DataFrames and start a streaming append of the result to Delta.

    Despite the ``*_table_name`` parameter names, the first two arguments are
    DataFrames, not table names; the first is the one written with
    ``writeStream``.

    Args:
        stream_table_name: Left side of the join (the streaming DataFrame).
        static_table_name: Right side of the join.
        stream_join_column: Join key column on the left side.
        static_join_column: Join key column on the right side; it is left out
            of the output.
        output_table_name: Path the Delta output is written to.
        output_checkpoint_path: Checkpoint location for the streaming query.

    Returns:
        The handle of the started streaming query.
    """
    left = stream_table_name
    right = static_table_name

    # Left join: every row of the left side is kept, matched or not.
    join_condition = left[stream_join_column] == right[static_join_column]
    joined = left.join(right, join_condition, "left")

    # Output = all left columns + all right columns except the right join key.
    left_columns = [left[name] for name in left.columns]
    right_columns = [right[name] for name in right.columns if name != static_join_column]
    projected = joined.select(*(left_columns + right_columns))

    # Append to the Delta output with checkpointing and schema merging enabled.
    writer = (
        projected.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", output_checkpoint_path)
        .option("mergeSchema", "true")
    )
    return writer.start(output_table_name)


In [0]:
def productwise_revenue_join(order_items,products,product_categories):
    """Join order items to products and category names, persist and return the result.

    Both joins are inner joins: order items without a matching product, or
    products without a matching category row, are not part of the output.

    Args:
        order_items: DataFrame with ``order_id``, ``order_item_id``,
            ``product_id``, ``price`` and ``freight_value``.
        products: DataFrame with ``product_id`` and ``product_category_name``.
        product_categories: DataFrame with ``product_category_name`` and
            ``product_category_name_english``.

    Returns:
        The joined DataFrame (the in-memory plan, not a re-read of the saved table).
    """
    items_to_products = order_items["product_id"] == products["product_id"]
    products_to_categories = (
        products["product_category_name"] == product_categories["product_category_name"]
    )

    output_columns = [
        order_items["order_id"],
        order_items["order_item_id"],
        order_items["product_id"],
        order_items["price"],
        order_items["freight_value"],
        products["product_category_name"],
        product_categories["product_category_name_english"],
    ]

    joined_df = (
        order_items
        .join(products, items_to_products)
        .join(product_categories, products_to_categories)
        .select(*output_columns)
    )

    # Overwrite the Gold copy of this join on every call.
    target_path = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/productwise_revenue/delta_table"
    joined_df.write.format("delta").mode("overwrite").save(target_path)

    return joined_df

In [0]:
def order_items_join(order_df: DataFrame, order_items_df: DataFrame) -> DataFrame:
    """Start the orders/order_items stream-static join and return a stream over its output.

    Args:
        order_df: Left (streaming) side, joined on ``order_id``.
        order_items_df: Right side, joined on ``order_id``.

    Returns:
        A streaming DataFrame reading the Delta table the join writes to.
    """
    join_key = "order_id"
    output_table_name = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/orders_items_join/delta_table"
    output_checkpoint_path = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/orders_items_join/checkpoint"

    query = create_stream_static_join(
        order_df,
        order_items_df,
        join_key,
        join_key,
        output_table_name,
        output_checkpoint_path,
    )

    # spark.sql("create table if not exists eco.gold.orders_items_join location 'abfss://gold@ecommerceproject.dfs.core.windows.net/joins/orders_items_join/delta_table'")
    # Give the query up to 10 seconds before reading from its output location.
    query.awaitTermination(10)
    return spark.readStream.format("delta").load(output_table_name)

In [0]:
def customer_spending_join(orders:DataFrame,order_items:DataFrame,customers:DataFrame) -> DataFrame:
    """Join orders + order items with customers and return a stream over the result.

    The orders/order_items join is started exactly once. It was previously
    invoked twice, which started the same streaming join (same output path and
    checkpoint location) a second time and waited on it twice; the first call's
    result was discarded.

    Args:
        orders: Streaming orders DataFrame, passed on to ``order_items_join``.
        order_items: Order items DataFrame, passed on to ``order_items_join``.
        customers: Customers DataFrame, joined on ``customer_id``.

    Returns:
        A streaming DataFrame reading the Delta table the join writes to.
    """
    stream_table_name = order_items_join(orders, order_items)
    static_table_name = customers
    stream_join_column = "customer_id"
    static_join_column = "customer_id"
    output_table_name = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/customer_spending/delta_table"
    output_checkpoint_path = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/customer_spending/checkpoint"
    query = create_stream_static_join(stream_table_name, static_table_name, stream_join_column, static_join_column, output_table_name, output_checkpoint_path)
    # spark.sql("create table if not exists eco.gold.customer_spending location 'abfss://gold@ecommerceproject.dfs.core.windows.net/joins/customer_spending/delta_table'")
    # Give the query up to 30 seconds before reading from its output location.
    query.awaitTermination(30)
    joined_stream_df = spark.readStream.format("delta").load(output_table_name)
    return joined_stream_df

In [0]:
# source_path = "abfss://stream-data@ecommerceproject.dfs.core.windows.net/"
# bronze_table_path = "abfss://bronze@ecommerceproject.dfs.core.windows.net/orders"
# delta_table_path="abfss://bronze@ecommerceproject.dfs.core.windows.net/orders/delta_table/"
# silver_table_path = "abfss://silver@ecommerceproject.dfs.core.windows.net/orders/"

# orderi_extractor = Order_items()
# orderitemDf = orderi_extractor.extract()

# cust_extractor=Customers()
# customerDf = cust_extractor.extract()

# extractor = StreamingProcessor(source_path, bronze_table_path)
# orderDf = extractor.start_streaming_job()
# orderDf.awaitTermination(30)


#         #Transformation: Preprocessing & Joins
# preprocessed_order_items_df = get_preprocessed_order_items(orderitemDf)
# preprocessed_customer_df = get_preprocessed_customers(customerDf)
# preprocessed_order_df=IncrementalPreProcessor(delta_table_path, silver_table_path)
# query=preprocessed_order_df.preprocess_stream()
# query.awaitTermination(30)
# preprocessed_order_df=spark.readStream.format("delta").load(silver_table_path)
#         #Transformation:Join
# joined_df = customer_spending_join(preprocessed_order_df,preprocessed_order_items_df,preprocessed_customer_df)
# # stream_joindf=spark.readStream.format("delta").load("abfss://gold@ecomadls.dfs.core.windows.net/customer_spending/delta_table")

#         #Transformation: Aggregation
# customer_value_transformer = CustomerValueTransformer()
# revenue_result = customer_value_transformer.transform(joined_df)
# display(revenue_result)

In [0]:
# source_path = "abfss://stream-data@ecommerceproject.dfs.core.windows.net/"
# bronze_table_path = "abfss://bronze@ecommerceproject.dfs.core.windows.net/orders"
# delta_table_path="abfss://bronze@ecommerceproject.dfs.core.windows.net/orders/delta_table/"
# silver_table_path = "abfss://silver@ecommerceproject.dfs.core.windows.net/orders/"


In [0]:
def geographic_revenue_join(orders,order_items,customers,geolocation):
    """Join the customer-spending stream with geolocation and return a stream over the result.

    Args:
        orders: Passed on to ``customer_spending_join``.
        order_items: Passed on to ``customer_spending_join``.
        customers: Passed on to ``customer_spending_join``.
        geolocation: Right side of the join; matched on
            ``geolocation_zip_code_prefix`` against the stream's
            ``customer_zip_code_prefix``.

    Returns:
        A streaming DataFrame reading the Delta table the join writes to.
    """
    output_table_name = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/geographic_revenue/delta_table"
    output_checkpoint_path = "abfss://gold@ecommerceproject.dfs.core.windows.net/joins/geographic_revenue/checkpoint"

    # Upstream stream: orders + order items + customers.
    spending_stream = customer_spending_join(orders, order_items, customers)

    # Start the stream-static join against the geolocation table.
    query = create_stream_static_join(
        spending_stream,
        geolocation,
        "customer_zip_code_prefix",
        "geolocation_zip_code_prefix",
        output_table_name,
        output_checkpoint_path,
    )
    # spark.sql("create table if not exists eco.gold.geographic_revenue location 'abfss://gold@ecommerceproject.dfs.core.windows.net/joins/geographic_revenue/delta_table'")
    # Give the query up to 30 seconds before reading from its output location.
    query.awaitTermination(30)
    return spark.readStream.format("delta").load(output_table_name)