In [0]:
%run ../../config/config 

In [0]:
%run ../../config/sqlconfig

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, DecimalType
from pyspark.sql.functions import col, lit,filter
import os

In [0]:
class SalesStreamingETL:
    """
    Sales streaming ETL built on Databricks Auto Loader (``cloudFiles``).

    For one delimiter-specific sub-folder of ``<base_path>/sales/`` it reads CSV
    files as a stream with a fixed schema, removes exact duplicate rows and header
    rows, and writes the result to a Delta table with an ``availableNow`` trigger.

    Args:
        base_path: Root folder; files are read from ``<base_path>/sales/<folder>``.
        checkpoint_path: Root for streaming checkpoints; ``/sales_table/<folder>``
            is appended per folder.
        schema_path: Root for Auto Loader schema tracking; ``/sales_schema/<folder>``
            is appended per folder.
        spark_session: Spark session to use. Defaults to the notebook-global ``spark``.
        target_table: Fully qualified table the stream writes to.
    """

    def __init__(
        self,
        base_path: str,
        checkpoint_path: str,
        schema_path: str,
        spark_session=None,
        target_table: str = "workspace.etl_practice.fact_sales",
    ):
        # Fall back to the notebook-provided global so existing callers keep working.
        self.spark = spark_session if spark_session is not None else spark
        self.base_path = base_path
        self.target_table = target_table

        # Paths. Kept exactly as before: moving a checkpoint/schema location would
        # detach the streams from the progress they have already recorded.
        self.schema_path = f"{schema_path}/sales_schema"
        self.checkpoint_path = f"{checkpoint_path}/sales_table"
        self.source_path = f"{base_path}/sales/"

        # Schema
        self.schema = self._define_schema()

    # -------------------------------
    # Define schema
    # -------------------------------
    def _define_schema(self) -> StructType:
        """Return the fixed schema applied to every sales CSV file."""
        return StructType([
            StructField("sales_id", IntegerType(), False),
            StructField("transaction_ts", StringType(), True),
            StructField("date_key", IntegerType(), True),
            StructField("product_key", IntegerType(), True),
            StructField("customer_key", IntegerType(), True),
            StructField("store_key", IntegerType(), True),
            StructField("payment_key", IntegerType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("unit_price", DecimalType(10, 2), True),
            StructField("discount_pct", DecimalType(5, 2), True),
            StructField("net_sales_amount", DecimalType(10, 2), True),
            StructField("tax_amount", DecimalType(10, 2), True)
        ])

    # -------------------------------
    # Read streaming data
    # -------------------------------
    def read_stream(self, file_name, delimiter_type: str):
        """
        Build an Auto Loader streaming DataFrame over one sales sub-folder.

        Args:
            file_name: Sub-folder under ``sales/`` to read (e.g. ``"comma"``).
            delimiter_type: CSV delimiter used by the files in that folder.

        Returns:
            An unstarted streaming DataFrame with ``self.schema``. No ``header``
            option is set, so header lines arrive as data rows and are expected to
            be removed with ``clean_header_df``.
        """
        return (
            self.spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.schemaLocation", f"{self.schema_path}/{file_name}")
            .option("delimiter", delimiter_type)
            .schema(self.schema)
            # source_path already ends with "/", so this path contains "//";
            # left unchanged to keep the stream's source path stable.
            .load(f"{self.source_path}/{file_name}")
        )

    # -------------------------------
    # Data Cleaning
    # -------------------------------
    def clean_streaming_df(self, df):
        """
        Remove exact duplicate rows (all columns compared).

        No watermark is applied, so the streaming deduplication state retains every
        distinct row seen for this query. Errors propagate to the caller instead of
        being printed and turned into a ``None`` return.
        """
        return df.dropDuplicates()

    def clean_header_df(self, df, header_col: str):
        """
        Drop CSV header rows that were ingested as data.

        A header row is recognised by ``header_col`` containing its own column name
        (e.g. ``transaction_ts == "transaction_ts"``). The comparison is null-safe:
        a plain ``!=`` evaluates to NULL for rows whose ``header_col`` is NULL, and
        ``filter`` would silently discard those genuine data rows.

        Args:
            df: DataFrame (streaming or batch) containing ``header_col``.
            header_col: Name of a string column whose value identifies header rows.

        Returns:
            ``df`` without header rows. Errors propagate to the caller.
        """
        return df.filter(~df[header_col].eqNullSafe(header_col))

    # -------------------------------
    # Write to Delta Table (Streaming)
    # -------------------------------
    def write_stream(self, df, folder_name):
        """
        Write ``df`` to ``self.target_table`` and block until the run finishes.

        ``trigger(availableNow=True)`` processes the data available at start and
        then stops. Each folder gets its own checkpoint under ``checkpoint_path``.

        Raises:
            Exception: any failure starting or running the query is printed and
            re-raised, so the notebook/job fails instead of appearing successful.
        """
        try:
            query = (
                df.writeStream
                .option("checkpointLocation", f"{self.checkpoint_path}/{folder_name}")
                .trigger(availableNow=True)
                .toTable(self.target_table)
            )
            query.awaitTermination()
        except Exception as e:
            print(f"Streaming job failed: {e}")
            raise

    # -------------------------------
    # Run ETL
    # -------------------------------
    def run(self, folder_name, delimiter):
        """
        Run the ETL for one delimiter folder: read, de-duplicate, drop headers, write.

        Args:
            folder_name: Sub-folder under ``sales/`` (also names its checkpoint/schema dirs).
            delimiter: CSV delimiter for files in that folder.
        """
        raw_df = self.read_stream(folder_name, delimiter)
        deduped_df = self.clean_streaming_df(raw_df)
        cleaned_df = self.clean_header_df(deduped_df, 'transaction_ts')
        self.write_stream(cleaned_df, folder_name)


In [0]:
# -------------------------------
# Config / Parameters
# -------------------------------
# `base_path` and `checkpoint_path` are presumably defined by the `%run ../../config/config`
# cell above — TODO confirm.
source_path = str(base_path)

# Guard keeps this cell idempotent: without it, re-running the cell (without re-running
# the config cell) appends "/sales_table" again and silently moves every checkpoint.
# NOTE(review): SalesStreamingETL appends "/sales_table" (and "/sales_schema") itself, so
# the effective locations end up doubled, e.g. ".../sales_table/sales_table/<folder>".
# Left as-is because moving existing checkpoints would detach the streams from their progress.
if not str(checkpoint_path).rstrip("/").endswith("/sales_table"):
    checkpoint_path = f"{checkpoint_path}/sales_table"
schema_path = f"{base_path}/sales_schema"


In [0]:
# Each delimiter-specific sub-folder of <source_path>/sales is ingested with its own
# delimiter. Any other entry is skipped: running it would call the ETL with an empty
# folder name and delimiter, pointing Auto Loader at the whole sales/ directory.
SALES_FOLDER_DELIMITERS = {
    "comma": ",",
    "pipe": "|",
    "semicolon": ";",
}

sales_etl = SalesStreamingETL(source_path, checkpoint_path, schema_path)
for item in dbutils.fs.ls(f"{source_path}/sales"):
    folder_name = item.name.rstrip("/")
    delimiter = SALES_FOLDER_DELIMITERS.get(folder_name)
    # Directory names carry a trailing "/", matching the original "comma/" etc. checks.
    if delimiter is None or not item.name.endswith("/"):
        print(f"Skipping unexpected entry: {item.path}")
        continue
    sales_etl.run(folder_name, delimiter)

In [0]:
%sql
-- select * from fact_sales order by sales_id

In [0]:
#.option("cloudFiles.partitionColumns", 'transaction_ts')

In [0]:
%sql
-- select *from csv.`/Volumes/workspace/etl_practice/my_file/sales/comma/sales_001.csv`  with(header="true", delimiter= ",")

In [0]:
%sql
-- select *from csv.`/Volumes/workspace/etl_practice/my_file/sales/pipe/sales_003.csv` with(header="true", delimiter= "|")

In [0]:
%sql
-- select *from csv.`/Volumes/workspace/etl_practice/my_file/sales/semicolon/sales_002.csv` with (   header = "true",  delimiter = ";");