In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# Obtain the Snowpark session for this notebook; every later Python cell
# issues its SQL through `session`.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# SETUP

In [None]:
# Resolve the role this notebook runs under. ACCOUNTADMIN is mapped to
# 'ROLE100' so that a suffix can still be derived from it below.
current_role = session.sql('select current_role()').collect()[0][0]
current_role = 'ROLE100' if current_role == 'ACCOUNTADMIN' else current_role

# role_num is the text that follows the first 'ROLE' in the role name
# (e.g. '100' for 'ROLE100'). A role name that does not contain 'ROLE'
# raises IndexError here.
role_num = current_role.split('ROLE')[1]

# Capture the session's database, schema and warehouse, one query each and in
# that order. Later cells interpolate these names into their SQL text.
current_db, current_schema, current_wh = (
    session.sql(context_query).collect()[0][0]
    for context_query in (
        'select current_database()',
        'select current_schema()',
        'select current_warehouse()',
    )
)

# 1. Overview
Schema changes in upstream tables, such as new fields arriving from a source like Salesforce, must be carefully propagated to all downstream tables. When data engineers track and update each affected table by hand, the work is time-consuming and error-prone, and it often leads to broken pipelines and data inconsistencies. This guide uses AI-powered automation to streamline schema propagation, keeping updates accurate and consistent across the data lineage, minimizing errors, and saving data engineers valuable time.

## Prerequisites
- Familiarity with SQL and Snowflake
- Basic knowledge of data lineage concepts

## What You'll Build
A powerful solution for automated schema propagation that combines Snowflake's processing capabilities with the intelligence of LLMs, enabling end-to-end lineage management. By the end of this guide, you'll have a framework that detects schema changes in upstream tables and ensures their propagation downstream with AI precision.

## What You'll Need
- A Snowflake account
- Streamlit in Snowflake

## What You'll Learn
- How to create an LLM-powered lineage manager that autonomously handles schema evolution across downstream tables
- How to leverage LLMs to automatically assess and adapt DDL changes, simplifying complex workflows
- How to use Snowflake tasks for continuous schema monitoring
- How to build an intuitive UI with Streamlit for managing schema propagation

# 2. Application Architecture
This solution leverages the intelligence of LLMs to dynamically analyze and respond to schema changes, creating a seamless flow of information across your data pipeline. The architecture of this application includes:

1. **Upstream and Downstream Tables in Snowflake**: A series of tables that form a lineage, where updates in upstream tables must cascade downstream.
2. **LLM-Powered Schema Analysis**: The core engine uses LLMs to generate and apply appropriate DDL (Data Definition Language) modifications, ensuring the consistency of schema changes across all affected tables.
3. **Schema Change Monitor**: A Snowflake task that continuously tracks schema alterations in upstream tables and logs any detected changes.
4. **Streamlit-based UI**: Provides an accessible, intuitive interface to monitor and manage schema propagation, showcasing LLM-suggested DDL changes for review and approval.
5. **Automated Propagation Workflow**: Utilizes a directed graph traversal algorithm (DFS) to ensure the orderly propagation of changes across the entire data pipeline.

## Architecture Diagram

[Click here!](https://quickstarts.snowflake.com/guide/schema_lineage_auto_propagation_llm/index.html?index=..%2F..index#1)

# 3. Initial (Dynamic) Tables
This section includes the setup of the initial tables, which represent different levels in the data lineage (Bronze, Silver, Gold, etc.) using Dynamic Tables. We also create a baseline schema to detect future changes.

The example below has this medallion structure, in which Silver feeds two Gold tables:

- Bronze -> Silver -> Gold -> Platinum
- Silver -> Gold2

In [None]:
-- Bronze layer: the source table at the head of the lineage.
-- silver_customers selects from it.
CREATE OR REPLACE TABLE bronze_salesforce_customers (
    customer_id NUMBER(38,0),
    customer_name VARCHAR(16777216),
    customer_region VARCHAR(16777216),
    created_date DATE    
);

-- Seed five sample customers. Regions are stored in mixed case here
-- ('North', 'South', 'East', 'West') and dates are given as YYYY-MM-DD strings.
INSERT INTO bronze_salesforce_customers (customer_id, customer_name, customer_region, created_date)
VALUES
    (101, 'Acme Corp', 'North', '2022-01-01'),
    (102, 'Beta LLC', 'South', '2021-06-15'),
    (103, 'Gamma Inc', 'East', '2023-03-20'),
    (104, 'Delta Co', 'West', '2022-09-10'),
    (105, 'Epsilon Ltd', 'North', '2021-12-30');

In [None]:
# Silver layer: a dynamic table derived from bronze_salesforce_customers.
# It upper-cases the region (region_standardized), replaces created_date with
# the account age in days (account_age_days), and drops rows whose
# created_date is NULL.
# The f-string interpolates `current_wh`, so the warehouse name captured in
# the setup cell is written into the table definition.
# NOTE(review): `LAG` looks like the older spelling of `TARGET_LAG` -- confirm
# it is still accepted on the target account.
session.sql(f"""
CREATE OR REPLACE DYNAMIC TABLE silver_customers
WAREHOUSE = {current_wh}
LAG = '1 day'
AS
WITH base AS (
    SELECT
        customer_id,
        customer_name,
        customer_region,
        created_date
    FROM bronze_salesforce_customers
)
SELECT
    customer_id,
    customer_name,
    UPPER(customer_region) AS region_standardized,  -- Standardize region to uppercase
    DATEDIFF('day', created_date, CURRENT_DATE) AS account_age_days  -- Calculate account age in days
FROM base
WHERE created_date IS NOT NULL;
""").collect()

In [None]:
# Gold layer: a dynamic table built on silver_customers that passes the four
# silver columns through and adds customer_lifetime_category:
#   account_age_days < 365            -> 'New'
#   account_age_days in [365, 730]    -> 'Loyal'
#   anything else                     -> 'Long-Term'
# platinum_customer_insights selects from this table.
session.sql(f"""
CREATE OR REPLACE DYNAMIC TABLE gold_customer_analytics
WAREHOUSE = {current_wh}
LAG = '1 day'
AS
WITH base AS (
    SELECT
        customer_id,
        customer_name,
        region_standardized,
        account_age_days
    FROM silver_customers
)
SELECT
    customer_id,
    customer_name,
    region_standardized,
    account_age_days,
    
    -- Derived field for customer lifetime category
    CASE 
        WHEN account_age_days < 365 THEN 'New'
        WHEN account_age_days BETWEEN 365 AND 730 THEN 'Loyal'
        ELSE 'Long-Term'
    END AS customer_lifetime_category  -- New derived column for analytics
FROM base;
""").collect()

In [None]:
# Second gold-layer dynamic table. Like gold_customer_analytics it reads
# directly from silver_customers, so it is a sibling branch of the lineage
# rather than a successor of the platinum table.
# It adds two derived columns:
#   customer_engagement_level: < 180 days 'Newly Engaged', 180-540 days
#       'Moderately Engaged', otherwise 'Highly Engaged'
#   geographic_category: NORTH/SOUTH 'Domestic', EAST/WEST 'International',
#       anything else 'Unknown'
# The region literals are upper-case because silver_customers upper-cases
# the region.
session.sql(f"""
CREATE OR REPLACE DYNAMIC TABLE gold_customer_analytics_v2
WAREHOUSE = {current_wh}
LAG = '1 day'
AS
WITH base AS (
    SELECT
        customer_id,
        customer_name,
        region_standardized,
        account_age_days
    FROM silver_customers
)
SELECT
    customer_id,
    customer_name,
    region_standardized,
    account_age_days,
    
    -- New field for customer engagement level based on account age
    CASE 
        WHEN account_age_days < 180 THEN 'Newly Engaged'
        WHEN account_age_days BETWEEN 180 AND 540 THEN 'Moderately Engaged'
        ELSE 'Highly Engaged'
    END AS customer_engagement_level,
    
    -- Simple classification based on region for geographic grouping
    CASE
        WHEN region_standardized = 'NORTH' OR region_standardized = 'SOUTH' THEN 'Domestic'
        WHEN region_standardized = 'EAST' OR region_standardized = 'WEST' THEN 'International'
        ELSE 'Unknown'
    END AS geographic_category

FROM base;
""").collect()

In [None]:
# Platinum layer: a dynamic table built on gold_customer_analytics that adds
# four derived columns: customer_loyalty_status, market_segment,
# discount_eligibility_flag and engagement_level.
# The last two CASE expressions reference customer_loyalty_status, an alias
# defined earlier in the same SELECT list; this relies on the SQL engine
# resolving aliases within one SELECT list.
# NOTE(review): customer_loyalty_status is 'High Loyalty' exactly when
# customer_lifetime_category is 'Long-Term', so the two OR-ed conditions in
# discount_eligibility_flag test the same thing.
session.sql(f"""
CREATE OR REPLACE DYNAMIC TABLE platinum_customer_insights
WAREHOUSE = {current_wh}
LAG = '1 day'
AS
WITH base AS (
    SELECT
        customer_id,
        customer_name,
        region_standardized,
        account_age_days,
        customer_lifetime_category
    FROM gold_customer_analytics
)
SELECT
    customer_id,
    customer_name,
    region_standardized,
    account_age_days,
    customer_lifetime_category,
    
    -- Derived field to indicate customer loyalty status
    CASE 
        WHEN customer_lifetime_category = 'Long-Term' THEN 'High Loyalty'
        WHEN customer_lifetime_category = 'Loyal' THEN 'Medium Loyalty'
        ELSE 'Low Loyalty'
    END AS customer_loyalty_status,
    
    -- Segmenting customers based on region for targeted marketing insights
    CASE 
        WHEN region_standardized = 'NORTH' THEN 'Northern Market'
        WHEN region_standardized = 'SOUTH' THEN 'Southern Market'
        WHEN region_standardized = 'EAST' THEN 'Eastern Market'
        WHEN region_standardized = 'WEST' THEN 'Western Market'
        ELSE 'Other Market'
    END AS market_segment,
    
    -- Assign a discount eligibility flag based on customer lifetime category and loyalty
    CASE 
        WHEN customer_lifetime_category = 'Long-Term' OR customer_loyalty_status = 'High Loyalty' THEN 'Eligible'
        ELSE 'Not Eligible'
    END AS discount_eligibility_flag,

    -- Customer engagement level based on age and loyalty, for example purposes
    CASE
        WHEN account_age_days > 730 AND customer_loyalty_status = 'High Loyalty' THEN 'Highly Engaged'
        WHEN account_age_days BETWEEN 365 AND 730 THEN 'Moderately Engaged'
        ELSE 'Newly Engaged'
    END AS engagement_level

FROM base;
""").collect()

# 4. Monitoring Task
In this part, you'll create a Snowflake task to continuously monitor schema changes in upstream tables. When a change is detected, the task logs it in a schema change log and updates the schema baseline for consistency.

In [None]:
-- Snapshot of the columns already known to the monitor, one row per
-- (table_name, column_name). It is created empty here; the
-- schema_change_monitor task appends to it.
CREATE OR REPLACE TABLE schema_baseline (  
    table_name STRING,
    column_name STRING,
    data_type STRING
);

-- Log of detected changes, written by the schema_change_monitor task.
-- change_type records the kind of change; the task only ever writes 'ADDED'.
CREATE OR REPLACE TABLE schema_change_log (
    change_detected_at TIMESTAMP,
    table_name STRING,
    column_name STRING,
    data_type STRING,
    change_type STRING
);

In [None]:
# Create the monitoring task. Each run executes two INSERTs in one block:
#   1. Log as 'ADDED' every column in the current database/schema whose
#      column_name does not appear in schema_baseline (columns of
#      SCHEMA_CHANGE_LOG itself are skipped).
#   2. Append to schema_baseline every (table_name, column_name) pair it does
#      not hold yet, so the same column is not reported again.
# The database, schema and warehouse names are baked into the task text when
# this cell runs, via the f-string placeholders.
# Only additions are recorded; nothing here detects dropped columns or
# data-type changes.
# NOTE(review): step 1 matches on column_name alone while step 2 matches on
# (table_name, column_name). A column added to one table is therefore not
# logged if any table already in the baseline has a column of the same name
# -- confirm this is intended.
# NOTE(review): schema_baseline is created empty in the preceding SQL cell, so
# unless it is seeded elsewhere the first run logs every existing column
# (including schema_baseline's own) as 'ADDED'.
session.sql(f"""
CREATE OR REPLACE TASK schema_change_monitor
WAREHOUSE = {current_wh}
SCHEDULE = 'USING CRON * * * * * UTC'  -- Runs every minute
AS
BEGIN
    INSERT INTO schema_change_log (change_detected_at, table_name, column_name, data_type, change_type)
    SELECT 
        CURRENT_TIMESTAMP AS change_detected_at,
        table_name,
        column_name,
        data_type,
        'ADDED' AS change_type
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE table_schema = '{current_schema}'
      AND table_catalog = '{current_db}'
      AND column_name NOT IN (SELECT column_name FROM schema_baseline)
      AND table_name != 'SCHEMA_CHANGE_LOG'
    ORDER BY TABLE_NAME;

    INSERT INTO schema_baseline (table_name, column_name, data_type)
    SELECT table_name, column_name, data_type
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE table_schema = '{current_schema}'
      AND table_catalog = '{current_db}'
      AND (table_name, column_name) NOT IN (SELECT table_name, column_name FROM schema_baseline)
    ORDER BY TABLE_NAME;
END;
""").collect()

# Start the task's schedule so the monitor begins running.
session.sql('ALTER TASK schema_change_monitor RESUME;').collect()

# 5. Upstream Schema Changes
In this part, you'll simulate a schema change in the upstream table by adding a new column. The monitoring task will automatically detect and log this change, triggering downstream updates.

Currently, the unaltered upstream table looks like this:

![Current Bronze Customers Table](https://quickstarts.snowflake.com/guide/schema_lineage_auto_propagation_llm/img/99a3ddd62fde6af.png)

In [None]:
-- Preview the bronze table before the schema change.
select * from bronze_salesforce_customers;

In [None]:
-- Simulate an upstream schema change: add a customer_segment column to the
-- bronze table.
ALTER TABLE bronze_salesforce_customers 
ADD COLUMN customer_segment VARCHAR(16777216);

-- Backfill the new column for the existing rows from customer_region.
-- The comparisons use the mixed-case region values stored in the bronze table;
-- any other region (or NULL) falls through to 'Unknown'.
UPDATE bronze_salesforce_customers
SET customer_segment = CASE 
    WHEN customer_region = 'North' THEN 'Premium'
    WHEN customer_region = 'South' THEN 'Basic'
    WHEN customer_region = 'East' THEN 'Standard'
    WHEN customer_region = 'West' THEN 'Enterprise'
    ELSE 'Unknown'
END;

In [None]:
-- Preview the bronze table again to check the new customer_segment column
-- and its backfilled values.
select * from bronze_salesforce_customers;

# 6. Streamlit in Snowflake Code
With the groundwork laid, you're ready to integrate the Streamlit-based UI for hands-on control of schema propagation. This UI, backed by LLMs, allows you to manage schema changes in a user-friendly environment. Users will be able to visualize lineage, apply or preview LLM-suggested DDL changes, and monitor the entire propagation process.

