# Pipeline and Medallion Architecture

In this notebook we are going to combine the ideas from:
- [Example medallion architecture](https://docs.databricks.com/aws/en/lakehouse/medallion#example-medallion-architecture)
- [Load data with Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/ldp/load)
- [Manage data quality with pipeline expectations](https://docs.databricks.com/aws/en/ldp/expectations)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark import pipelines as dp

# Landing zone (directories where the raw data is)

- In this example we use synthetic (fake) data stored in a managed volume.
- In a real scenario, the landing zone could be:
    - A cloud object storage path: S3, ADLS, or GCS
    - An external volume path: access existing cloud object storage using volume-like paths

In [None]:
def _required_conf(key):
    """Return the pipeline configuration value for ``key``, failing fast if it is unset.

    An empty value would silently yield paths such as "/customers" and table
    names such as ".customers_raw". Raising here reports the missing setting
    up front instead of as a confusing read/write failure later on.

    Raises:
        ValueError: if ``key`` is missing or empty in the Spark configuration.
    """
    value = spark.conf.get(key, "")
    if not value:
        raise ValueError(
            f"Pipeline configuration '{key}' is not set; define it in the "
            "pipeline configuration settings before running this notebook."
        )
    return value


# Values supplied through the pipeline's configuration (spark.conf).
volume = _required_conf("landing_zone_volume")
bronze_schema = _required_conf("bronze_schema")
silver_schema = _required_conf("silver_schema")
gold_schema = _required_conf("gold_schema")

# One landing-zone sub-directory per source entity.
customers_directory = f"{volume}/customers"
products_directory = f"{volume}/products"
transactions_directory = f"{volume}/transactions"

# Bronze layer

In the bronze layer we are supposed to only:

- Load the raw Parquet data into tables.
- Avoid transformations, changes, and filters on the data.
- Keep the data in its original format.

### Customers table

In [None]:
@dp.table(name=f"{bronze_schema}.customers_raw")
def bronze_customers():
    """Incrementally ingest the raw customer Parquet files with Auto Loader.

    Returns:
        A streaming DataFrame over ``customers_directory``, read through the
        ``cloudFiles`` (Auto Loader) source with ``cloudFiles.format=parquet``.
        No transformations are applied, per the bronze-layer rules.
    """
    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "parquet")
    return reader.load(customers_directory)

### Products table

In [None]:
@dp.table(name=f"{bronze_schema}.products_raw")
def bronze_products():
    """Incrementally read the raw product Parquet files with Auto Loader.

    Returns:
        A streaming DataFrame over ``products_directory``, read through the
        ``cloudFiles`` (Auto Loader) source with ``cloudFiles.format=parquet``.
        No transformations are applied, per the bronze-layer rules.
    """
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(products_directory)
    )
    return df

### Transactions table

In [None]:
@dp.table(name=f"{bronze_schema}.transactions_raw")
def bronze_transactions():
    """Incrementally read the raw transaction Parquet files with Auto Loader.

    Returns:
        A streaming DataFrame over ``transactions_directory``, read through the
        ``cloudFiles`` (Auto Loader) source with ``cloudFiles.format=parquet``.
        No transformations are applied, per the bronze-layer rules.
    """
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(transactions_directory)
    )
    return df

# Silver layer

### Customers table with a data quality expectation

In [None]:
@dp.table(name=f"{silver_schema}.customers")
@dp.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
def silver_customers():
    """Stream the bronze customers table into silver.

    Rows with a NULL ``customer_id`` are dropped by the expectation above.
    """
    source_table = f"{bronze_schema}.customers_raw"
    # `_rescued_data` is the column Auto Loader adds at ingestion; silver does not keep it.
    return spark.readStream.table(source_table).drop("_rescued_data")

### Products table with a data quality expectation

In [None]:
@dp.table(name=f"{silver_schema}.products")
@dp.expect_or_drop("valid_product_id", "product_id IS NOT NULL")
def silver_products():
    """Stream the bronze products table into silver.

    Rows with a NULL ``product_id`` are dropped by the expectation above.
    """
    source_table = f"{bronze_schema}.products_raw"
    # `_rescued_data` is the column Auto Loader adds at ingestion; silver does not keep it.
    return spark.readStream.table(source_table).drop("_rescued_data")

### Transactions table with data quality expectations

In [None]:
@dp.table(name=f"{silver_schema}.transactions")
@dp.expect_or_drop("valid_transaction_id", "transaction_id IS NOT NULL")
@dp.expect_or_drop("valid_product_id", "product_id IS NOT NULL")
@dp.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dp.expect_or_drop("valid_quantity", "quantity IS NOT NULL")
def silver_transactions():
    """Stream the bronze transactions table into silver.

    A row is dropped if any of transaction_id, product_id, customer_id or
    quantity is NULL (see the expectations above).
    """
    source_table = f"{bronze_schema}.transactions_raw"
    # `_rescued_data` is the column Auto Loader adds at ingestion; silver does not keep it.
    return spark.readStream.table(source_table).drop("_rescued_data")

### Sales detail (join of 3 previous tables)

In [None]:
@dp.table(name=f"{silver_schema}.sales_detail")
@dp.expect_or_drop("valid_quantity", "quantity > 0")
@dp.expect_or_drop("valid_unit_price", "unit_price > 0")
@dp.expect_or_drop("valid_country", "country IS NOT NULL")
def silver_sales_detail():
    """Enrich each transaction with its customer and product attributes.

    Inner-joins the silver transactions stream with the silver customers
    stream on ``customer_id`` and with the silver products stream on
    ``product_id``. It then derives per-line sales, cost, profit and
    profit-margin columns. Rows whose ``quantity`` or ``unit_price`` is not
    positive, or whose ``country`` is NULL, are dropped by the expectations
    above.

    NOTE(review): these are stream-stream inner joins with no watermarks, so
    Spark may retain join state for every row seen. Consider adding
    watermarks, or reading the dimension tables with ``spark.read``, if state
    size becomes a problem.
    """
    customers_df = spark.readStream.table(f"{silver_schema}.customers")
    products_df = spark.readStream.table(f"{silver_schema}.products")
    transactions_df = spark.readStream.table(f"{silver_schema}.transactions")

    price = F.col("price")
    cost = F.col("cost")
    quantity = F.col("quantity")

    silver_sales_detail_df = (
        transactions_df
        .join(customers_df, "customer_id", "inner")
        .join(products_df, "product_id", "inner")
        .select(
            F.col("transaction_id"),
            F.col("transaction_date"),
            F.year(F.col("transaction_date")).alias("transaction_year"),
            F.col("customer_id"),
            F.col("name").alias("customer_name"),
            F.col("email").alias("customer_email"),
            F.col("country"),
            F.col("customer_segment"),
            F.col("product_id"),
            F.col("product_name"),
            F.col("category").alias("product_category"),
            price.alias("unit_price"),
            cost.alias("unit_cost"),
            quantity,
            # Calculated columns
            F.round(price * quantity, 2).alias("total_sales"),
            F.round(cost * quantity, 2).alias("total_cost"),
            F.round((price - cost) * quantity, 2).alias("total_profit"),
            # Guard the division. When ANSI mode is enabled (the Spark 4
            # default), a zero price raises DIVIDE_BY_ZERO and fails the
            # update. The `unit_price > 0` expectation is not guaranteed to
            # drop the row before this expression is evaluated. Zero-price
            # rows get NULL here and are then dropped by that expectation.
            F.round(
                F.when(price != 0, ((price - cost) / price) * 100), 2
            ).alias("profit_margin_pct"),
        )
    )
    return silver_sales_detail_df

# Gold layer (business aggregates)

In [None]:
@dp.materialized_view(name=f"{gold_schema}.sales_by_country")
def gold_sales_by_country():
    """Aggregate the silver sales detail per country.

    Output columns: country, total_sales, total_cost, total_revenue,
    transaction_count, avg_transaction_value, customer_count. Monetary totals
    are rounded to 2 decimals.
    """
    sales_detail_df = spark.read.table(f"{silver_schema}.sales_detail")
    sales_by_country_df = (
        sales_detail_df
        .groupBy("country")
        .agg(
            F.sum("total_sales").alias("total_sales"),
            F.sum("total_cost").alias("total_cost"),
            # NOTE(review): this column holds summed *profit* despite its name.
            # The name is kept so consumers of the existing schema keep working.
            F.sum("total_profit").alias("total_revenue"),
            F.count("transaction_id").alias("transaction_count"),
            F.round(F.avg("total_sales"), 2).alias("avg_transaction_value"),
            # Distinct customers. A plain count("customer_id") counted every
            # transaction row again, duplicating transaction_count.
            F.count_distinct("customer_id").alias("customer_count"),
        )
        .withColumn("total_sales", F.round(F.col("total_sales"), 2))
        .withColumn("total_cost", F.round(F.col("total_cost"), 2))
        .withColumn("total_revenue", F.round(F.col("total_revenue"), 2))
        .orderBy(F.col("total_revenue").desc())
    )

    return sales_by_country_df

In [None]:
@dp.materialized_view(name=f"{gold_schema}.revenue_per_year")
def gold_revenue_per_year():
    """Aggregate the silver sales detail per transaction year.

    Output columns: transaction_year, total_sales, total_cost, total_revenue,
    transaction_count, unique_customers, avg_transaction_value. Monetary
    totals are rounded to 2 decimals.
    """
    sales_detail_df = spark.read.table(f"{silver_schema}.sales_detail")

    revenue_by_year_df = (
        sales_detail_df
        .groupBy("transaction_year")
        .agg(
            F.sum("total_sales").alias("total_sales"),
            F.sum("total_cost").alias("total_cost"),
            # NOTE(review): this column holds summed *profit* despite its name.
            # The name is kept so consumers of the existing schema keep working.
            F.sum("total_profit").alias("total_revenue"),
            F.count("transaction_id").alias("transaction_count"),
            # The column promises *unique* customers. A plain count() counted
            # one per transaction row.
            F.count_distinct("customer_id").alias("unique_customers"),
            F.round(F.avg("total_sales"), 2).alias("avg_transaction_value"),
        )
        .withColumn("total_sales", F.round(F.col("total_sales"), 2))
        .withColumn("total_cost", F.round(F.col("total_cost"), 2))
        .withColumn("total_revenue", F.round(F.col("total_revenue"), 2))
        .orderBy("transaction_year")
    )

    return revenue_by_year_df

In [None]:
@dp.materialized_view(name=f"{gold_schema}.customer_segment_performance")
def gold_sales_per_country():
    """Aggregate the silver sales detail per customer segment.

    Despite its function name, this defines the ``customer_segment_performance``
    view. The function name is kept unchanged for compatibility.

    Output columns: customer_segment, total_sales, total_revenue,
    transaction_count, customer_count, avg_profit_margin_pct,
    revenue_per_customer.
    """
    sales_detail_df = spark.read.table(f"{silver_schema}.sales_detail")

    segment_performance_df = (
        sales_detail_df
        .groupBy("customer_segment")
        .agg(
            F.sum("total_sales").alias("total_sales"),
            # NOTE(review): this column holds summed *profit* despite its name.
            # The name is kept so consumers of the existing schema keep working.
            F.sum("total_profit").alias("total_revenue"),
            F.count("transaction_id").alias("transaction_count"),
            # Distinct customers. With a plain count(), revenue_per_customer
            # below was really revenue per transaction.
            F.count_distinct("customer_id").alias("customer_count"),
            F.round(F.avg("profit_margin_pct"), 2).alias("avg_profit_margin_pct"),
        )
        .withColumn("total_sales", F.round(F.col("total_sales"), 2))
        .withColumn("total_revenue", F.round(F.col("total_revenue"), 2))
        .withColumn(
            "revenue_per_customer",
            F.round(F.col("total_revenue") / F.col("customer_count"), 2),
        )
        .orderBy(F.col("total_revenue").desc())
    )

    return segment_performance_df