In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_date, to_date, lit
# Obtain the Spark session used by every cell below. getOrCreate() hands back
# an already-running session when one exists instead of building a new one.
session_builder = SparkSession.builder.appName("Jupyter")
spark = session_builder.getOrCreate()

25/08/14 07:46:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Tables

In [3]:
# Display the billing_events DataFrame's repr (column names and types).
# NOTE(review): in the visible notebook billing_events is only assigned in the
# CSV-loading cell under "Pipeline code", so on a fresh kernel this cell needs
# that cell to have run first.
billing_events

DataFrame[subscription_id: string, event_id: string, event_type: string, amount: double, event_date: date]

In [7]:
%%sql

-- Landing table for billing events as ingested, stored as an Iceberg table.
-- Data files are partitioned by the ds column.
CREATE TABLE IF NOT EXISTS saas.raw_billing_events (
    subscription_id  STRING,
    event_id         STRING,
    event_type       STRING,
    amount           DOUBLE,
    event_date       DATE,
    ds               DATE    -- partition column (presumably the load date, TODO confirm)
)
USING iceberg
PARTITIONED BY (ds);

DataFrame[subscription_id: string, event_id: string, event_type: string, amount: double, event_date: date]

In [9]:
# Display the subscriptions DataFrame's repr (column names and types).
# NOTE(review): in the visible notebook subscriptions is only assigned in the
# CSV-loading cell under "Pipeline code", so on a fresh kernel this cell needs
# that cell to have run first.
subscriptions

DataFrame[subscription_id: string, customer_id: string, end_date: date, plan_id: string]

In [None]:
%%sql

-- Landing table for subscriptions as ingested, stored as an Iceberg table.
-- Data files are partitioned by the ds column.
CREATE TABLE IF NOT EXISTS saas.raw_subscriptions (
    subscription_id  STRING,
    customer_id      STRING,
    plan_id          STRING,
    end_date         DATE,
    ds               DATE    -- partition column (presumably the load date, TODO confirm)
)
USING iceberg
PARTITIONED BY (ds);

In [10]:
# Display the customers DataFrame's repr (column names and types).
# NOTE(review): in the visible notebook customers is only assigned in the
# CSV-loading cell under "Pipeline code", so on a fresh kernel this cell needs
# that cell to have run first.
customers

DataFrame[customer_id: string, name: string, status: string, signup_date: date, country: string]

In [None]:
%%sql

-- Landing table for customers as ingested.
-- USING iceberg is stated explicitly so the table format does not depend on the
-- catalog or session default and matches the other tables in this notebook.
-- NOTE: IF NOT EXISTS leaves an already-created table untouched. Drop and recreate
-- it if it was previously created without the iceberg provider.
CREATE TABLE IF NOT EXISTS saas.raw_customers (
    customer_id  STRING,
    name         STRING,
    status       STRING,
    country      STRING,
    signup_date  DATE,
    ds           DATE    -- partition column (presumably the load date, TODO confirm)
)
USING iceberg
PARTITIONED BY (ds);

In [None]:
%%sql

-- Staging table for billing events, with the same columns as saas.raw_billing_events.
-- Partitioned by event_date and by a 16-way hash bucket of subscription_id.
CREATE TABLE IF NOT EXISTS saas.stg_billing_events (
    subscription_id  STRING,
    event_id         STRING,
    event_type       STRING,
    amount           DOUBLE,
    event_date       DATE,
    ds               DATE
)
USING iceberg
PARTITIONED BY (
    event_date,
    bucket(16, subscription_id)
);

In [None]:
%%sql

-- Staging table for subscriptions, with the same columns as saas.raw_subscriptions.
-- Partitioned by a 16-way hash bucket of subscription_id.
CREATE TABLE IF NOT EXISTS saas.stg_subscriptions (
    subscription_id  STRING,
    customer_id      STRING,
    plan_id          STRING,
    end_date         DATE,
    ds               DATE
)
USING iceberg
PARTITIONED BY (
    bucket(16, subscription_id)
);

In [None]:
%%sql

-- Customer dimension with history columns (the name suggests a type-2 slowly
-- changing dimension, TODO confirm against the load logic, which is not shown here).
-- Partitioned by the is_current flag.
CREATE TABLE IF NOT EXISTS saas.dim_customers_scd2 (
    customer_id  STRING,
    name         STRING,
    status       STRING,
    country      STRING,
    signup_date  DATE,
    start_date   DATE,       -- presumably start of the row's validity window
    end_date     DATE,       -- presumably end of the row's validity window
    is_current   BOOLEAN,
    ds           DATE
)
USING iceberg
PARTITIONED BY (is_current);

In [None]:
%%sql

-- Billing-event fact table carrying customer, subscription and event attributes
-- side by side. Column prefixes: dim_ presumably marks descriptive attributes and
-- m_ a numeric measure (TODO confirm the naming convention).
-- Partitioned by dim_event_date.
CREATE TABLE IF NOT EXISTS saas.fct_billing_events (
    customer_id                STRING,
    dim_name                   STRING,
    dim_customer_status        STRING,
    dim_signup_date            DATE,
    dim_subscription_end_date  DATE,
    subscription_id            STRING,
    event_id                   STRING,
    dim_event_type             STRING,
    m_amount                   DOUBLE,
    plan_id                    STRING,
    dim_event_date             DATE,
    dim_country                STRING,
    month_start                DATE,
    ds                         DATE
)
USING iceberg
PARTITIONED BY (dim_event_date);

# Pipeline code

In [None]:
# Directory holding the CSV extracts loaded by this notebook.
DATA_DIR = "/home/iceberg/data"


def read_csv_extract(file_name):
    """Load one CSV extract from DATA_DIR as a DataFrame.

    The first line of the file is treated as the header and column types are
    inferred by Spark (inferSchema).

    Args:
        file_name: File name relative to DATA_DIR, e.g. "customers.csv".

    Returns:
        A DataFrame containing every column found in the file.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        # .csv() already selects the CSV source, so no separate .format("csv") is needed.
        .csv(f"{DATA_DIR}/{file_name}")
    )


# event_date is truncated to day precision and cast to DATE before the column
# subset is selected.
billing_events = (
    read_csv_extract("billing_events.csv")
    .withColumn("event_date", expr("CAST(DATE_TRUNC('day', event_date) AS DATE)"))
    .select("subscription_id", "event_id", "event_type", "amount", "event_date")
)

subscriptions = (
    read_csv_extract("subscriptions.csv")
    .select("subscription_id", "customer_id", "end_date", "plan_id")
)

customers = (
    read_csv_extract("customers.csv")
    .select("customer_id", "name", "status", "signup_date", "country")
)

In [None]:
# saas.raw_billing_events declares a ds column and is partitioned by it, but the
# CSV-derived DataFrame has no such column, so stamp every row with a ds value
# before the by-name append.
# NOTE(review): current_date() (the run date) is assumed to be the intended ds - confirm.
# NOTE: append() is not idempotent. Re-running this cell adds the same rows again.
(
    billing_events
    .withColumn("ds", current_date())
    .writeTo("saas.raw_billing_events")
    .append()
)

In [None]:
# saas.raw_subscriptions declares a ds column and is partitioned by it, but the
# CSV-derived DataFrame has no such column, so stamp every row with a ds value
# before the by-name append.
# NOTE(review): current_date() (the run date) is assumed to be the intended ds - confirm.
# NOTE: append() is not idempotent. Re-running this cell adds the same rows again.
(
    subscriptions
    .withColumn("ds", current_date())
    .writeTo("saas.raw_subscriptions")
    .append()
)

In [None]:
# saas.raw_customers declares a ds column and is partitioned by it, but the
# CSV-derived DataFrame has no such column, so stamp every row with a ds value
# before the by-name append.
# NOTE(review): current_date() (the run date) is assumed to be the intended ds - confirm.
# NOTE: append() is not idempotent. Re-running this cell adds the same rows again.
(
    customers
    .withColumn("ds", current_date())
    .writeTo("saas.raw_customers")
    .append()
)

In [None]:
%%sql

-- Rebuild staging billing events from the raw table: keep only
-- payment_succeeded events and, per event_id, the row with the latest event_date.
-- Columns are listed explicitly (instead of SELECT *) so the helper row_num column
-- is not sent to the six-column target table and the column order is pinned.
-- NOTE: rows sharing both event_id and event_date are tie-broken arbitrarily.
INSERT OVERWRITE saas.stg_billing_events
WITH ranked_events AS (
    SELECT
        subscription_id,
        event_id,
        event_type,
        amount,
        event_date,
        ds,
        ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_date DESC) AS row_num
    FROM saas.raw_billing_events
    WHERE event_type = 'payment_succeeded'
)
SELECT
    subscription_id,
    event_id,
    event_type,
    amount,
    event_date,
    ds
FROM ranked_events
WHERE row_num = 1;

In [None]:
%%sql

-- Rebuild staging subscriptions from the raw table, keeping only rows whose
-- subscription_id, customer_id and plan_id are all present.
-- The source is saas.raw_subscriptions: the visible notebook never registers a
-- table or view named subscriptions (it is only a Python variable).
-- Columns are listed in the target table's order because INSERT matches columns
-- by position, not by name.
INSERT OVERWRITE saas.stg_subscriptions
WITH cleaned_and_validated_subscriptions AS (
    SELECT
        subscription_id,
        customer_id,
        plan_id,
        end_date,
        ds
    FROM saas.raw_subscriptions
    WHERE subscription_id IS NOT NULL
      AND customer_id IS NOT NULL
      AND plan_id IS NOT NULL
)
SELECT
    subscription_id,
    customer_id,
    plan_id,
    end_date,
    ds
FROM cleaned_and_validated_subscriptions;

In [None]:
%%sql

-- NOTE(review): this cell repeats the saas.stg_subscriptions load from the
-- preceding cell and is a candidate for removal.
-- Rebuild staging subscriptions from the raw table, keeping only rows whose
-- subscription_id, customer_id and plan_id are all present.
-- The source is saas.raw_subscriptions: the visible notebook never registers a
-- table or view named subscriptions (it is only a Python variable).
-- Columns are listed in the target table's order because INSERT matches columns
-- by position, not by name.
INSERT OVERWRITE saas.stg_subscriptions
WITH cleaned_and_validated_subscriptions AS (
    SELECT
        subscription_id,
        customer_id,
        plan_id,
        end_date,
        ds
    FROM saas.raw_subscriptions
    WHERE subscription_id IS NOT NULL
      AND customer_id IS NOT NULL
      AND plan_id IS NOT NULL
)
SELECT
    subscription_id,
    customer_id,
    plan_id,
    end_date,
    ds
FROM cleaned_and_validated_subscriptions;