# Setup

In [None]:
# The notebook environment is expected to provide a ready-made SparkSession bound to
# the name `spark`; evaluating it as the last expression displays the session, which
# confirms it is available before any table is created.
spark

# Creating tables with automatic schema evolution

## Bronze layer tables

In [None]:
# Start from a clean slate so the cell can be re-run: remove any bronze tables
# left behind by a previous execution of this notebook.
for bronze_table in ("customer", "profile", "orders"):
    spark.sql(f"DROP TABLE IF EXISTS prod.db.{bronze_table}")

# Table DDL for the bronze (OLTP-style) source tables.
# Both tables are Iceberg tables created with format-version 2.
# customer: one row per customer_id, with created/updated audit timestamps that
# the silver pipeline later filters on (datetime_updated).
spark.sql("""
CREATE TABLE IF NOT EXISTS prod.db.customer (
    customer_id INT,
    email STRING,
    first_name STRING,
    last_name STRING,
    phone STRING,
    status STRING,
    is_verified BOOLEAN,
    registration_date TIMESTAMP,
    last_login_date TIMESTAMP,
    datetime_created TIMESTAMP,
    datetime_updated TIMESTAMP
) USING iceberg
TBLPROPERTIES (
    'format-version' = '2'
)""")


# profile: address profiles; each row carries the customer_id it belongs to, so a
# customer can have several profiles (the sample data below has two for some customers).
spark.sql("""
-- Profile table (child to customer - many profiles per customer)
CREATE TABLE IF NOT EXISTS prod.db.profile (
    profile_id INT,
    customer_id INT,
    profile_name STRING,
    profile_type STRING,
    address_line1 STRING,
    address_line2 STRING,
    city STRING,
    state STRING,
    postal_code STRING,
    country STRING,
    is_default BOOLEAN,
    datetime_created TIMESTAMP,
    datetime_updated TIMESTAMP
) USING iceberg
TBLPROPERTIES (
    'format-version' = '2'
);""")

# orders: one row per order. The only link to the other bronze tables is
# customer_id; there is no profile_id column, so an order is tied to a customer,
# not to a specific profile.
spark.sql("""
-- Orders table (child to customer - references customer_id)
CREATE TABLE IF NOT EXISTS prod.db.orders (
    order_id INT,
    customer_id INT,
    order_date TIMESTAMP,
    order_status STRING,
    payment_method STRING,
    payment_status STRING,
    subtotal DECIMAL(10, 2),
    tax_amount DECIMAL(10, 2),
    shipping_amount DECIMAL(10, 2),
    discount_amount DECIMAL(10, 2),
    total_amount DECIMAL(10, 2),
    currency STRING,
    shipping_method STRING,
    tracking_number STRING,
    notes STRING,
    datetime_created TIMESTAMP
) USING iceberg
TBLPROPERTIES (
    'format-version' = '2'
);
""")

# Insert some fake data for the OLTP tables.
# Values are positional, so they must follow the column order of each CREATE TABLE above.
# Three customers (ids 1-3); every timestamp in the sample data falls in 2023.
spark.sql("""
-- Insert sample customers
INSERT INTO prod.db.customer VALUES
  (1, 'john.doe@example.com', 'John', 'Doe', '555-123-4567', 'active', true, TIMESTAMP '2023-01-15 08:30:00', TIMESTAMP '2023-03-20 14:22:15', TIMESTAMP '2023-01-15 08:30:00', TIMESTAMP '2023-03-20 14:22:15'),
  (2, 'jane.smith@example.com', 'Jane', 'Smith', '555-987-6543', 'active', true, TIMESTAMP '2023-02-05 12:45:00', TIMESTAMP '2023-03-18 09:10:30', TIMESTAMP '2023-02-05 12:45:00', TIMESTAMP '2023-03-18 09:10:30'),
  (3, 'robert.brown@example.com', 'Robert', 'Brown', '555-456-7890', 'inactive', false, TIMESTAMP '2023-01-25 15:20:00', TIMESTAMP '2023-02-10 11:05:45', TIMESTAMP '2023-01-25 15:20:00', TIMESTAMP '2023-02-10 11:05:45');
""")

# Five profiles: customers 1 and 2 have two each, customer 3 has one. The
# multi-profile customers are what later makes EXPLODE multiply order rows.
spark.sql("""
-- Insert sample profiles
INSERT INTO prod.db.profile VALUES
  (101, 1, 'Home Address', 'home', '123 Main St', 'Apt 4B', 'New York', 'NY', '10001', 'US', true, TIMESTAMP '2023-01-15 08:35:00', TIMESTAMP '2023-01-15 08:35:00'),
  (102, 1, 'Work Address', 'work', '456 Business Ave', 'Suite 200', 'New York', 'NY', '10002', 'US', false, TIMESTAMP '2023-02-10 09:20:00', TIMESTAMP '2023-02-10 09:20:00'),
  (103, 2, 'Shipping Address', 'shipping', '789 Residential Blvd', NULL, 'Los Angeles', 'CA', '90001', 'US', true, TIMESTAMP '2023-02-05 12:50:00', TIMESTAMP '2023-02-05 12:50:00'),
  (104, 2, 'Billing Address', 'billing', '101 Finance St', '15th Floor', 'Los Angeles', 'CA', '90002', 'US', false, TIMESTAMP '2023-02-05 12:55:00', TIMESTAMP '2023-02-05 12:55:00'),
  (105, 3, 'Home Address', 'home', '202 Cedar Lane', NULL, 'Chicago', 'IL', '60601', 'US', true, TIMESTAMP '2023-01-25 15:25:00', TIMESTAMP '2023-01-25 15:25:00');
""")

# Five orders spread over the three customers. In each row
# total_amount = subtotal + tax_amount + shipping_amount - discount_amount.
spark.sql("""
-- Insert sample orders
INSERT INTO prod.db.orders VALUES
  (1001, 1, TIMESTAMP '2023-02-01 10:15:00', 'delivered', 'credit_card', 'paid', 89.99, 7.20, 5.99, 0.00, 103.18, 'USD', 'standard', 'TRK123456789', NULL, TIMESTAMP '2023-02-01 10:15:00'),
  (1002, 1, TIMESTAMP '2023-03-10 14:30:00', 'shipped', 'paypal', 'paid', 45.50, 3.64, 5.99, 5.00, 50.13, 'USD', 'express', 'TRK987654321', NULL, TIMESTAMP '2023-03-10 14:30:00'),
  (1003, 2, TIMESTAMP '2023-02-20 09:45:00', 'delivered', 'credit_card', 'paid', 129.95, 10.40, 9.99, 15.00, 135.34, 'USD', 'standard', 'TRK456789123', 'Please leave at front door', TIMESTAMP '2023-02-20 09:45:00'),
  (1004, 2, TIMESTAMP '2023-03-15 16:20:00', 'processing', 'apple_pay', 'paid', 75.25, 6.02, 5.99, 0.00, 87.26, 'USD', 'standard', NULL, NULL, TIMESTAMP '2023-03-15 16:20:00'),
  (1005, 3, TIMESTAMP '2023-02-05 11:10:00', 'cancelled', 'credit_card', 'refunded', 199.99, 16.00, 12.99, 20.00, 208.98, 'USD', 'overnight', NULL, 'Customer requested cancellation', TIMESTAMP '2023-02-05 11:10:00');
  """)

## Silver layer tables

In [None]:
# Remove silver tables from any earlier run so the DDL below always applies.
for silver_table in ("dim_customer", "fct_orders"):
    spark.sql(f"DROP TABLE IF EXISTS prod.db.{silver_table}")

# dim_customer (silver): the bronze customer columns plus an etl_upserted load
# timestamp and the customer's profiles nested as an ARRAY<STRUCT>.
# - PARTITIONED BY (datetime_updated) partitions on the raw timestamp value; the
#   DimCustomer pipeline writes with overwritePartitions(), so this column decides
#   which existing data a re-run replaces.
# - 'write.spark.accept-any-schema' is set so that writes using the "mergeSchema"
#   option can add new columns (see the Iceberg Spark write docs for the exact rules).
spark.sql("""
-- Customer Dimension Table with profiles as array of structs
CREATE TABLE IF NOT EXISTS prod.db.dim_customer (
    customer_id INT,
    email STRING,
    first_name STRING,
    last_name STRING,
    phone STRING,
    status STRING,
    is_verified BOOLEAN,
    registration_date TIMESTAMP,
    last_login_date TIMESTAMP,
    datetime_created TIMESTAMP,
    datetime_updated TIMESTAMP,
    etl_upserted TIMESTAMP,
    
    -- Profiles as array of structs
    profiles ARRAY<STRUCT<
        profile_id: INT,
        profile_name: STRING,
        profile_type: STRING,
        address_line1: STRING,
        address_line2: STRING,
        city: STRING,
        state: STRING,
        postal_code: STRING,
        country: STRING,
        is_default: BOOLEAN,
        datetime_created: TIMESTAMP,
        datetime_updated: TIMESTAMP
    >>
) USING iceberg
PARTITIONED BY (datetime_updated)
TBLPROPERTIES (
    'format-version' = '2',
    'write.spark.accept-any-schema'='true'
);""")


# fct_orders (silver): same columns as the bronze orders table except that
# datetime_created is not carried over.
# NOTE(review): no pipeline in this notebook writes to or reads from this table;
# OBTOrders.extract_upstream reads prod.db.orders directly. Confirm whether a
# fct_orders pipeline is intended.
spark.sql("""
-- Orders Fact Table
CREATE TABLE IF NOT EXISTS prod.db.fct_orders (
    order_id INT,
    
    -- Foreign keys
    customer_id INT,
    
    -- Time dimensions
    order_date TIMESTAMP,
    
    -- Order attributes
    order_status STRING,
    payment_method STRING,
    payment_status STRING,
    
    -- Order metrics
    subtotal DECIMAL(10, 2),
    tax_amount DECIMAL(10, 2),
    shipping_amount DECIMAL(10, 2),
    discount_amount DECIMAL(10, 2),
    total_amount DECIMAL(10, 2),
    
    -- Order details
    currency STRING,
    shipping_method STRING,
    tracking_number STRING,
    notes STRING

) USING iceberg
TBLPROPERTIES (
    'format-version' = '2',
    'write.spark.accept-any-schema'='true' 
);
""")

## Gold layer tables

In [None]:
# Remove the gold table from any earlier run so the DDL below always applies.
spark.sql("DROP TABLE IF EXISTS prod.db.obt_orders")

# obt_orders (gold, "OBT"): denormalized orders table. Each order row carries its
# customer as a nested STRUCT, and that STRUCT carries the customer's profiles as
# an ARRAY<STRUCT>, mirroring the columns of dim_customer.
# - All STRUCT fields use the same `name: TYPE` form.
# - PARTITIONED BY (datetime_created) partitions on the order's raw creation
#   timestamp; the OBTOrders pipeline writes with overwritePartitions().
# - 'write.spark.accept-any-schema' is set so that writes using the "mergeSchema"
#   option can add new (nested) fields.
spark.sql("""
CREATE TABLE IF NOT EXISTS prod.db.obt_orders (
    -- Order primary key
    order_id INT,

    -- Order date and time attributes
    order_date TIMESTAMP,

    -- Order attributes
    order_status STRING,
    payment_method STRING,
    payment_status STRING,
    currency STRING,
    shipping_method STRING,
    tracking_number STRING,
    notes STRING,

    -- Order metrics
    subtotal DECIMAL(10, 2),
    tax_amount DECIMAL(10, 2),
    shipping_amount DECIMAL(10, 2),
    discount_amount DECIMAL(10, 2),
    total_amount DECIMAL(10, 2),

    -- Customer information
    customer STRUCT<
        customer_id: INT,
        email: STRING,
        first_name: STRING,
        last_name: STRING,
        phone: STRING,
        status: STRING,
        is_verified: BOOLEAN,
        registration_date: TIMESTAMP,
        last_login_date: TIMESTAMP,
        datetime_created: TIMESTAMP,
        datetime_updated: TIMESTAMP,
        etl_upserted: TIMESTAMP,

        -- Profiles as array of structs
        profiles: ARRAY<STRUCT<
            profile_id: INT,
            profile_name: STRING,
            profile_type: STRING,
            address_line1: STRING,
            address_line2: STRING,
            city: STRING,
            state: STRING,
            postal_code: STRING,
            country: STRING,
            is_default: BOOLEAN,
            datetime_created: TIMESTAMP,
            datetime_updated: TIMESTAMP
        >>
    >,

    -- ETL metadata
    datetime_created TIMESTAMP,
    etl_upserted TIMESTAMP
) USING iceberg
PARTITIONED BY (datetime_created)
TBLPROPERTIES (
    'format-version' = '2',
    'write.spark.accept-any-schema'='true'
);
""")

# Data pipelines

## dim_customer silver table pipeline

In [None]:
from pyspark.sql import functions as F

class DimCustomer:
    """Silver-layer pipeline that builds ``prod.db.dim_customer``.

    Reads the bronze ``customer`` and ``profile`` tables, nests each customer's
    profiles into a ``profiles`` array of structs, and writes the result to the
    Iceberg table with schema merging enabled so that new upstream columns flow
    through without DDL changes.

    Relies on the notebook-level ``spark`` session and ``F``
    (``pyspark.sql.functions``).
    """

    def extract_upstream(self, start_ts, end_ts):
        """Read the bronze rows whose ``datetime_updated`` is within the window.

        Args:
            start_ts: Lower bound of the window (inclusive, per ``Column.between``).
            end_ts: Upper bound of the window (inclusive, per ``Column.between``).

        Returns:
            dict with keys ``"customer"`` and ``"profile"`` mapping to the
            filtered DataFrames.
        """
        print("Starting EXTRACT...")
        updated_in_window = F.col("datetime_updated").between(start_ts, end_ts)
        customer_df = spark.table("prod.db.customer").where(updated_in_window)
        profile_df = spark.table("prod.db.profile").where(updated_in_window)
        return {
            "customer": customer_df,
            "profile": profile_df
        }

    def transform(self, input_dfs):
        """Nest each customer's profiles into a ``profiles`` array column.

        Args:
            input_dfs: dict as returned by ``extract_upstream``.

        Returns:
            DataFrame with every customer column followed by ``profiles``.
        """
        print("Starting TRANSFORM...")
        customer_df = input_dfs.get("customer")
        profile_df = input_dfs.get("profile")

        # Every profile column except the join key goes into the struct. The
        # column list is taken from the DataFrame rather than hard-coded, so
        # columns added upstream are picked up automatically. Compare by exact
        # name: a substring test would also drop unrelated columns whose name
        # merely contains "customer_id" (e.g. "referrer_customer_id").
        profile_struct_columns = [c for c in profile_df.columns if c != "customer_id"]

        customer_cols = customer_df.columns

        grouped_profile_id = profile_df.groupBy(
            F.col("customer_id")
        ).agg(
            F.collect_list(
                F.struct(*profile_struct_columns)
            ).alias("profiles")
        )

        # NOTE(review): this is an inner join, so a customer with no profile row
        # in the extracted window is absent from the output - confirm intended.
        return customer_df.join(
            grouped_profile_id,
            on="customer_id"
        ).select(
            *customer_cols,
            grouped_profile_id["profiles"]
        )

    def load(self, transformed_df):
        """Stamp the rows with the load time and write them to ``dim_customer``.

        ``check-ordering`` is disabled because ``etl_upserted`` is appended after
        ``profiles`` here while the table declares it before; ``mergeSchema``
        lets columns that are new in ``transformed_df`` be added to the table.
        ``overwritePartitions()`` replaces the partitions for which the
        DataFrame contains rows.

        Args:
            transformed_df: DataFrame as returned by ``transform``.
        """
        print("Starting LOAD...")
        transformed_df = transformed_df.withColumn("etl_upserted", F.current_timestamp())

        transformed_df\
        .writeTo("prod.db.dim_customer")\
        .option("check-ordering", "false")\
        .option("mergeSchema", "true")\
        .overwritePartitions()

    def run(self, start_ts, end_ts):
        """Run extract -> transform -> load for the given update window."""
        print("Starting RUN...")
        # Log: run_time, start_ts, end_ts, START state
        self.load(self.transform(self.extract_upstream(start_ts, end_ts)))
        # Log: run_time, start_ts, end_ts, END state

In [None]:
# Run the silver pipeline. The window is deliberately wide: every timestamp in the
# sample bronze data falls in 2023, so all customers and profiles are picked up.
# `dim_customer` is kept as a notebook variable because later cells re-run it.
dim_customer = DimCustomer()
dim_customer.run('2022-01-01', '2024-01-01')

## obt_orders gold table pipeline 

In [None]:
class OBTOrders:
    """Gold-layer pipeline that builds ``prod.db.obt_orders``.

    Joins orders to ``dim_customer`` and nests the whole customer row (including
    its ``profiles`` array) into a single ``customer`` struct column.

    Relies on the notebook-level ``spark`` session and ``F``
    (``pyspark.sql.functions``).
    """

    def extract_upstream(self, start_ts, end_ts):
        """Read orders and customers that fall inside the window.

        Orders are filtered on ``datetime_created`` and customers on
        ``datetime_updated``; both bounds are passed to ``Column.between``.

        NOTE(review): the orders come from the bronze ``prod.db.orders`` table
        (not ``prod.db.fct_orders``), and the dimension is filtered by the same
        window as the orders - confirm both are intended.

        Returns:
            dict with keys ``"fct_orders"`` and ``"dim_customer"``.
        """
        print("Starting EXTRACT...")
        created_in_window = F.col("datetime_created").between(start_ts, end_ts)
        updated_in_window = F.col("datetime_updated").between(start_ts, end_ts)

        upstream = {}
        upstream["fct_orders"] = spark.table("prod.db.orders").where(created_in_window)
        upstream["dim_customer"] = spark.table("prod.db.dim_customer").where(updated_in_window)
        return upstream

    def transform(self, input_dfs):
        """Join orders to customers and pack the customer row into a struct.

        Args:
            input_dfs: dict as returned by ``extract_upstream``.

        Returns:
            DataFrame with the order columns (minus the top-level
            ``customer_id``) plus a ``customer`` struct holding every
            ``dim_customer`` column.
        """
        print("Starting TRANSFORM...")
        orders_df = input_dfs.get("fct_orders")
        customers_df = input_dfs.get("dim_customer")

        # Columns are referenced through their source DataFrame so that names
        # shared by both sides of the join stay unambiguous.
        order_fields = [orders_df[name] for name in orders_df.columns]
        customer_fields = [customers_df[name] for name in customers_df.columns]
        customer_struct = F.struct(*customer_fields).alias("customer")

        joined_df = orders_df.join(customers_df, on="customer_id")
        # customer_id remains available inside the struct, so the top-level copy is dropped.
        return joined_df.select(*order_fields, customer_struct).drop("customer_id")

    def load(self, transformed_df):
        """Stamp the rows with the load time and write them to ``obt_orders``.

        Column-order checking is disabled and schema merging enabled on the
        write; ``overwritePartitions()`` replaces the partitions for which the
        DataFrame contains rows.
        """
        print("Starting LOAD...")
        stamped_df = transformed_df.withColumn("etl_upserted", F.current_timestamp())

        writer = stamped_df.writeTo("prod.db.obt_orders")
        writer = writer.option("check-ordering", "false").option("mergeSchema", "true")
        writer.overwritePartitions()

    def run(self, start_ts, end_ts):
        """Run extract -> transform -> load for the given window."""
        print("Starting RUN...")
        # Log: run_time, start_ts, end_ts, START state
        upstream = self.extract_upstream(start_ts, end_ts)
        transformed = self.transform(upstream)
        self.load(transformed)
        # Log: run_time, start_ts, end_ts, END state

In [None]:
# Run the gold pipeline over the same wide window used for dim_customer, so that
# every sample order and every customer row written above is included.
# `obt_orders` is kept as a notebook variable because a later cell re-runs it.
obt_orders = OBTOrders()
obt_orders.run('2022-01-01', '2024-01-01')

In [None]:
# Inspect the schema of obt_orders before any upstream change; this is the
# baseline to compare against after the schema-evolution re-run below.
spark.table("prod.db.obt_orders").printSchema()

## Schema evolution

In [None]:
# Alter upstream customer and profile schemas
# Here we assume that the bronze layers pull directly from the upstream table
# Each statement adds one BOOLEAN column. No new rows are inserted afterwards, so
# the existing rows read the new columns as NULL.
# NOTE: re-running this cell without first re-creating the bronze tables would try
# to add the same columns a second time.
spark.sql("Alter table prod.db.customer add column is_fraud boolean")
spark.sql("Alter table prod.db.profile add column is_under_18 boolean")

In [None]:
# re-run the full pipeline
# Order matters: the gold pipeline reads dim_customer, so the silver table must be
# rebuilt first for the new columns to reach obt_orders.
# re-run the silver table
dim_customer.run('2022-01-01', '2024-01-01')
# re-run the gold table
obt_orders.run('2022-01-01', '2024-01-01')

In [None]:
# check the obt_orders schema again; compare with the printSchema() output from
# before the ALTER TABLE statements to see which nested fields were added
spark.table("prod.db.obt_orders").printSchema()

We can see that we have an `is_under_18` column added to the profile STRUCT and the `is_fraud` column added to the customer STRUCT.

Our schema evolution works!

![schema_evol.png](attachment:3a1f1438-f403-4785-9077-3a3415c541b4.png)

In [None]:
# Show only the nested profiles array of each order's customer, untruncated so the
# full struct contents are visible.
spark.table("prod.db.obt_orders").select("customer.profiles").show(truncate=False)
# We can see that the last field in the profile STRUCT `is_under_18` is NULL since we did not insert any data into the bronze tables

# Split ARRAY elements into individual rows with EXPLODE

Suppose we are asked to calculate the average cost of an order per profile. How would you do it?

In [None]:
# access customer STRUCT fields
# Dot notation (customer.<field>) reaches into the STRUCT; each selected field
# comes back as an ordinary top-level column.
spark.sql("select order_id, order_date, customer.customer_id, customer.first_name from prod.db.obt_orders").show(truncate=False)

In [None]:
# access profiles ARRAY[STRUCT], similar to other fields in the STRUCT
# The array is returned as a single column value per order row; it is not yet
# split into one row per profile.
spark.sql("select order_id, order_date, customer.customer_id, customer.first_name, customer.profiles from prod.db.obt_orders").show(truncate=False)

In [None]:
# Let's split up each profile STRUCT into a single row
# In spark SQL use LATERAL VIEW + EXPLODE
# EXPLODE emits one output row per element of customer.profiles, exposed here as
# the column profile_struct; the order columns are repeated on each of those rows.
spark.sql("""
select order_id
, order_date
, customer.customer_id
, customer.first_name
, profile_struct.profile_id
, profile_struct.profile_name
, profile_struct.profile_type
from prod.db.obt_orders 
LATERAL VIEW EXPLODE(customer.profiles) AS profile_struct 
""").show(truncate=False)

In [None]:
# Same result as the LATERAL VIEW query, expressed with the DataFrame API:
# first explode customer.profiles into one row per profile, then pull the wanted
# fields out of the exploded `profile` struct.
obt_orders_df = spark.table("prod.db.obt_orders")

one_row_per_profile_df = obt_orders_df.select(
    "order_id",
    "order_date",
    "customer.customer_id",
    "customer.first_name",
    F.explode("customer.profiles").alias("profile"),
)

one_row_per_profile_df.select(
    "order_id",
    "order_date",
    "customer_id",
    "first_name",
    "profile.profile_id",
    "profile.profile_name",
    "profile.profile_type",
).show(truncate=False)

## Incorrect aggregations

In [None]:
# Same exploded query as before, now including total_amount. Because EXPLODE
# repeats the order columns once per profile, an order whose customer has two
# profiles shows its total_amount on two rows.
spark.sql("""
select order_id
, order_date
, total_amount
, customer.customer_id
, customer.first_name
, profile_struct.profile_id
, profile_struct.profile_name
, profile_struct.profile_type
from prod.db.obt_orders 
LATERAL VIEW EXPLODE(customer.profiles) AS profile_struct 
""").show(truncate=False)

In [None]:
# Deliberately INCORRECT aggregation, kept to demonstrate the pitfall: the CTE has
# one row per (order, profile), so summing total_amount over it counts each order
# once per profile of its customer and overstates the daily total.
spark.sql("""
With order_details as (
select order_id
, order_date
, total_amount
, customer.customer_id
, customer.first_name
, profile_struct.profile_id
, profile_struct.profile_name
, profile_struct.profile_type
from prod.db.obt_orders 
LATERAL VIEW EXPLODE(customer.profiles) AS profile_struct )
SELECT date(order_date) as order_date
, sum(total_amount) as total_order_cost
FROM order_details
GROUP BY 1
""").show(truncate=False)

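From the above two examples we can see that an order's cost is duplicated when we aggregate: `EXPLODE` emits one row per profile, so each order's `total_amount` is repeated once for every profile its customer has, and the sum counts it that many times. We need to be mindful of these situations and aggregate order-level metrics before exploding (or on de-duplicated order rows).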

![agg.png](attachment:ca1810d2-f47f-46a2-8a31-0ddc2977f9f9.png)