# Customer Data Pipeline (Streaming Processing)

This notebook defines a Lakeflow Declarative Pipeline (LDP) for streaming processing of customer data, executed by the Databricks bundle resource at `resources/dabs_pipeline.yml`.

- Catalog and schema are configured in the pipeline resource and determine where tables are published in Unity Catalog.
- Tables materialize directly to `catalog.schema.table` (no `LIVE` schema).
- This pipeline processes synthetic customer data produced by a streaming source (`FakeDataSource`).


In [None]:
# Import LDP and required libraries for streaming
import dlt
import sys
sys.path.append(spark.conf.get("bundle.sourcePath", "."))
from pyspark.sql.functions import expr, col, to_timestamp, hour, dayofweek, month, year, unix_timestamp, when, round, sum, avg, count, countDistinct, current_timestamp, lit, abs
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Register FakeDataSource so that spark.readStream.format("fake") can resolve it.
# Best-effort: if the package is missing we only print a warning here; any table
# that reads from the "fake" format will then fail when the pipeline runs.
try:
    from pyspark_datasources import FakeDataSource
except ImportError as e:
    print(f"Warning: Could not import FakeDataSource: {e}")
    print("Make sure pyspark-data-sources[fake] is installed")
else:
    # Only the import belongs inside the try: an error raised while registering
    # is not "package not installed" and should surface unchanged.
    spark.dataSource.register(FakeDataSource)
    print("FakeDataSource registered successfully")


## Streaming Data Source: Fake Customer Data

This section creates a streaming data source using `FakeDataSource` to generate synthetic customer data. The data includes customer demographics, purchase behavior, and location information.

The streaming source generates:
- `customer_id`: Unique identifier
- `name`: Customer full name
- `email`: Email address
- `age`: Age between 18-80
- `city`: City name
- `country`: Country name
- `purchase_amount`: Random purchase amount
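- `product_category`: Product category (derived from the first letter of a random word)
- `timestamp`: Random event datetime generated by Faker
- `processing_timestamp`: Time at which the record was processed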


In [None]:
# Input schema for the "fake" data source. Each field name is a Faker method
# name. pyspark_datasources.fake requires every field to be StringType, so
# customer_stream_raw renames/casts these columns into the customer schema.
faker_schema = StructType([
    StructField(faker_method, StringType(), True)
    for faker_method in (
        "uuid4",       # renamed to customer_id
        "name",
        "email",
        "random_int",  # converted to age (int)
        "city",
        "country",
        "pyfloat",     # converted to purchase_amount (double)
        "word",        # mapped to product_category
        "date_time",   # converted to timestamp
    )
])

# Bronze table: raw synthetic customer events from FakeDataSource
@dlt.table(
    comment="Streaming fake customer data from FakeDataSource"
)
def customer_stream_raw():
    """Stream synthetic customer rows and reshape them into the customer schema.

    Reads from the "fake" format (FakeDataSource must already be registered on
    ``spark.dataSource``) using ``faker_schema``. All input columns arrive as
    strings and are renamed or cast here:

    * customer_id      <- uuid4
    * age              <- (|int(random_int)| % 63) + 18, i.e. 18..80
    * purchase_amount  <- |double(pyfloat)| * 500 + 10
    * product_category <- bucket chosen by the first letter of ``word``
    * timestamp        <- to_timestamp(date_time)
    * processing_timestamp <- current_timestamp()

    ``numRows`` is passed as 100. Whether that is per micro-batch or in total
    depends on the fake source; TODO confirm.
    """
    faker_rows = (
        spark.readStream
        .format("fake")
        .schema(faker_schema)
        .option("numRows", 100)
        .load()
    )

    # Derived-column expressions, built once and applied below.
    word = col("word")
    category_expr = (
        when(word.rlike("^[aeiou]"), "Electronics")
        .when(word.rlike("^[bcdfg]"), "Clothing")
        .when(word.rlike("^[hjklm]"), "Books")
        .when(word.rlike("^[npqr]"), "Home & Garden")
        .otherwise("Sports")
    )
    age_expr = (abs(col("random_int").cast("int")) % 63) + 18
    amount_expr = abs(col("pyfloat").cast("double")) * 500 + 10

    output_columns = [
        "customer_id", "name", "email", "age", "city", "country",
        "purchase_amount", "product_category", "timestamp", "processing_timestamp",
    ]

    return (
        faker_rows
        .withColumn("customer_id", col("uuid4"))
        .withColumn("age", age_expr)
        .withColumn("purchase_amount", amount_expr)
        .withColumn("product_category", category_expr)
        .withColumn("timestamp", to_timestamp(col("date_time")))
        .withColumn("processing_timestamp", current_timestamp())
        .select(*output_columns)
    )

## Silver Layer: Processed Customer Data

This section processes the raw streaming customer data with data quality checks and transformations:

- Validates customer age (18-80)
- Ensures purchase amounts are positive
- Adds derived fields for analytics
- Applies data quality expectations


In [None]:
# Silver table: processed customer data with quality checks
@dlt.table(
    comment="Processed customer data with quality expectations and derived fields"
)
@dlt.expect_or_drop("valid_age", "age >= 18 AND age <= 80")
@dlt.expect_or_drop("valid_purchase", "purchase_amount > 0")
# Null keys are dropped *by the expectations* rather than by a manual .filter():
# expectations are evaluated on the DataFrame this function returns, so
# pre-filtering would make these checks unable to fail and hide the drops from
# the pipeline's data-quality metrics.
@dlt.expect_or_drop("non_null_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("non_null_email", "email IS NOT NULL")
def customer_stream_silver():
    """Validate raw customer records and add analytics-friendly derived columns.

    Reads ``customer_stream_raw`` as a stream, so each pipeline update only
    processes newly arrived raw rows. The expectations above drop rows with an
    out-of-range age, a non-positive purchase amount, or a NULL customer_id or
    email.

    Added columns:
    * age_group: "18-24", "25-34", "35-44", "45-54", "55-64", or "65+".
    * purchase_tier: "Low" (< 50), "Medium" (< 200), otherwise "High".
    * processing_date / processing_hour: date and hour of processing_timestamp.
    """
    # Incremental read of the append-only streaming source. dlt.read would
    # recompute this table from every raw row on each update.
    df = dlt.read_stream("customer_stream_raw")

    age_group = (
        when(col("age") < 25, "18-24")
        .when(col("age") < 35, "25-34")
        .when(col("age") < 45, "35-44")
        .when(col("age") < 55, "45-54")
        .when(col("age") < 65, "55-64")
        .otherwise("65+")
    )
    purchase_tier = (
        when(col("purchase_amount") < 50, "Low")
        .when(col("purchase_amount") < 200, "Medium")
        .otherwise("High")
    )

    return (
        df
        .withColumn("age_group", age_group)
        .withColumn("purchase_tier", purchase_tier)
        .withColumn("processing_date", expr("date(processing_timestamp)"))
        .withColumn("processing_hour", hour(col("processing_timestamp")))
    )


## Gold Layer: Customer Analytics Aggregations

This section creates aggregated metrics from the processed customer streaming data:

- **Customer Demographics**: Age group and geographic distribution
- **Purchase Analytics**: Revenue metrics by category and tier
- **Real-time Metrics**: Hourly processing statistics


In [None]:
# Gold table: customer demographics by age group and location
@dlt.table(
    comment="Customer demographics aggregated by age group and location"
)
def customer_demographics_gold():
    """Aggregate silver customer rows per (age_group, country, city).

    For each group, reports the row count, the mean age (rounded to 1 decimal),
    and the mean and total purchase_amount (rounded to 2 decimals). Rows are
    ordered by country, city, then age_group.
    """
    silver = dlt.read("customer_stream_silver")
    metrics = [
        count("*").alias("customer_count"),
        round(avg("age"), 1).alias("avg_age"),
        round(avg("purchase_amount"), 2).alias("avg_purchase_amount"),
        round(sum("purchase_amount"), 2).alias("total_revenue"),
    ]
    per_group = silver.groupBy("age_group", "country", "city").agg(*metrics)
    return per_group.orderBy("country", "city", "age_group")


In [None]:
# Gold table: purchase analytics by product category and tier
@dlt.table(
    comment="Purchase analytics aggregated by product category and purchase tier"
)
def purchase_analytics_gold():
    """Aggregate silver purchases per (product_category, purchase_tier).

    For each group, reports the transaction count, the mean and total
    purchase_amount (rounded to 2 decimals), and the number of distinct
    customer_ids. Rows are ordered by product_category, then purchase_tier.
    """
    silver = dlt.read("customer_stream_silver")
    metrics = [
        count("*").alias("transaction_count"),
        round(avg("purchase_amount"), 2).alias("avg_purchase_amount"),
        round(sum("purchase_amount"), 2).alias("total_revenue"),
        countDistinct("customer_id").alias("unique_customers"),
    ]
    keys = ("product_category", "purchase_tier")
    return silver.groupBy(*keys).agg(*metrics).orderBy(*keys)


In [None]:
# Gold table: real-time processing metrics
@dlt.table(
    comment="Real-time processing metrics by hour and date"
)
def realtime_metrics_gold():
    """Summarize silver records per (processing_date, processing_hour).

    For each hour bucket, reports the record count; distinct counts of
    customer_id, country, and product_category; and the total and mean
    purchase_amount (rounded to 2 decimals). Rows are ordered chronologically
    by date, then hour.
    """
    silver = dlt.read("customer_stream_silver")
    time_keys = ("processing_date", "processing_hour")
    metrics = [
        count("*").alias("records_processed"),
        countDistinct("customer_id").alias("unique_customers"),
        countDistinct("country").alias("countries_count"),
        countDistinct("product_category").alias("categories_count"),
        round(sum("purchase_amount"), 2).alias("hourly_revenue"),
        round(avg("purchase_amount"), 2).alias("avg_purchase_amount"),
    ]
    hourly = silver.groupBy(*time_keys).agg(*metrics)
    return hourly.orderBy(*time_keys)
