#MAIN


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import os

# Configuration
catalog_name = "main"
database_name = "retail_lakehouse"
volume_name = "raw_data"

# Paths: every location shares the /Volumes/<catalog>/<schema> prefix.
_schema_volumes_root = f"/Volumes/{catalog_name}/{database_name}"
raw_data_path = f"{_schema_volumes_root}/{volume_name}"
bronze_path = f"{_schema_volumes_root}/bronze"
checkpoints_path = f"{_schema_volumes_root}/checkpoints"

print("=== PATH VERIFICATION ===")
for _path_label, _path_value in (
    ("Raw Data Path", raw_data_path),
    ("Bronze Layer Path", bronze_path),
    ("Checkpoints Path", checkpoints_path),
):
    print(f"{_path_label}: {_path_value}")


def _preview_listing(entries, found_message):
    """Print ``found_message`` and then the paths of at most three listing entries.

    ``entries`` is the list returned by ``dbutils.fs.ls``; only each entry's
    ``.path`` attribute is read.
    """
    print(found_message)
    for entry in entries[:3]:  # keep the preview short
        print(f"  - {entry.path}")


# Check that each raw-data folder is reachable; failures are reported, not raised.
try:
    sales_files = dbutils.fs.ls(f"{raw_data_path}/sales")
    _preview_listing(sales_files, f"Found {len(sales_files)} sales files")
except Exception as e:
    print(f" Error accessing sales files: {e}")

try:
    customer_files = dbutils.fs.ls(f"{raw_data_path}/customers")
    _preview_listing(customer_files, f" Found {len(customer_files)} customer files")
except Exception as e:
    print(f" Error accessing customer files: {e}")

try:
    product_files = dbutils.fs.ls(f"{raw_data_path}/products")
    _preview_listing(product_files, f" Found {len(product_files)} product files")
except Exception as e:
    print(f" Error accessing product files: {e}")


# Create the bronze and checkpoint directories (best effort: errors are only warned about).
try:
    for _target_dir in (bronze_path, checkpoints_path):
        dbutils.fs.mkdirs(_target_dir)
    print(" Created bronze and checkpoints directories")
except Exception as e:
    print(f"  Warning creating directories: {e}")

# COMMAND ----------

# Create the schema (database) if it doesn't exist and make it the session default.
# Both statements are qualified with `catalog_name`: a bare `CREATE DATABASE x` /
# `USE x` resolves `x` in the session's *current* catalog, which is not necessarily
# the catalog that owns the /Volumes/<catalog>/<schema> paths used above.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{database_name}")
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {database_name}")
print(f" Using database: {catalog_name}.{database_name}")

# COMMAND ----------

def _all_nullable(*columns):
    """Return a StructType built from ``(name, data_type)`` pairs, every field nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Sales transactions; dates and times are kept as raw strings.
sales_schema = _all_nullable(
    ("transaction_id", StringType()),
    ("transaction_date", StringType()),
    ("transaction_time", StringType()),
    ("store_id", StringType()),
    ("store_name", StringType()),
    ("region", StringType()),
    ("product_id", StringType()),
    ("product_name", StringType()),
    ("category", StringType()),
    ("quantity", IntegerType()),
    ("unit_price", DoubleType()),
    ("total_amount", DoubleType()),
    ("discount_percent", IntegerType()),
    ("discount_amount", DoubleType()),
    ("final_amount", DoubleType()),
    ("payment_method", StringType()),
    ("customer_id", StringType()),
    ("customer_segment", StringType()),
    ("sales_person_id", StringType()),
    ("promotion_code", StringType()),
)

# Customer master data.
customer_schema = _all_nullable(
    ("customer_id", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("email", StringType()),
    ("phone", StringType()),
    ("date_of_birth", StringType()),
    ("gender", StringType()),
    ("address_line1", StringType()),
    ("city", StringType()),
    ("state", StringType()),
    ("zip_code", StringType()),
    ("country", StringType()),
    ("customer_segment", StringType()),
    ("registration_date", StringType()),
    ("is_active", BooleanType()),
)

# Product catalogue.
product_schema = _all_nullable(
    ("product_id", StringType()),
    ("product_name", StringType()),
    ("category", StringType()),
    ("sub_category", StringType()),
    ("brand", StringType()),
    ("supplier_id", StringType()),
    ("cost_price", DoubleType()),
    ("selling_price", DoubleType()),
    ("weight", DoubleType()),
    ("dimensions", StringType()),
    ("color", StringType()),
    ("size", StringType()),
    ("launch_date", StringType()),
    ("is_active", BooleanType()),
)


# Auto Loader streams
def _csv_bronze_stream(source_subdir, schema_dir, schema, layer_tag):
    """Build a CSV Auto Loader (``cloudFiles``) stream for one raw-data folder.

    Args:
        source_subdir: Folder under ``raw_data_path`` to read from.
        schema_dir: Folder under ``checkpoints_path`` used as the Auto Loader
            schema location.
        schema: Explicit StructType applied to the CSV files (header row expected).
        layer_tag: Value written to the ``bronze_layer`` column.

    Returns:
        A streaming DataFrame with three extra columns: ``file_name`` (source
        file path from ``_metadata``), ``processing_time`` and ``bronze_layer``.
    """
    reader = (spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "csv")
              .option("cloudFiles.schemaLocation", f"{checkpoints_path}/{schema_dir}")
              .option("header", "true")
              .option("inferSchema", "false")
              .schema(schema))
    raw_stream = reader.load(f"{raw_data_path}/{source_subdir}")
    return (raw_stream
            .withColumn("file_name", col("_metadata.file_path"))
            .withColumn("processing_time", current_timestamp())
            .withColumn("bronze_layer", lit(layer_tag)))


def create_sales_auto_loader():
    """Return the sales Auto Loader stream, or None (after printing the error) on failure."""
    try:
        return _csv_bronze_stream("sales", "sales_schema", sales_schema, "sales_bronze")
    except Exception as e:
        print(f" Error creating sales auto loader: {e}")
        return None


def create_customer_auto_loader():
    """Return the customer Auto Loader stream, or None (after printing the error) on failure."""
    try:
        return _csv_bronze_stream("customers", "customer_schema", customer_schema, "customer_bronze")
    except Exception as e:
        print(f" Error creating customer auto loader: {e}")
        return None


def create_product_auto_loader():
    """Return the product Auto Loader stream, or None (after printing the error) on failure."""
    try:
        return _csv_bronze_stream("products", "product_schema", product_schema, "product_bronze")
    except Exception as e:
        print(f" Error creating product auto loader: {e}")
        return None


# Process data with detailed logging
def process_data_stream(stream_name, create_loader_func, checkpoint_path, output_path):
    """Run one streaming source to completion and append it to a Delta path.

    Args:
        stream_name: Label used in every progress message.
        create_loader_func: Zero-argument callable returning a streaming
            DataFrame, or None if the stream could not be built.
        checkpoint_path: Checkpoint location for the streaming write.
        output_path: Delta path the stream appends to.

    Returns:
        False if the stream could not be created or the write raised.
        True once the ``availableNow`` run has terminated, even if the
        follow-up row count could not be read. The reported count is the
        row count of the whole Delta table at ``output_path``, not only the
        rows added by this run.
    """
    print(f"\n=== Processing {stream_name} ===")

    try:
        source_df = create_loader_func()
        if source_df is None:
            print(f" Failed to create {stream_name} stream")
            return False
        print(f"{stream_name} stream created successfully")

        delta_writer = source_df.writeStream.format("delta").outputMode("append")
        delta_writer = (delta_writer
                        .option("checkpointLocation", checkpoint_path)
                        .option("path", output_path)
                        .option("mergeSchema", "true"))
        # availableNow: process what is currently available, then terminate.
        streaming_query = delta_writer.trigger(availableNow=True).start()

        print(f" {stream_name} stream started, waiting for completion...")
        streaming_query.awaitTermination()

        # A failed count is only reported; the write itself already finished.
        try:
            table_rows = spark.read.format("delta").load(output_path).count()
            print(f" {stream_name} completed successfully - {table_rows} records written")
        except Exception as e:
            print(f" {stream_name} stream completed but error checking output: {e}")
        return True
    except Exception as e:
        print(f"Error processing {stream_name}: {e}")
        return False

# COMMAND ----------

# Process all streams.
# Each job: (progress label, stream factory, name shared by the checkpoint and bronze folders).
_bronze_stream_jobs = (
    ("Sales Data", create_sales_auto_loader, "sales_bronze"),
    ("Customer Data", create_customer_auto_loader, "customer_bronze"),
    ("Product Data", create_product_auto_loader, "product_bronze"),
)

# Runs sequentially in the order listed; one success flag per job.
sales_success, customer_success, product_success = [
    process_data_stream(
        label,
        factory,
        f"{checkpoints_path}/{target}",
        f"{bronze_path}/{target}",
    )
    for label, factory, target in _bronze_stream_jobs
]

# COMMAND ----------

# Register each bronze Delta path as a table.
# Names are fully qualified with `catalog_name`: a `schema.table` name resolves in the
# session's *current* catalog, which need not be the catalog owning the /Volumes paths.
# NOTE(review): Unity Catalog may reject a table LOCATION that points inside a volume;
# if so, the error is printed below and the data is still readable via delta.`<path>`.
print("\n=== Creating Delta Tables ===")

table_configs = [
    ("sales_bronze", f"{bronze_path}/sales_bronze"),
    ("customer_bronze", f"{bronze_path}/customer_bronze"),
    ("product_bronze", f"{bronze_path}/product_bronze")
]

for table_name, location in table_configs:
    qualified_name = f"{catalog_name}.{database_name}.{table_name}"
    try:
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {qualified_name}
        USING DELTA
        LOCATION '{location}'
        """)
        print(f" Created/verified table: {qualified_name}")
    except Exception as e:
        print(f" Error creating table {table_name}: {e}")

# COMMAND ----------

# Verify data ingestion by reading each bronze Delta path directly.
print("\n=== DATA INGESTION VERIFICATION ===")


def _show_bronze_preview(frame, total, count_label, sample_banner, empty_note):
    """Print ``count_label: total`` then either three sample rows or ``empty_note``.

    The label/banner/note strings are printed verbatim, so the caller controls
    the exact message text.
    """
    print(f"{count_label}: {total:,}")
    if total > 0:
        print(sample_banner)
        frame.limit(3).show(truncate=False)
    else:
        print(empty_note)


# Sales
try:
    sales_df = spark.read.format("delta").load(f"{bronze_path}/sales_bronze")
    sales_count = sales_df.count()
    _show_bronze_preview(sales_df, sales_count, "Sales Bronze Records",
                         "Sample Sales Data:", "  No sales data found")
except Exception as e:
    print(f" Error reading sales data: {e}")

# Customers
try:
    customer_df = spark.read.format("delta").load(f"{bronze_path}/customer_bronze")
    customer_count = customer_df.count()
    _show_bronze_preview(customer_df, customer_count, " Customer Bronze Records",
                         " Sample Customer Data:", "  No customer data found")
except Exception as e:
    print(f" Error reading customer data: {e}")

# Products
try:
    product_df = spark.read.format("delta").load(f"{bronze_path}/product_bronze")
    product_count = product_df.count()
    _show_bronze_preview(product_df, product_count, "  Product Bronze Records",
                         " Sample Product Data:", "  No product data found")
except Exception as e:
    print(f" Error reading product data: {e}")

# COMMAND ----------

# Alternative verification using SQL with direct Delta paths
print("\n=== ALTERNATIVE VERIFICATION USING DIRECT DELTA PATHS ===")

verification_queries = [
    ("Sales", f"SELECT * FROM delta.`{bronze_path}/sales_bronze`"),
    ("Customer", f"SELECT * FROM delta.`{bronze_path}/customer_bronze`"),
    ("Product", f"SELECT * FROM delta.`{bronze_path}/product_bronze`")
]

for name, query in verification_queries:
    try:
        print(f"\n{name} Data Sample:")
        result_df = spark.sql(f"{query} LIMIT 3")
        result_df.show(truncate=False)

        # Count the same relation the sample was drawn from, so each entry's total
        # always matches its own query. Named `record_total` (not `count`) so it does
        # not shadow pyspark.sql.functions.count brought in by the wildcard import.
        record_total = spark.sql(query).count()
        print(f" {name} Total Records: {record_total:,}")

    except Exception as e:
        print(f" Error querying {name} data: {e}")

# COMMAND ----------

# Summarize each stream's outcome. Only claim the data is in place when every stream succeeded.
_stream_outcomes = [
    ("Sales", sales_success),
    ("Customer", customer_success),
    ("Product", product_success),
]

print("\n Bronze Layer Pipeline Summary:")
for _stage, _succeeded in _stream_outcomes:
    print(f"  {_stage} Processing: {'Success' if _succeeded else 'Failed'}")

if all(succeeded for _, succeeded in _stream_outcomes):
    print("  Data should now be in bronze layer tables!")
else:
    print("  One or more streams failed; check the errors above before using the bronze tables.")

=== PATH VERIFICATION ===
Raw Data Path: /Volumes/main/retail_lakehouse/raw_data
Bronze Layer Path: /Volumes/main/retail_lakehouse/bronze
Checkpoints Path: /Volumes/main/retail_lakehouse/checkpoints
✅ Found 1 sales files
  - dbfs:/Volumes/main/retail_lakehouse/raw_data/sales/sales_data_20250812_080459.csv
✅ Found 1 customer files
  - dbfs:/Volumes/main/retail_lakehouse/raw_data/customers/customer_data_20250812_080459.csv
✅ Found 1 product files
  - dbfs:/Volumes/main/retail_lakehouse/raw_data/products/product_data_20250812_080459.csv
✅ Created bronze and checkpoints directories
✅ Using database: retail_lakehouse

=== Processing Sales Data ===
✅ Sales Data stream created successfully
⏳ Sales Data stream started, waiting for completion...
✅ Sales Data completed successfully - 10000 records written

=== Processing Customer Data ===
✅ Customer Data stream created successfully
⏳ Customer Data stream started, waiting for completion...
✅ Customer Data completed successfully - 5000 records wri