# ❄️ Cloud Data Ingestion into Snowflake

**PyData Boston 2025 - From Notebook to Pipeline: Hands-On Data Engineering with Python**

In this notebook, you'll build a complete data pipeline that:
- 📥 Ingests ~1 billion rows from CSV files in AWS S3
- 🔄 Transforms data using Snowflake Dynamic Tables
- 📊 Creates aggregated business metrics with automatic incremental refresh
- 🤖 Optionally enables AI-powered insights via Snowflake Intelligence

## 🚧 Setup Zone

Choose your preferred environment setup below:

| Option | Best For | Description |
|--------|----------|-------------|
| **Option A** | Intermediate/Advanced | Local development with virtual environments |
| **Option B** | Beginners | Run directly in Snowflake's hosted notebooks |

### 🖥️ Option A: Local Development Setup

**Recommended for:** Users comfortable with terminal, Python environments, and package management.

#### Step 1: Clone the Repository
```bash
git clone https://github.com/Snowflake-Labs/pydata_boston_2025_notebook_to_pipeline.git
cd pydata_boston_2025_notebook_to_pipeline
```

#### Step 2: Create Virtual Environment
```bash
python3 -m venv pydataboston_2025

# Activate (macOS/Linux)
source pydataboston_2025/bin/activate

# Activate (Windows)
pydataboston_2025\Scripts\activate
```

#### Step 3: Install Dependencies
```bash
pip install --upgrade pip
pip install -r requirements.txt
```

#### Step 4: Verify Installation
```bash
python3 -c "from snowflake.snowpark import Session; from snowflake.core import Root; print('✅ Installation successful!')"
```

### ☁️ Option B: Snowflake Notebooks Setup

**Recommended for:** Beginners or those who prefer a managed environment.

1. **Log in** to your [Snowflake account](https://app.snowflake.com/)
2. **Navigate** to **Projects** → **Notebooks**
3. **Import notebook:**
   - Click **+ Notebook** dropdown → **Import .ipynb file**
   - Name: `cloud_data_pipeline`
   - Database: `SNOWFLAKE_LEARNING_DB`
   - Schema: `PUBLIC`
   - Runtime: `Run on warehouse`
4. **Install packages** via the **Packages** dropdown:
   - Search and add: `snowflake`
   - Search and add: `snowflake-snowpark-python`
5. **Skip to the imports cell** (Cell 9)

---
## ✅ End of Setup Zone
---

## 📦 Import Libraries

We'll use two main Snowflake packages:

| Package | Purpose |
|---------|---------|
| `snowflake.core` | Python API for managing Snowflake objects (databases, schemas, tables) |
| `snowflake.snowpark` | DataFrame API for data transformations using Snowflake's compute engine |

In [1]:
from snowflake.snowpark import Session # For Snowflake compute engine connection
from snowflake.core import Root, CreateMode # For core operations
from snowflake.core.database import Database # For database operations
from snowflake.core.schema import Schema # For schema operations
from snowflake.core.warehouse import Warehouse # For compute engine operations
from snowflake.core.table import Table, TableColumn, TableCollection # For table operations
from snowflake.core.role import Role, Securable # For role operations
from snowflake.core.stage import Stage, StageDirectoryTable # For stage operations
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag, UserDefinedLag, DynamicTableCollection # For dynamic table operations
from snowflake.snowpark.types import * # For defining DataFrame schema
from snowflake.snowpark import functions as F # For DataFrame functions

## 🔌 Connect to Snowflake

We'll open a Snowpark session so that processing runs on Snowflake's compute engine rather than in local memory.

**Connection Methods Available:**
1. ✅ Connection parameters dictionary *(used in this tutorial)*
2. Snowflake CLI configuration file
3. Environment variables or external authentication

> ⚠️ **Security Note:** Never commit credentials to version control. Use environment variables or a secrets manager instead.

![Snowflake Login](./img/snowflake_login.png)
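
One way to catch a missing credential early is to validate the environment before building the connection dictionary. The following is a minimal sketch, not part of the workshop code: `build_connection_parameters` is a hypothetical helper, and the variable names are the ones read by the Option A cell.

```python
import os

# Environment variable names used by the Option A cell; the helper itself is hypothetical.
REQUIRED_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
}


def build_connection_parameters(env=None):
    """Return a connection dict, or raise if any required variable is unset or empty."""
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in REQUIRED_VARS.items()}
```

The returned dictionary has the same keys as `connection_parameters` in the Option A cell, so it could be passed to `Session.builder.configs(...)` in the same way.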

### **OPTION A ONLY**

In [2]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Build connection parameters from environment variables
connection_parameters = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE")
}

# Create a session
session = Session.builder.configs(connection_parameters).create()
print(f"❄️ Connected to Snowflake account: {session.get_current_account()}")
print(f"Current role: {session.get_current_role()}")

❄️ Connected to Snowflake account: "OWVFCQY-OUB97142"
Current role: "ACCOUNTADMIN"


### **OPTION B ONLY**

In [None]:
# from snowflake.snowpark.context import get_active_session
# session = get_active_session()

# print(f"❄️ Connected to Snowflake account: {session.get_current_account()}")
# print(f"Current role: {session.get_current_role()}")

### Initialize Root API Object

The `Root` object is the entry point for Snowflake's Python API. It provides access to all database objects and operations.

In [3]:
# Create the Root API object from the active session
root = Root(session)
print("Root API object created successfully")

Root API object created successfully


### 🔐 Create Lab Role (RBAC)

We'll create a dedicated role following the **principle of least privilege**: users should have only the minimum permissions required for their tasks.

> **RBAC (Role-Based Access Control)** is a security best practice in data platforms.
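
To keep a role's privileges easy to audit, the grant statements could be generated from one explicit list. This is a minimal sketch under that assumption: `account_grants` is a hypothetical helper, and the two privileges are the ones this notebook grants.

```python
def account_grants(role, privileges):
    """Build one account-level GRANT statement per privilege for the given role."""
    return [f"GRANT {privilege} ON ACCOUNT TO ROLE {role}" for privilege in privileges]


# Role and privilege names must be trusted constants, since they are interpolated into SQL.
statements = account_grants("pydata_lab_role", ["CREATE WAREHOUSE", "CREATE DATABASE"])
for statement in statements:
    print(statement)
```

Each string could then be passed to `session.sql(...).collect()`, as the cell in this section does by hand.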

In [4]:
# Create lab role using Python API
pydata_lab_role = Role(name="pydata_lab_role")
root.roles.create(pydata_lab_role, mode=CreateMode.if_not_exists)
print("Role 'pydata_lab_role' created successfully")

# Grant role to SYSADMIN (SQL required for role grants)
session.sql("GRANT ROLE pydata_lab_role TO ROLE SYSADMIN").collect()

# Grant necessary privileges to role (SQL required for account-level grants)
session.sql("GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE pydata_lab_role").collect()
session.sql("GRANT CREATE DATABASE ON ACCOUNT TO ROLE pydata_lab_role").collect()

print("Privileges granted to pydata_lab_role")

Role 'pydata_lab_role' created successfully
Privileges granted to pydata_lab_role


### Switch to Lab Role

Assume the `pydata_lab_role` for all subsequent operations.

In [5]:
session.use_role("pydata_lab_role")
print(f"Switched to role: {session.get_current_role()}")

Switched to role: "PYDATA_LAB_ROLE"


---
# Part 1: Define Data Objects 📐

In this section, we'll create the foundational infrastructure:
- Database and schemas
- Compute warehouse
- External stage (S3 connection)
- Raw data tables

### 🗄️ Create Database

Create `tasty_bytes_db` to contain all Tasty Bytes sales data.

In [7]:
# Create database
database_name = "tasty_bytes_db"
new_database = Database(name=database_name)
root.databases.create(new_database, mode=CreateMode.or_replace)

print(f"Database '{database_name}' created successfully")

# Set database context
session.use_database(database_name)
print(f"Session is now using database: {session.get_current_database()}")

Database 'tasty_bytes_db' created successfully
Session is now using database: "TASTY_BYTES_DB"


### 📁 Create Schemas

Schemas logically group related data. We'll create two:

| Schema | Purpose |
|--------|---------|
| `raw` | Source data from CSV files |
| `analytics` | Transformed, business-ready data |

> 💡 **Best Practice:** Separating raw and transformed data is a common pattern in data engineering.

![Schemas Creation](./img/schemas_creation.png)

In [8]:
# Get database reference
db = root.databases[database_name]

# Create raw schema
raw_schema = Schema(name="raw")
db.schemas.create(raw_schema, mode=CreateMode.or_replace)
print("Schema 'raw' created successfully")

# Create analytics schema using Python API:
analytics_schema = Schema(name="analytics")
db.schemas.create(analytics_schema, mode=CreateMode.or_replace)
print("Schema 'analytics' created successfully")

Schema 'raw' created successfully
Schema 'analytics' created successfully


### ⚡ Create Compute Warehouse

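A virtual warehouse provides compute resources for query execution. We'll create an **X-Large** warehouse. The code below does not pass an auto-suspend setting, so any auto-suspend behavior comes from Snowflake's defaults rather than from this notebook.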

![Virtual Warehouse](./img/virtual_wh.png)

In [9]:
# Create compute resource
virtual_warehouse_name = "pydata_lab_wh"

new_warehouse = Warehouse(
    name=virtual_warehouse_name,
    warehouse_size="XLARGE"
)

root.warehouses.create(new_warehouse, mode=CreateMode.or_replace)
print(f"Warehouse '{virtual_warehouse_name}' created successfully")

# Grant usage on warehouse to pydata_lab_role
root.roles["pydata_lab_role"].grant_privileges(
    privileges=["USAGE"],
    securable_type="WAREHOUSE",
    securable=Securable(name=virtual_warehouse_name)
)
print("Granted USAGE on warehouse to pydata_lab_role")

# Use the warehouse
session.use_warehouse(virtual_warehouse_name)
print(f"Using virtual warehouse: {session.get_current_warehouse()}")

Warehouse 'pydata_lab_wh' created successfully
Granted USAGE on warehouse to pydata_lab_role
Using virtual warehouse: "PYDATA_LAB_WH"


### 📤 Create External Stage

Stages define locations where data files are stored. We'll create an external stage pointing to a public AWS S3 bucket containing our raw CSV data.

> 💡 **Common Pattern:** Land raw data files in cloud object storage (S3, Azure Blob, GCS), then ingest into your data platform.

![Stages](./img/stages.png)

In [10]:
# Create external stage
tasty_bytes_stage = Stage(
    name="tasty_bytes_stage",
    url="s3://sfquickstarts/tasty-bytes-builder-education/",
    directory_table=StageDirectoryTable(enable=True)
)

root.databases[database_name].schemas["raw"].stages.create(
    tasty_bytes_stage,
    mode=CreateMode.or_replace
)
print("External stage 'tasty_bytes_stage' created successfully")

External stage 'tasty_bytes_stage' created successfully


### 📊 Create Raw Tables

Define tables to hold our raw data (~1 billion rows total):

| Table | Description | Row Count |
|-------|-------------|-----------|
| `order_header` | Order-level information | ~248M |
| `order_detail` | Line item information | ~674M |
| `menu` | Product catalog | 100 |

In [11]:
# Get schema reference
raw_schema_ref = db.schemas["raw"]

# Define order_header table structure
order_header_columns = [
    TableColumn(name="order_id", datatype="NUMBER(38,0)"),
    TableColumn(name="truck_id", datatype="NUMBER(38,0)"),
    TableColumn(name="location_id", datatype="FLOAT"),
    TableColumn(name="customer_id", datatype="NUMBER(38,0)"),
    TableColumn(name="discount_id", datatype="VARCHAR(16777216)"),
    TableColumn(name="shift_id", datatype="NUMBER(38,0)"),
    TableColumn(name="shift_start_time", datatype="TIME(9)"),
    TableColumn(name="shift_end_time", datatype="TIME(9)"),
    TableColumn(name="order_channel", datatype="VARCHAR(16777216)"),
    TableColumn(name="order_ts", datatype="TIMESTAMP_NTZ(9)"),
    TableColumn(name="served_ts", datatype="VARCHAR(16777216)"),
    TableColumn(name="order_currency", datatype="VARCHAR(3)"),
    TableColumn(name="order_amount", datatype="NUMBER(38,4)"),
    TableColumn(name="order_tax_amount", datatype="VARCHAR(16777216)"),
    TableColumn(name="order_discount_amount", datatype="VARCHAR(16777216)"),
    TableColumn(name="order_total", datatype="NUMBER(38,4)")
]

# Create order_header table using Python API
order_header_table = Table(
    name="order_header",
    columns=order_header_columns
)

raw_schema_ref.tables.create(order_header_table, mode=CreateMode.or_replace)
print("Table 'order_header' created successfully")

Table 'order_header' created successfully


In [12]:
# Define order_detail table structure
order_detail_columns = [
    TableColumn(name="order_detail_id", datatype="NUMBER(38,0)"),
    TableColumn(name="order_id", datatype="NUMBER(38,0)"),
    TableColumn(name="menu_item_id", datatype="NUMBER(38,0)"),
    TableColumn(name="discount_id", datatype="VARCHAR(16777216)"),
    TableColumn(name="line_number", datatype="NUMBER(38,0)"),
    TableColumn(name="quantity", datatype="NUMBER(5,0)"),
    TableColumn(name="unit_price", datatype="NUMBER(38,4)"),
    TableColumn(name="price", datatype="NUMBER(38,4)"),
    TableColumn(name="order_item_discount_amount", datatype="VARCHAR(16777216)")
]

# Create order_detail table
order_detail_table = Table(
    name="order_detail",
    columns=order_detail_columns
)

raw_schema_ref.tables.create(order_detail_table, mode=CreateMode.or_replace)
print("Table 'order_detail' created successfully")

Table 'order_detail' created successfully


In [13]:
# Define menu table structure
menu_columns = [
    TableColumn(name="menu_id", datatype="NUMBER(19,0)"),
    TableColumn(name="menu_type_id", datatype="NUMBER(38,0)"),
    TableColumn(name="menu_type", datatype="VARCHAR(16777216)"),
    TableColumn(name="truck_brand_name", datatype="VARCHAR(16777216)"),
    TableColumn(name="menu_item_id", datatype="NUMBER(38,0)"),
    TableColumn(name="menu_item_name", datatype="VARCHAR(16777216)"),
    TableColumn(name="item_category", datatype="VARCHAR(16777216)"),
    TableColumn(name="item_subcategory", datatype="VARCHAR(16777216)"),
    TableColumn(name="cost_of_goods_usd", datatype="NUMBER(38,4)"),
    TableColumn(name="sale_price_usd", datatype="NUMBER(38,4)"),
    TableColumn(name="menu_item_health_metrics_obj", datatype="VARIANT")
]

# Create menu table
menu_table = Table(
    name="menu",
    columns=menu_columns
)

raw_schema_ref.tables.create(menu_table, mode=CreateMode.or_replace)
print("Table 'menu' created successfully")

Table 'menu' created successfully


---
# Part 2: Ingest Data 📥

Load data from CSV files in our external stage into the raw tables using Snowpark's `copy_into_table` method.
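
The loading cells in this part pair each target column with a positional reference (`$1`, `$2`, and so on) into the CSV file. As a sketch, the reference list can be derived from the column list so the two never drift out of sync. `positional_refs` is a hypothetical helper, not part of Snowpark.

```python
def positional_refs(columns):
    """Return ["$1", "$2", ...] with one entry per target column, in order."""
    return [f"${position}" for position, _ in enumerate(columns, start=1)]


example_columns = ["menu_id", "menu_type_id", "menu_type"]  # shortened for illustration
print(positional_refs(example_columns))  # ['$1', '$2', '$3']
```

The result could be passed as the `transformations` argument alongside the same list as `target_columns`.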

> ⏱️ **Note:** Loading ~1 billion records may take a few minutes.

In [14]:
# Load order_header data using Snowpark copy_into_table
print("Loading order_header data...")

order_header_df = session.read.csv("@tasty_bytes_db.raw.tasty_bytes_stage/raw_pos/order_header/")

# Map $1, $2, $3... to the actual columns by position
result = order_header_df.copy_into_table(
    "tasty_bytes_db.raw.order_header",  
    target_columns=["order_id", "truck_id", "location_id", "customer_id", "discount_id", 
                    "shift_id", "shift_start_time", "shift_end_time", "order_channel", 
                    "order_ts", "served_ts", "order_currency", "order_amount", 
                    "order_tax_amount", "order_discount_amount", "order_total"],
    transformations=["$1", "$2", "$3", "$4", "$5", "$6", "$7", "$8", 
                     "$9", "$10", "$11", "$12", "$13", "$14", "$15", "$16"]
)

print(f"Loaded {len(result)} batch(es) into order_header")

Loading order_header data...
Loaded 179 batch(es) into order_header


In [15]:
# Load order_detail data using Snowpark copy_into_table
print("Loading order_detail data...")

order_detail_df = session.read.csv("@tasty_bytes_db.raw.tasty_bytes_stage/raw_pos/order_detail/")

result = order_detail_df.copy_into_table(
    "tasty_bytes_db.raw.order_detail",
    target_columns=["order_detail_id", "order_id", "menu_item_id", "discount_id", 
                    "line_number", "quantity", "unit_price", "price", "order_item_discount_amount"],
    transformations=["$1", "$2", "$3", "$4", "$5", "$6", "$7", "$8", "$9"]
)

print(f"Successfully loaded {len(result)} batch(es) into order_detail")

Loading order_detail data...
Successfully loaded 270 batch(es) into order_detail


In [16]:
# Load menu data using Snowpark copy_into_table
print("Loading menu data...")

menu_df = session.read.csv("@tasty_bytes_db.raw.tasty_bytes_stage/raw_pos/menu/")

result = menu_df.copy_into_table(
    "tasty_bytes_db.raw.menu",
    target_columns=["menu_id", "menu_type_id", "menu_type", "truck_brand_name", "menu_item_id",
                    "menu_item_name", "item_category", "item_subcategory", "cost_of_goods_usd",
                    "sale_price_usd", "menu_item_health_metrics_obj"],
    transformations=["$1", "$2", "$3", "$4", "$5", "$6", "$7", "$8", "$9", "$10", "$11"]
)

print(f"Successfully loaded {len(result)} batch(es) into menu")

Loading menu data...
Successfully loaded 1 batch(es) into menu


### ✅ Verify Data Loading

Confirm data was loaded correctly by checking row counts using Snowpark.

In [17]:
# Check row counts using Snowpark
order_header_count = session.table("tasty_bytes_db.raw.order_header").count()
order_detail_count = session.table("tasty_bytes_db.raw.order_detail").count()
menu_count = session.table("tasty_bytes_db.raw.menu").count()

print(f"order_header table: {order_header_count:,} rows")
print(f"order_detail table: {order_detail_count:,} rows")
print(f"menu table: {menu_count:,} rows")

order_header table: 248,201,269 rows
order_detail table: 673,655,465 rows
menu table: 100 rows
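
As a quick sanity check, the totals can be worked out directly from these counts. The figures are copied from the output above, and the per-order average is only a rough ratio of the two table sizes.

```python
order_header_rows = 248_201_269
order_detail_rows = 673_655_465
menu_rows = 100

total_rows = order_header_rows + order_detail_rows + menu_rows
items_per_order = order_detail_rows / order_header_rows

print(f"Total rows loaded: {total_rows:,}")                       # 921,856,834
print(f"Average line items per order: {items_per_order:.2f}")     # 2.71
```

That is roughly 922 million rows in total, or about 2.7 line items per order.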


In [18]:
# View sample data from order_header using Snowpark (you can do this with the other table names too)
print("Sample data from order_header:")
session.table("tasty_bytes_db.raw.order_header").limit(5).show()

Sample data from order_header:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDER_ID"  |"TRUCK_ID"  |"LOCATION_ID"  |"CUSTOMER_ID"  |"DISCOUNT_ID"  |"SHIFT_ID"  |"SHIFT_START_TIME"  |"SHIFT_END_TIME"  |"ORDER_CHANNEL"  |"ORDER_TS"           |"SERVED_TS"  |"ORDER_CURRENCY"  |"ORDER_AMOUNT"  |"ORDER_TAX_AMOUNT"  |"ORDER_DISCOUNT_AMOUNT"  |"ORDER_TOTAL"  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|354065609   |314         |7083.0         |NULL           |NULL           |200350252   |09:00:00            |15:00:00      

---
# Part 3: Transform Raw Data 🔄

Build a 3-tier transformation pipeline using **Snowflake Dynamic Tables**, Snowflake's declarative approach to data transformation.

### 💡 What are Dynamic Tables?

Dynamic Tables let you define the **desired end state** of your data, and Snowflake handles scheduling and refreshing it.

| Feature | Benefit |
|---------|---------|
| **Automatic Scheduling** | No manual orchestration needed |
| **Incremental Refresh** | Processes only changed rows |
| **Dependency Tracking** | Native DAG visualization in UI |

> **Key Concept:** Define your transformations declaratively, and Snowflake manages the pipeline execution.
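
To make the dependency tracking concrete, the pipeline built in this part can be written down as a small graph. The sketch below uses Python's standard-library `graphlib` purely to illustrate refresh ordering. Snowflake derives this graph itself from each table's query, and the dictionary here is a hand-written assumption based on the tables defined in this notebook.

```python
from graphlib import TopologicalSorter

# Each key depends on the tables in its set (hand-written from this notebook's queries).
dependencies = {
    "analytics.orders_enriched": {"raw.order_header"},
    "analytics.order_items_enriched": {"raw.order_detail", "raw.menu"},
    "analytics.order_fact": {
        "analytics.orders_enriched",
        "analytics.order_items_enriched",
    },
    "analytics.daily_business_metrics": {"analytics.order_fact"},
}

# static_order() yields every table only after all of its upstream tables.
refresh_order = list(TopologicalSorter(dependencies).static_order())
print(refresh_order)
```

Any valid ordering places the raw tables first and `daily_business_metrics` last, which mirrors the Tier 1, Tier 2, Tier 3 layering used here.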

## Tier 1: Enrichment Layer

### 📅 `orders_enriched` - Order Header Enrichment

Enriches raw order data with:
- **Temporal dimensions:** date, day of week, hour
- **Numeric conversions:** discount amounts
- **Derived flags:** has_discount indicator

**Target Lag:** 12 hours (Snowflake schedules refreshes to keep this table no more than roughly 12 hours behind its source data)
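
The enrichment rules can be sketched in plain Python on a single row, which may help when reasoning about edge cases such as empty discount IDs or non-numeric amounts. This is an illustration only: `enrich_order` and `try_to_number` are hypothetical functions, the sample values are made up, and the `Decimal` parsing only approximates Snowflake's `TRY_TO_NUMBER(value, 10, 2)`.

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP

DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]  # short names, as in the sample output


def try_to_number(value):
    """Return a 2-decimal Decimal, or None when the text is missing or not numeric."""
    if value is None:
        return None
    try:
        number = Decimal(value.strip())
        if not number.is_finite():
            return None
        return number.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        return None


def enrich_order(order_ts, discount_id, order_discount_amount):
    """Derive the temporal columns, numeric discount, and discount flag for one order."""
    return {
        "order_date": order_ts.date(),
        "day_name": DAY_NAMES[order_ts.weekday()],
        "order_hour": order_ts.hour,
        "order_discount_amount": try_to_number(order_discount_amount),
        "has_discount": discount_id is not None and discount_id != "",
    }


row = enrich_order(datetime(2022, 8, 14, 20, 15), discount_id="", order_discount_amount="n/a")
print(row)
```

Here an empty `discount_id` yields `has_discount = False`, and an unparseable amount becomes `None` rather than raising, which is the behavior the query relies on.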

#### Python Implementation (Snowpark DataFrame API)

In [20]:
# Build orders_enriched Dynamic Table query using Snowpark DataFrame API
orders_enriched_df = session.table("tasty_bytes_db.raw.order_header") \
    .select(
        # Order identifiers
        F.col("order_id"),
        F.col("truck_id"),
        F.col("customer_id"),
        F.col("order_channel"),
        # Temporal dimensions
        F.col("order_ts").alias("order_timestamp"),
        F.to_date(F.col("order_ts")).alias("order_date"),
        F.call_builtin("DAYNAME", F.col("order_ts")).alias("day_name"),
        F.hour(F.col("order_ts")).alias("order_hour"),
        # Financial metrics
        F.col("order_amount"),
        F.col("order_total"),
        F.call_builtin("TRY_TO_NUMBER", F.col("order_discount_amount"), F.lit(10), F.lit(2)).alias("order_discount_amount"),
        # Simple discount flag
        F.when(
            (F.col("discount_id").isNotNull()) & (F.col("discount_id") != ""),
            F.lit(True)
        ).otherwise(F.lit(False)).alias("has_discount")
    ) \
    .where(
        F.col("order_id").isNotNull() & F.col("order_ts").isNotNull()
    )

# Create the dynamic table
orders_enriched_df.create_or_replace_dynamic_table(
    name="tasty_bytes_db.analytics.orders_enriched",
    warehouse="PYDATA_LAB_WH",
    lag="12 hours"
)

print("Tier 1 dynamic table 'orders_enriched' created successfully")


Tier 1 dynamic table 'orders_enriched' created successfully


#### SQL Implementation (Alternative)

In [None]:
# # Create orders_enriched dynamic table query
# orders_enriched_query = """
# SELECT
#     -- Order identifiers
#     order_id,
#     truck_id,
#     customer_id,
#     order_channel,
#     -- Temporal dimensions
#     order_ts AS order_timestamp,
#     DATE(order_ts) AS order_date,
#     DAYNAME(order_ts) AS day_name,
#     HOUR(order_ts) AS order_hour,
#     -- Financial metrics
#     order_amount,
#     order_total,
#     TRY_TO_NUMBER(order_discount_amount, 10, 2) AS order_discount_amount,
#     -- Simple discount flag
#     CASE
#         WHEN discount_id IS NOT NULL AND discount_id != '' THEN TRUE
#         ELSE FALSE
#     END AS has_discount
# FROM tasty_bytes_db.raw.order_header
# WHERE order_id IS NOT NULL
#     AND order_ts IS NOT NULL
# """

# orders_enriched_dt = DynamicTable(
#     name="orders_enriched_sql",
#     target_lag=UserDefinedLag(seconds=43200),  # 12 hours
#     warehouse="PYDATA_LAB_WH",
#     query=orders_enriched_query
# )

# root.databases[database_name].schemas["analytics"].dynamic_tables.create(
#     orders_enriched_dt,
#     mode=CreateMode.or_replace
# )

# print("Tier 1 dynamic table 'orders_enriched_sql' created successfully")

### 🛒 `order_items_enriched` - Line Item Enrichment

Joins order details with menu to add:
- **Product information:** name, category, brand
- **Profit metrics:** unit profit, line profit, margin %
- **Discount tracking:** line-level discounts

**Target Lag:** 12 hours
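
The profit formulas are simple enough to check by hand. The sketch below applies them with Python's `decimal` module. `line_metrics` is a hypothetical helper, and the unit cost of 5.00 is inferred from the Breakfast Crepe row in this section's sample output (unit price 12.00, line profit 7.00), not read from the `menu` table.

```python
from decimal import Decimal, ROUND_HALF_UP


def line_metrics(unit_price, unit_cost, quantity):
    """Return (unit_profit, line_profit, profit_margin_pct) for one order line."""
    unit_profit = unit_price - unit_cost
    line_profit = unit_profit * quantity
    if unit_price > 0:
        margin = (unit_profit / unit_price * 100).quantize(
            Decimal("0.01"), rounding=ROUND_HALF_UP
        )
    else:
        margin = Decimal("0")  # mirrors the query's fallback when the price is not positive
    return unit_profit, line_profit, margin


print(line_metrics(Decimal("12.00"), Decimal("5.00"), 1))
```

The margin comes out as 58.33, matching the `PROFIT_MARGIN_PCT` shown for that item. The price guard avoids a division by zero for free items.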

#### Python Implementation (Snowpark DataFrame API)

In [21]:
# Build order_items_enriched using Snowpark DataFrame API
od = session.table("tasty_bytes_db.raw.order_detail")
m = session.table("tasty_bytes_db.raw.menu")

order_items_enriched_df = od.join(m, od["menu_item_id"] == m["menu_item_id"], "inner") \
    .select(
        # Order detail identifiers
        od["order_detail_id"],
        od["order_id"],
        od["line_number"],
        # Product information
        od["menu_item_id"].alias("menu_item_id"),
        m["menu_item_name"],
        m["item_category"],
        m["item_subcategory"],
        m["truck_brand_name"],
        m["menu_type"],
        # Quantity and pricing
        od["quantity"],
        od["unit_price"],
        od["price"].alias("line_total"),
        m["cost_of_goods_usd"],
        m["sale_price_usd"],
        # Profit calculations
        (od["unit_price"] - m["cost_of_goods_usd"]).alias("unit_profit"),
        ((od["unit_price"] - m["cost_of_goods_usd"]) * od["quantity"]).alias("line_profit"),
        F.when(
            od["unit_price"] > 0,
            F.round(((od["unit_price"] - m["cost_of_goods_usd"]) / od["unit_price"]) * 100, 2)
        ).otherwise(0).alias("profit_margin_pct"),
        # Discount information
        F.call_builtin("TRY_TO_NUMBER", od["order_item_discount_amount"], F.lit(10), F.lit(2)).alias("line_discount_amount"),
        F.when(
            (od["discount_id"].isNotNull()) & (od["discount_id"] != ""),
            F.lit(True)
        ).otherwise(F.lit(False)).alias("has_discount")
    ) \
    .where(
        od["order_id"].isNotNull() & od["menu_item_id"].isNotNull()
    )

# Create the dynamic table
order_items_enriched_df.create_or_replace_dynamic_table(
    name="tasty_bytes_db.analytics.order_items_enriched",
    warehouse="PYDATA_LAB_WH",
    lag="12 hours"
)

print("Tier 1 dynamic table 'order_items_enriched' created successfully")

Tier 1 dynamic table 'order_items_enriched' created successfully


#### SQL Implementation (Alternative)

In [None]:
# # Create order_items_enriched dynamic table
# order_items_enriched_query = """
# SELECT
#     -- Order detail identifiers
#     od.order_detail_id,
#     od.order_id,
#     od.line_number,
#     -- Product information
#     od.menu_item_id,
#     m.menu_item_name,
#     m.item_category,
#     m.item_subcategory,
#     m.truck_brand_name,
#     m.menu_type,
#     -- Quantity and pricing
#     od.quantity,
#     od.unit_price,
#     od.price AS line_total,
#     m.cost_of_goods_usd,
#     m.sale_price_usd,
#     -- Profit calculations
#     (od.unit_price - m.cost_of_goods_usd) AS unit_profit,
#     (od.unit_price - m.cost_of_goods_usd) * od.quantity AS line_profit,
#     CASE
#         WHEN od.unit_price > 0 THEN
#             ROUND(((od.unit_price - m.cost_of_goods_usd) / od.unit_price) * 100, 2)
#         ELSE 0
#     END AS profit_margin_pct,
#     -- Discount information
#     TRY_TO_NUMBER(od.order_item_discount_amount, 10, 2) AS line_discount_amount,
#     CASE
#         WHEN od.discount_id IS NOT NULL AND od.discount_id != '' THEN TRUE
#         ELSE FALSE
#     END AS has_discount
# FROM tasty_bytes_db.raw.order_detail od
# INNER JOIN tasty_bytes_db.raw.menu m
#     ON od.menu_item_id = m.menu_item_id
# WHERE od.order_id IS NOT NULL
#     AND od.menu_item_id IS NOT NULL
# """

# order_items_enriched_dt = DynamicTable(
#     name="order_items_enriched_sql",
#     target_lag=UserDefinedLag(seconds=43200),  # 12 hours
#     warehouse="PYDATA_LAB_WH",
#     query=order_items_enriched_query
# )

# root.databases[database_name].schemas["analytics"].dynamic_tables.create(
#     order_items_enriched_dt,
#     mode=CreateMode.or_replace
# )

# print("Tier 1 dynamic table 'order_items_enriched_sql' created successfully")

### ✅ Verify Tier 1 Dynamic Tables

In [22]:
# Check orders_enriched using Snowpark
print("Sample data from orders_enriched:")

# Replace with orders_enriched_sql if needed
session.table("tasty_bytes_db.analytics.orders_enriched") \
    .select("order_id", "order_date", "day_name", "order_hour", "order_amount", "order_total", "has_discount") \
    .limit(10).show()

Sample data from orders_enriched:
-----------------------------------------------------------------------------------------------------------
|"ORDER_ID"  |"ORDER_DATE"  |"DAY_NAME"  |"ORDER_HOUR"  |"ORDER_AMOUNT"  |"ORDER_TOTAL"  |"HAS_DISCOUNT"  |
-----------------------------------------------------------------------------------------------------------
|78058528    |2022-08-14    |Sun         |20            |19.0000         |19.0000        |False           |
|78058529    |2022-08-14    |Sun         |20            |11.0000         |11.0000        |False           |
|78058530    |2022-08-14    |Sun         |20            |106.0000        |106.0000       |False           |
|78058531    |2022-08-14    |Sun         |20            |26.0000         |26.0000        |False           |
|78058532    |2022-08-14    |Sun         |20            |60.0000         |60.0000        |False           |
|78058533    |2022-08-14    |Sun         |20            |24.0000         |24.0000        |False       

In [23]:
# Check order_items_enriched using Snowpark
print("Sample data from order_items_enriched:")

# Replace with order_items_enriched_sql if needed
session.table("tasty_bytes_db.analytics.order_items_enriched") \
    .select("menu_item_name", "item_category", "quantity", "unit_price", "line_total", "line_profit", "profit_margin_pct") \
    .limit(10).show()

Sample data from order_items_enriched:
---------------------------------------------------------------------------------------------------------------------------------
|"MENU_ITEM_NAME"            |"ITEM_CATEGORY"  |"QUANTITY"  |"UNIT_PRICE"  |"LINE_TOTAL"  |"LINE_PROFIT"  |"PROFIT_MARGIN_PCT"  |
---------------------------------------------------------------------------------------------------------------------------------
|Coney Dog                   |Main             |1           |10.0000       |10.0000       |5.0000         |50.00                |
|Breakfast Crepe             |Main             |1           |12.0000       |12.0000       |7.0000         |58.33                |
|The Classic                 |Main             |3           |12.0000       |36.0000       |24.0000        |66.67                |
|Lean Beef Tibs              |Main             |3           |13.0000       |39.0000       |21.0000        |53.85                |
|Bottled Soda                |Beverage         |1  

## Tier 2: Fact Table

### 📋 `order_fact` - Unified Order View

Joins Tier 1 tables into a comprehensive fact table containing all order and line item details.

**Target Lag:** `DOWNSTREAM` (refreshes on demand, when a dynamic table that depends on this one needs to refresh)

#### Python Implementation (Snowpark DataFrame API)

In [24]:
# Build order_fact query using Snowpark DataFrame API
o = session.table("tasty_bytes_db.analytics.orders_enriched")
oi = session.table("tasty_bytes_db.analytics.order_items_enriched")

order_fact_df = o.join(oi, o["order_id"] == oi["order_id"], "inner") \
    .select(
        # Order header fields
        o["order_id"].alias("order_id"),
        o["truck_id"],
        o["customer_id"],
        o["order_channel"],
        o["order_timestamp"],
        o["order_date"],
        o["day_name"],
        o["order_hour"],
        o["order_amount"],
        o["order_total"],
        o["order_discount_amount"].alias("order_level_discount"),
        o["has_discount"].alias("order_has_discount"),
        # Order line item fields
        oi["order_detail_id"],
        oi["line_number"],
        oi["menu_item_id"],
        oi["menu_item_name"],
        oi["item_category"],
        oi["item_subcategory"],
        oi["truck_brand_name"],
        oi["menu_type"],
        oi["quantity"],
        oi["unit_price"],
        oi["line_total"],
        oi["cost_of_goods_usd"],
        oi["sale_price_usd"],
        oi["unit_profit"],
        oi["line_profit"],
        oi["profit_margin_pct"],
        oi["line_discount_amount"],
        oi["has_discount"].alias("line_has_discount")
    )

# Create the dynamic table
order_fact_df.create_or_replace_dynamic_table(
    name="tasty_bytes_db.analytics.order_fact",
    warehouse="PYDATA_LAB_WH",
    lag="DOWNSTREAM"
)

print("Tier 2 Dynamic table 'order_fact' created successfully")

Tier 2 Dynamic table 'order_fact' created successfully


#### SQL Implementation (Alternative)

In [None]:
# # Create order_fact dynamic table query
# order_fact_query = """
# SELECT
#     -- Order header fields
#     o.order_id,
#     o.truck_id,
#     o.customer_id,
#     o.order_channel,
#     o.order_timestamp,
#     o.order_date,
#     o.day_name,
#     o.order_hour,
#     o.order_amount,
#     o.order_total,
#     o.order_discount_amount AS order_level_discount,
#     o.has_discount AS order_has_discount,
#     -- Order line item fields
#     oi.order_detail_id,
#     oi.line_number,
#     oi.menu_item_id,
#     oi.menu_item_name,
#     oi.item_category,
#     oi.item_subcategory,
#     oi.truck_brand_name,
#     oi.menu_type,
#     oi.quantity,
#     oi.unit_price,
#     oi.line_total,
#     oi.cost_of_goods_usd,
#     oi.sale_price_usd,
#     oi.unit_profit,
#     oi.line_profit,
#     oi.profit_margin_pct,
#     oi.line_discount_amount,
#     oi.has_discount AS line_has_discount
# FROM tasty_bytes_db.analytics.orders_enriched_sql o
# INNER JOIN tasty_bytes_db.analytics.order_items_enriched_sql oi
#     ON o.order_id = oi.order_id
# """

# order_fact_dt = DynamicTable(
#     name="order_fact_sql",
#     target_lag=DownstreamLag(),  # Refresh only when dependent (downstream) dynamic tables refresh
#     warehouse="PYDATA_LAB_WH",
#     query=order_fact_query
# )

# root.databases[database_name].schemas["analytics"].dynamic_tables.create(
#     order_fact_dt,
#     mode=CreateMode.or_replace
# )

# print("Tier 2 Dynamic table 'order_fact_sql' created successfully")

### ✅ Verify Tier 2 Table

In [25]:
# Check order_fact using Snowpark
print("Sample data from order_fact:")

# Replace with order_fact_sql if needed
session.table("tasty_bytes_db.analytics.order_fact") \
    .select("order_id", "order_date", "menu_item_name", "item_category", "quantity", "order_total", "line_profit", "profit_margin_pct") \
    .limit(10).show()

Sample data from order_fact:
----------------------------------------------------------------------------------------------------------------------------------------------
|"ORDER_ID"  |"ORDER_DATE"  |"MENU_ITEM_NAME"           |"ITEM_CATEGORY"  |"QUANTITY"  |"ORDER_TOTAL"  |"LINE_PROFIT"  |"PROFIT_MARGIN_PCT"  |
----------------------------------------------------------------------------------------------------------------------------------------------
|395274135   |2022-04-20    |Lobster Mac & Cheese       |Main             |3           |94.0000        |15.0000        |33.33                |
|113376148   |2022-01-01    |Bottled Water              |Beverage         |1           |28.0000        |1.5000         |75.00                |
|446638561   |2022-08-02    |Coney Dog                  |Main             |2           |29.0000        |10.0000        |50.00                |
|34408081    |2021-03-03    |Hot Ham & Cheese           |Main             |3           |47.0000        |12.0000  

## Tier 3: Aggregated Metrics

### 📈 `daily_business_metrics` - Daily KPIs

Pre-aggregates key business metrics by day:

| Metric Category | Examples |
|-----------------|----------|
| **Volume** | Orders, trucks, customers, items sold |
| **Revenue** | Total revenue, average order value |
| **Profit** | Total profit, average margin |
| **Discounts** | Orders with discounts, discount amounts |

**Target Lag:** `DOWNSTREAM`

In [26]:
# Build daily_business_metrics using Snowpark DataFrame API
daily_business_metrics_df = session.table("tasty_bytes_db.analytics.order_fact") \
    .group_by("order_date", "day_name") \
    .agg(
        # Volume metrics
        F.count_distinct(F.col("order_id")).alias("total_orders"),
        F.count_distinct(F.col("truck_id")).alias("active_trucks"),
        F.count_distinct(F.col("customer_id")).alias("unique_customers"),
        F.sum(F.col("quantity")).alias("total_items_sold"),
        # Revenue metrics
        F.sum(F.col("order_total")).alias("total_revenue"),
        F.round(F.avg(F.col("order_total")), 2).alias("avg_order_value"),
        F.sum(F.col("line_total")).alias("total_line_item_revenue"),
        # Profit metrics
        F.sum(F.col("line_profit")).alias("total_profit"),
        F.round(F.avg(F.col("profit_margin_pct")), 2).alias("avg_profit_margin_pct"),
        # Discount metrics
        F.sum(F.when(F.col("order_has_discount"), 1).otherwise(0)).alias("orders_with_discount"),
        F.sum(F.col("order_level_discount")).alias("total_order_discount_amount"),
        F.sum(F.col("line_discount_amount")).alias("total_line_discount_amount")
    )

# Create the dynamic table using Snowpark DataFrame method
daily_business_metrics_df.create_or_replace_dynamic_table(
    name="tasty_bytes_db.analytics.daily_business_metrics",
    warehouse="PYDATA_LAB_WH",
    lag="DOWNSTREAM"
)

print("(Tier 3) Dynamic table 'daily_business_metrics' created successfully")

(Tier 3) Dynamic table 'daily_business_metrics' created successfully
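One thing to keep in mind when reading these metrics: `order_fact` holds one row per line item (it joins each order to its items), so order-level columns such as `order_total` and `order_has_discount` repeat on every line of the same order. Summing them directly counts an order once per line. The pure-Python sketch below uses three made-up rows (not workshop data) to show the difference between summing per row and summing once per order. Whether the per-row sum is the intended definition of `total_revenue` is a question for the pipeline's author.

```python
# Hypothetical order_fact rows: (order_id, order_total, line_total)
rows = [
    (1, 30.0, 10.0),
    (1, 30.0, 20.0),
    (2, 15.0, 15.0),
]

# Per-row sum: order 1's total is counted twice because it has two lines
per_row_revenue = sum(order_total for _, order_total, _ in rows)

# Per-order sum: keep one order_total per order_id before summing
order_totals = {order_id: order_total for order_id, order_total, _ in rows}
per_order_revenue = sum(order_totals.values())

# Line-level sum: line_total is already at the row grain, so nothing repeats
line_revenue = sum(line_total for _, _, line_total in rows)

print(per_row_revenue, per_order_revenue, line_revenue)
```

In this toy data the per-order and line-level sums agree, while the per-row sum of `order_total` is larger.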


#### SQL Implementation (Alternative)

In [None]:
# # Create daily_business_metrics dynamic table
# daily_business_metrics_query = """
# SELECT
#     order_date,
#     day_name,
#     -- Volume metrics
#     COUNT(DISTINCT order_id) AS total_orders,
#     COUNT(DISTINCT truck_id) AS active_trucks,
#     COUNT(DISTINCT customer_id) AS unique_customers,
#     SUM(quantity) AS total_items_sold,
#     -- Revenue metrics
#     SUM(order_total) AS total_revenue,
#     ROUND(AVG(order_total), 2) AS avg_order_value,
#     SUM(line_total) AS total_line_item_revenue,
#     -- Profit metrics
#     SUM(line_profit) AS total_profit,
#     ROUND(AVG(profit_margin_pct), 2) AS avg_profit_margin_pct,
#     -- Discount metrics
#     SUM(CASE WHEN order_has_discount THEN 1 ELSE 0 END) AS orders_with_discount,
#     SUM(order_level_discount) AS total_order_discount_amount,
#     SUM(line_discount_amount) AS total_line_discount_amount
# FROM tasty_bytes_db.analytics.order_fact_sql
# GROUP BY order_date, day_name
# """

# daily_business_metrics_dt = DynamicTable(
#     name="daily_business_metrics_sql",
#     target_lag=DownstreamLag(),  # Refresh only when a downstream table or a manual refresh requires it
#     warehouse="PYDATA_LAB_WH",
#     query=daily_business_metrics_query
# )

# root.databases[database_name].schemas["analytics"].dynamic_tables.create(
#     daily_business_metrics_dt,
#     mode=CreateMode.or_replace
# )

# print("(Tier 3) Dynamic table 'daily_business_metrics_sql' created successfully")

### 🏆 `product_performance_metrics` - Product Analytics

Aggregates sales and profit data by product:

| Metric Category | Examples |
|-----------------|----------|
| **Dimensions** | Name, category, subcategory, brand |
| **Volume** | Order count, units sold |
| **Financial** | Revenue, profit, margins |
| **Performance** | Revenue per unit, profit per unit |

**Target Lag:** `DOWNSTREAM`

In [27]:
# Build product_performance_metrics using Snowpark DataFrame API
product_performance_metrics_df = session.table("tasty_bytes_db.analytics.order_fact") \
    .group_by(
        "menu_item_id",
        "menu_item_name",
        "item_category",
        "item_subcategory",
        "truck_brand_name",
        "menu_type"
    ) \
    .agg(
        # Sales volume metrics
        F.count_distinct(F.col("order_id")).alias("order_count"),
        F.sum(F.col("quantity")).alias("total_units_sold"),
        # Revenue and profit metrics
        F.sum(F.col("line_total")).alias("total_revenue"),
        F.sum(F.col("line_profit")).alias("total_profit"),
        F.round(F.avg(F.col("unit_price")), 2).alias("avg_unit_price"),
        F.round(F.avg(F.col("profit_margin_pct")), 2).alias("avg_profit_margin_pct"),
        # Cost metrics
        F.avg(F.col("cost_of_goods_usd")).alias("avg_cogs"),
        F.avg(F.col("sale_price_usd")).alias("standard_sale_price"),
        # Performance indicators
        (F.sum(F.col("line_total")) / F.call_builtin("NULLIF", F.sum(F.col("quantity")), F.lit(0))).alias("revenue_per_unit"),
        (F.sum(F.col("line_profit")) / F.call_builtin("NULLIF", F.sum(F.col("quantity")), F.lit(0))).alias("profit_per_unit")
    )

# Create the dynamic table using Snowpark DataFrame method
product_performance_metrics_df.create_or_replace_dynamic_table(
    name="tasty_bytes_db.analytics.product_performance_metrics",
    warehouse="PYDATA_LAB_WH",
    lag="DOWNSTREAM"
)

print("(Tier 3) Dynamic table 'product_performance_metrics' created successfully")

(Tier 3) Dynamic table 'product_performance_metrics' created successfully
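The `NULLIF(SUM(quantity), 0)` guard in the performance indicators turns a zero denominator into NULL, so the division yields NULL instead of failing with a division-by-zero error. A rough pure-Python analogue, with `None` standing in for SQL NULL and hypothetical helper names (`nullif`, `per_unit`):

```python
from typing import Optional


def nullif(value: float, sentinel: float) -> Optional[float]:
    """Mimic SQL NULLIF: return None when value equals sentinel."""
    return None if value == sentinel else value


def per_unit(total: float, units: float) -> Optional[float]:
    """Divide total by units; a zero unit count yields None, like NULL in SQL."""
    denominator = nullif(units, 0)
    return None if denominator is None else total / denominator


# Revenue and units for one product, as they appear in the sample output
print(per_unit(431267200.0, 21563360))
# A product with no units sold produces None rather than an exception
print(per_unit(100.0, 0))
```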


#### SQL Implementation (Alternative)

In [None]:
# # Create product_performance_metrics dynamic table
# product_performance_metrics_query = """
# SELECT
#     -- Product dimensions
#     menu_item_id,
#     menu_item_name,
#     item_category,
#     item_subcategory,
#     truck_brand_name,
#     menu_type,
#     -- Sales volume metrics
#     COUNT(DISTINCT order_id) AS order_count,
#     SUM(quantity) AS total_units_sold,
#     -- Revenue and profit metrics
#     SUM(line_total) AS total_revenue,
#     SUM(line_profit) AS total_profit,
#     ROUND(AVG(unit_price), 2) AS avg_unit_price,
#     ROUND(AVG(profit_margin_pct), 2) AS avg_profit_margin_pct,
#     -- Cost metrics
#     AVG(cost_of_goods_usd) AS avg_cogs,
#     AVG(sale_price_usd) AS standard_sale_price,
#     -- Performance indicators
#     SUM(line_total) / NULLIF(SUM(quantity), 0) AS revenue_per_unit,
#     SUM(line_profit) / NULLIF(SUM(quantity), 0) AS profit_per_unit
# FROM tasty_bytes_db.analytics.order_fact_sql
# GROUP BY
#     menu_item_id,
#     menu_item_name,
#     item_category,
#     item_subcategory,
#     truck_brand_name,
#     menu_type
# """

# product_performance_metrics_dt = DynamicTable(
#     name="product_performance_metrics_sql",
#     target_lag=DownstreamLag(),  # Refresh only when a downstream table or a manual refresh requires it
#     warehouse="PYDATA_LAB_WH",
#     query=product_performance_metrics_query
# )

# root.databases[database_name].schemas["analytics"].dynamic_tables.create(
#     product_performance_metrics_dt,
#     mode=CreateMode.or_replace
# )

# print("(Tier 3) Dynamic table 'product_performance_metrics_sql' created successfully")

### ✅ Verify Tier 3 Tables

In [28]:
# Check daily_business_metrics using Snowpark
print("Sample data from daily_business_metrics:")

# Replace with daily_business_metrics_sql if needed
session.table("tasty_bytes_db.analytics.daily_business_metrics") \
    .select("order_date", "day_name", "total_orders", "unique_customers", "total_revenue", "avg_order_value", "total_profit", "avg_profit_margin_pct") \
    .order_by(F.col("order_date").desc()) \
    .limit(10).show()

Sample data from daily_business_metrics:
----------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDER_DATE"  |"DAY_NAME"  |"TOTAL_ORDERS"  |"UNIQUE_CUSTOMERS"  |"TOTAL_REVENUE"  |"AVG_ORDER_VALUE"  |"TOTAL_PROFIT"  |"AVG_PROFIT_MARGIN_PCT"  |
----------------------------------------------------------------------------------------------------------------------------------------------------
|2022-11-01    |Tue         |425886          |18644               |59793523.2500    |51.75              |8961352.6000    |58.66                    |
|2022-10-31    |Mon         |412664          |15854               |58254392.2500    |51.90              |8705875.3000    |58.67                    |
|2022-10-30    |Sun         |459105          |18020               |64367132.7500    |51.62              |9647234.4500    |58.68                    |
|2022-10-29    |Sat         |465857          |18145              

In [29]:
# Check product_performance_metrics using Snowpark
print("Sample data from product_performance_metrics (top products by revenue):")

# Replace with product_performance_metrics_sql if needed
session.table("tasty_bytes_db.analytics.product_performance_metrics") \
    .select("menu_item_name", "item_category", "order_count", "total_units_sold", "total_revenue", "total_profit", "avg_profit_margin_pct") \
    .order_by(F.col("total_revenue").desc()) \
    .limit(10).show()

Sample data from product_performance_metrics (top products by revenue):
--------------------------------------------------------------------------------------------------------------------------------------------------
|"MENU_ITEM_NAME"            |"ITEM_CATEGORY"  |"ORDER_COUNT"  |"TOTAL_UNITS_SOLD"  |"TOTAL_REVENUE"  |"TOTAL_PROFIT"  |"AVG_PROFIT_MARGIN_PCT"  |
--------------------------------------------------------------------------------------------------------------------------------------------------
|The King Combo              |Main             |12627298       |21563360            |431267200.0000   |172506880.0000  |40.00                    |
|Tandoori Mixed Grill        |Main             |12097473       |20659415            |371869470.0000   |144615905.0000  |38.89                    |
|Lean Chicken Tikka Masala   |Main             |12098077       |20660126            |351222142.0000   |144620882.0000  |41.18                    |
|Spicy Miso Vegetable Ramen  |Main            

### 📋 List All Dynamic Tables

In [30]:
# List all dynamic tables in the analytics schema
analytics_schema_ref = root.databases["tasty_bytes_db"].schemas["analytics"]
# The schema reference already exposes its dynamic tables as a collection
dynamic_tables = analytics_schema_ref.dynamic_tables.iter()
print("Dynamic tables in 'analytics' schema:")
for dt in dynamic_tables:
    print(dt.name)

Dynamic tables in 'analytics' schema:
DAILY_BUSINESS_METRICS
ORDERS_ENRICHED
ORDER_FACT
ORDER_ITEMS_ENRICHED
PRODUCT_PERFORMANCE_METRICS


---
# Part 4: Incremental Refresh ⚡

Demonstrate how Dynamic Tables efficiently process only **changed data** rather than reprocessing everything.
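As a mental model only (this is not a description of how Snowflake implements dynamic tables internally), incremental refresh can be pictured as folding newly arrived rows into an existing aggregate instead of recomputing it from every row. The pure-Python sketch below uses made-up `(order_date, line_total)` rows and hypothetical function names:

```python
from collections import defaultdict


def full_refresh(all_rows):
    """Recompute daily revenue from every row."""
    totals = defaultdict(float)
    for order_date, line_total in all_rows:
        totals[order_date] += line_total
    return dict(totals)


def incremental_refresh(current_totals, new_rows):
    """Fold only the newly arrived rows into the existing totals."""
    totals = dict(current_totals)
    for order_date, line_total in new_rows:
        totals[order_date] = totals.get(order_date, 0.0) + line_total
    return totals


existing = [("2022-10-31", 10.0), ("2022-11-01", 5.0)]
arrived = [("2022-11-01", 2.5), ("2025-12-24", 7.0)]

before = full_refresh(existing)
after_incremental = incremental_refresh(before, arrived)
after_full = full_refresh(existing + arrived)

# Both paths reach the same totals; the incremental one read 2 rows instead of 4
print(after_incremental == after_full)
```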

### 🔧 Create Demo Data Generator

This stored procedure generates synthetic orders to simulate new data arriving in our system.

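> **Note:** The procedure body is written in Snowflake Scripting (`LANGUAGE SQL`), so it is created here with a `CREATE PROCEDURE` statement run through `session.sql`.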

In [31]:
# Create stored procedure to generate demo orders
session.sql("""
    CREATE OR REPLACE PROCEDURE tasty_bytes_db.raw.generate_demo_orders(num_rows INTEGER)
    RETURNS STRING
    LANGUAGE SQL
    AS
    $$
    DECLARE
        orders_before INTEGER;
        orders_after INTEGER;
        orders_inserted INTEGER;
        details_before INTEGER;
        details_after INTEGER;
        details_inserted INTEGER;
    BEGIN
        -- Capture counts before insert
        SELECT COUNT(*) INTO :orders_before FROM tasty_bytes_db.raw.order_header;
        SELECT COUNT(*) INTO :details_before FROM tasty_bytes_db.raw.order_detail;
        
        -- Create temporary table with new order IDs to maintain referential integrity
        CREATE OR REPLACE TEMPORARY TABLE new_orders AS
        SELECT
            (1000000 + UNIFORM(1, 999999, RANDOM()))::NUMBER(38,0) AS new_order_id,
            oh.order_id AS original_order_id,
            oh.truck_id,
            oh.location_id,
            oh.customer_id,
            oh.discount_id,
            oh.shift_id,
            oh.shift_start_time,
            oh.shift_end_time,
            oh.order_channel,
            DATEADD('day', DATEDIFF('day', oh.order_ts, CURRENT_DATE()), oh.order_ts) AS order_ts,
            oh.served_ts,
            oh.order_currency,
            oh.order_amount * (0.8 + UNIFORM(0, 0.4, RANDOM())) AS order_amount,
            oh.order_tax_amount,
            oh.order_discount_amount,
            oh.order_total * (0.8 + UNIFORM(0, 0.4, RANDOM())) AS order_total
        FROM tasty_bytes_db.raw.order_header oh
        WHERE oh.order_id IS NOT NULL
        ORDER BY RANDOM()
        LIMIT :num_rows;
        
        -- Insert synthetic order headers
        INSERT INTO tasty_bytes_db.raw.order_header (
            order_id, truck_id, location_id, customer_id, discount_id, shift_id,
            shift_start_time, shift_end_time, order_channel, order_ts, served_ts,
            order_currency, order_amount, order_tax_amount, order_discount_amount,
            order_total
        )
        SELECT
            new_order_id, truck_id, location_id, customer_id, discount_id, shift_id,
            shift_start_time, shift_end_time, order_channel, order_ts, served_ts,
            order_currency, order_amount, order_tax_amount, order_discount_amount,
            order_total
        FROM new_orders;
        
        -- Insert corresponding order details (line items)
        INSERT INTO tasty_bytes_db.raw.order_detail (
            order_detail_id, order_id, menu_item_id, discount_id, line_number,
            quantity, unit_price, price, order_item_discount_amount
        )
        SELECT
            (2000000 + UNIFORM(1, 9999999, RANDOM()))::NUMBER(38,0) AS order_detail_id,
            no.new_order_id AS order_id,
            od.menu_item_id,
            od.discount_id,
            od.line_number,
            od.quantity,
            od.unit_price * (0.8 + UNIFORM(0, 0.4, RANDOM())) AS unit_price,
            od.price * (0.8 + UNIFORM(0, 0.4, RANDOM())) AS price,
            od.order_item_discount_amount
        FROM new_orders no
        INNER JOIN tasty_bytes_db.raw.order_detail od
            ON no.original_order_id = od.order_id;
        
        -- Capture counts after insert
        SELECT COUNT(*) INTO :orders_after FROM tasty_bytes_db.raw.order_header;
        SELECT COUNT(*) INTO :details_after FROM tasty_bytes_db.raw.order_detail;
        
        orders_inserted := :orders_after - :orders_before;
        details_inserted := :details_after - :details_before;
        
        -- Clean up temporary table
        DROP TABLE IF EXISTS new_orders;
        
        RETURN 'Successfully generated ' || orders_inserted::STRING || ' new orders with ' ||
               details_inserted::STRING || ' line items. Total orders: ' || orders_after::STRING;
    END;
    $$
""").collect()

print("✅ Stored procedure 'generate_demo_orders' created")

✅ Stored procedure 'generate_demo_orders' created
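The procedure moves each sampled order to the current date with `DATEADD('day', DATEDIFF('day', oh.order_ts, CURRENT_DATE()), oh.order_ts)`: it measures the whole-day gap to today and adds it back, which keeps the original time of day. Note that `served_ts` is copied unchanged. A small Python analogue of the date shift, with a hypothetical helper name and a fixed "today" so the result is reproducible:

```python
from datetime import date, datetime, timedelta


def shift_to_today(order_ts: datetime, today: date) -> datetime:
    """Move order_ts by the whole-day gap to `today`, keeping the time of day."""
    # Analogue of DATEDIFF('day', order_ts, CURRENT_DATE())
    day_gap = (today - order_ts.date()).days
    # Analogue of DATEADD('day', day_gap, order_ts)
    return order_ts + timedelta(days=day_gap)


original = datetime(2022, 11, 1, 22, 59, 59)
shifted = shift_to_today(original, date(2025, 12, 24))
print(shifted)  # 2025-12-24 22:59:59
```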


### 📊 Capture Initial State

Before adding new data, record current row counts for comparison.

In [32]:
# Get current row counts using Snowpark
initial_order_count = session.table("tasty_bytes_db.raw.order_header").count()
initial_detail_count = session.table("tasty_bytes_db.raw.order_detail").count()

print(f"Initial order_header count: {initial_order_count:,}")
print(f"Initial order_detail count: {initial_detail_count:,}")

# Get latest order timestamp using Snowpark
latest_order = session.table("tasty_bytes_db.raw.order_header") \
    .select(F.max("order_ts").alias("latest_order_ts")) \
    .collect()[0][0]
print(f"Latest order timestamp: {latest_order}")

Initial order_header count: 248,201,269
Initial order_detail count: 673,655,465
Latest order timestamp: 2022-11-01 22:59:59


### ➕ Generate New Demo Orders

Call the stored procedure to generate 1,200 new synthetic orders.

In [33]:
# Call stored procedure to generate 1200 new orders
result = session.call("tasty_bytes_db.raw.generate_demo_orders", 1200)
print(result)

Successfully generated 1200 new orders with 3275 line items. Total orders: 248202469
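The procedure returns its summary as a plain string. If you want to check the numbers programmatically rather than by eye, one option is to parse that string. The sketch below assumes the message keeps exactly the wording built in the procedure's `RETURN` statement; `parse_generation_result` is a hypothetical helper, not part of any Snowflake API.

```python
import re


def parse_generation_result(message: str) -> dict:
    """Extract the three counts from the procedure's return message."""
    pattern = r"generated (\d+) new orders with (\d+) line items\. Total orders: (\d+)"
    match = re.search(pattern, message)
    if match is None:
        raise ValueError(f"Unexpected result message: {message!r}")
    orders, line_items, total_orders = (int(group) for group in match.groups())
    return {"orders": orders, "line_items": line_items, "total_orders": total_orders}


# Message copied from the sample run above
summary = parse_generation_result(
    "Successfully generated 1200 new orders with 3275 line items. Total orders: 248202469"
)
print(summary)
```

In the notebook, `result` from `session.call(...)` could be passed in place of the literal string.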


### ✅ Verify New Data Inserted

In [34]:
# Get updated row counts
new_order_count = session.table("tasty_bytes_db.raw.order_header").count()
new_detail_count = session.table("tasty_bytes_db.raw.order_detail").count()

print(f"New order_header count: {new_order_count:,} (+{new_order_count - initial_order_count:,} rows)")
print(f"New order_detail count: {new_detail_count:,} (+{new_detail_count - initial_detail_count:,} rows)")

New order_header count: 248,202,469 (+1,200 rows)
New order_detail count: 673,658,740 (+3,275 rows)


### 🔄 Trigger Tier 1 Refresh

Manually refresh the Tier 1 tables to process the new data incrementally.

> **Incremental Refresh:** Only the new or changed rows are processed, which is typically much faster than a full refresh.

In [35]:
# Refresh Tier 1 tables
# Append _sql if you want to refresh those created with SQL queries

print("Refreshing orders_enriched...")
session.sql("ALTER DYNAMIC TABLE tasty_bytes_db.analytics.orders_enriched REFRESH").collect()

print("Refreshing order_items_enriched...")
session.sql("ALTER DYNAMIC TABLE tasty_bytes_db.analytics.order_items_enriched REFRESH").collect()

print("Tier 1 tables refreshed")

Refreshing orders_enriched...
Refreshing order_items_enriched...
Tier 1 tables refreshed
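If you switch between the Snowpark tables and their `_sql` counterparts, building the refresh statements from a list avoids editing each string by hand. A minimal sketch, where `build_refresh_statements` and its parameters are hypothetical names introduced for illustration:

```python
def build_refresh_statements(table_names, database="tasty_bytes_db",
                             schema="analytics", suffix=""):
    """Return one ALTER DYNAMIC TABLE ... REFRESH statement per table name."""
    return [
        f"ALTER DYNAMIC TABLE {database}.{schema}.{name}{suffix} REFRESH"
        for name in table_names
    ]


tier1_tables = ["orders_enriched", "order_items_enriched"]

for statement in build_refresh_statements(tier1_tables):
    print(statement)
    # session.sql(statement).collect()  # uncomment in the notebook, where `session` exists
```

Passing `suffix="_sql"` targets the tables created by the SQL alternative cells.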


### 📜 Check Tier 1 Refresh History

Query the refresh history to see whether Snowflake performed an `INCREMENTAL` or a `FULL` refresh. A `NO_DATA` action means the refresh found no new data to process.

In [36]:
# Check refresh history for orders_enriched
# Append _sql to table name if you want to check those created with SQL queries

print("Refresh history for orders_enriched:")
session.sql("""
    SELECT name, refresh_action, state, refresh_start_time, refresh_trigger
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
        NAME => 'tasty_bytes_db.ANALYTICS.ORDERS_ENRICHED'
    ))
    ORDER BY refresh_start_time DESC 
    LIMIT 5
""").show()

Refresh history for orders_enriched:
---------------------------------------------------------------------------------------------------------
|"NAME"           |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |"REFRESH_TRIGGER"  |
---------------------------------------------------------------------------------------------------------
|ORDERS_ENRICHED  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:28.435000-08:00  |MANUAL             |
|ORDERS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:27:06.249000-08:00  |CREATION           |
|ORDERS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:26:58.862000-08:00  |CREATION           |
|ORDERS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:25:08.103000-08:00  |CREATION           |
|ORDERS_ENRICHED  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:23:27.600000-08:00  |CREATION           |
---------------------------------------------------------------------------------------------------------
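To summarize a refresh history without reading it row by row, the actions can be tallied in Python. The sketch below hard-codes the five rows from the sample output so that it runs on its own. In the notebook, the same rows could come from `.collect()` on the history query instead of `.show()`.

```python
from collections import Counter

# (name, refresh_action, state, refresh_trigger), copied from the sample output
history = [
    ("ORDERS_ENRICHED", "INCREMENTAL", "SUCCEEDED", "MANUAL"),
    ("ORDERS_ENRICHED", "NO_DATA", "SUCCEEDED", "CREATION"),
    ("ORDERS_ENRICHED", "NO_DATA", "SUCCEEDED", "CREATION"),
    ("ORDERS_ENRICHED", "NO_DATA", "SUCCEEDED", "CREATION"),
    ("ORDERS_ENRICHED", "INCREMENTAL", "SUCCEEDED", "CREATION"),
]

# How often each refresh action occurred
action_counts = Counter(action for _, action, _, _ in history)

# Which action the manually triggered refresh used
manual_actions = [action for _, action, _, trigger in history if trigger == "MANUAL"]

print(dict(action_counts))
print(manual_actions)
```

The manual refresh that followed the 1,200 inserted orders shows up as `INCREMENTAL`, which is the behavior this part of the notebook sets out to demonstrate.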



In [37]:
# Check refresh history for order_items_enriched
# Append _sql to table name if you want to check those created with SQL queries

print("Refresh history for order_items_enriched:")
session.sql("""
    SELECT name, refresh_action, state, refresh_start_time, refresh_trigger
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
        NAME => 'tasty_bytes_db.ANALYTICS.ORDER_ITEMS_ENRICHED'
    ))
    ORDER BY refresh_start_time DESC 
    LIMIT 5
""").show()

Refresh history for order_items_enriched:
--------------------------------------------------------------------------------------------------------------
|"NAME"                |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |"REFRESH_TRIGGER"  |
--------------------------------------------------------------------------------------------------------------
|ORDER_ITEMS_ENRICHED  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:29.523000-08:00  |MANUAL             |
|ORDER_ITEMS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:27:06.255000-08:00  |CREATION           |
|ORDER_ITEMS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:26:58.862000-08:00  |CREATION           |
|ORDER_ITEMS_ENRICHED  |NO_DATA           |SUCCEEDED  |2025-12-24 13:25:08.096000-08:00  |CREATION           |
|ORDER_ITEMS_ENRICHED  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:23:43.841000-08:00  |CREATION           |
--------------------------------------------------------------------------------------------------------------

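### 🔄 Trigger Tier 2 & 3 Refresh

Manually refresh the downstream tables. Because they use `DOWNSTREAM` target lag, they are not refreshed on a schedule of their own: they refresh only when a dynamic table that depends on them refreshes or when a refresh is triggered manually, as in the cell below.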

In [38]:
# Refresh Tier 2 table
# Append _sql if you want to refresh those created with SQL queries

print("Refreshing order_fact...")
session.sql("ALTER DYNAMIC TABLE tasty_bytes_db.analytics.order_fact REFRESH").collect()

# Refresh Tier 3 tables
print("Refreshing daily_business_metrics...")
session.sql("ALTER DYNAMIC TABLE tasty_bytes_db.analytics.daily_business_metrics REFRESH").collect()

print("Refreshing product_performance_metrics...")
session.sql("ALTER DYNAMIC TABLE tasty_bytes_db.analytics.product_performance_metrics REFRESH").collect()

print("All downstream tables refreshed")

Refreshing order_fact...
Refreshing daily_business_metrics...
Refreshing product_performance_metrics...
All downstream tables refreshed
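The refreshes above run from upstream to downstream tables. That order follows from the table dependencies, which can be written down as a small graph and sorted with the standard library (`graphlib`, Python 3.9+). The dependency map below is read off the queries in this notebook. It is an illustration, not something retrieved from Snowflake.

```python
from graphlib import TopologicalSorter

# Each table maps to the dynamic tables it reads from
dependencies = {
    "orders_enriched": set(),
    "order_items_enriched": set(),
    "order_fact": {"orders_enriched", "order_items_enriched"},
    "daily_business_metrics": {"order_fact"},
    "product_performance_metrics": {"order_fact"},
}

# static_order() yields every table after all of the tables it depends on
refresh_order = list(TopologicalSorter(dependencies).static_order())
print(refresh_order)
```

Tables within the same tier may appear in either order. Only the upstream-before-downstream relationship is guaranteed.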


### 📜 Check Tier 2 & 3 Refresh History

In [39]:
# Check refresh history for order_fact
# Append _sql to table name if you want to check those created with SQL queries

print("Refresh history for order_fact:")
session.sql("""
    SELECT name, refresh_action, state, refresh_start_time
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
        NAME => 'tasty_bytes_db.ANALYTICS.ORDER_FACT'
    ))
    ORDER BY refresh_start_time DESC 
    LIMIT 5
""").show()

Refresh history for order_fact:
--------------------------------------------------------------------------------
|"NAME"      |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |
--------------------------------------------------------------------------------
|ORDER_FACT  |NO_DATA           |SUCCEEDED  |2025-12-24 13:34:45.008000-08:00  |
|ORDER_FACT  |NO_DATA           |SUCCEEDED  |2025-12-24 13:34:41.840000-08:00  |
|ORDER_FACT  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:39.127000-08:00  |
|ORDER_FACT  |NO_DATA           |SUCCEEDED  |2025-12-24 13:27:06.729000-08:00  |
|ORDER_FACT  |NO_DATA           |SUCCEEDED  |2025-12-24 13:26:59.289000-08:00  |
--------------------------------------------------------------------------------



In [40]:
# Check refresh history for daily_business_metrics
# Append _sql to table name if you want to check those created with SQL queries

print("Refresh history for daily_business_metrics:")
session.sql("""
    SELECT name, refresh_action, state, refresh_start_time
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
        NAME => 'tasty_bytes_db.ANALYTICS.DAILY_BUSINESS_METRICS'
    ))
    ORDER BY refresh_start_time DESC 
    LIMIT 5
""").show()

Refresh history for daily_business_metrics:
--------------------------------------------------------------------------------------------
|"NAME"                  |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |
--------------------------------------------------------------------------------------------
|DAILY_BUSINESS_METRICS  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:42.306000-08:00  |
|DAILY_BUSINESS_METRICS  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:26:59.681000-08:00  |
--------------------------------------------------------------------------------------------



In [41]:
# Check refresh history for product_performance_metrics
# Append _sql to table name if you want to check those created with SQL queries

print("Refresh history for product_performance_metrics:")
session.sql("""
    SELECT name, refresh_action, state, refresh_start_time
    FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
        NAME => 'tasty_bytes_db.ANALYTICS.PRODUCT_PERFORMANCE_METRICS'
    ))
    ORDER BY refresh_start_time DESC 
    LIMIT 5
""").show()

Refresh history for product_performance_metrics:
-------------------------------------------------------------------------------------------------
|"NAME"                       |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |
-------------------------------------------------------------------------------------------------
|PRODUCT_PERFORMANCE_METRICS  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:45.721000-08:00  |
|PRODUCT_PERFORMANCE_METRICS  |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:27:07.157000-08:00  |
-------------------------------------------------------------------------------------------------



### ✅ Verify Updated Metrics

Confirm that new data has propagated through the entire pipeline.

In [42]:
# Check updated daily metrics using Snowpark
# Append _sql to table name if you want to check those created with SQL queries

print("Updated daily business metrics (most recent dates):")
session.table("tasty_bytes_db.analytics.daily_business_metrics") \
    .select("order_date", "total_orders", "total_items_sold", "total_revenue", "total_profit") \
    .order_by(F.col("order_date").desc()) \
    .limit(5).show()

Updated daily business metrics (most recent dates):
-----------------------------------------------------------------------------------------
|"ORDER_DATE"  |"TOTAL_ORDERS"  |"TOTAL_ITEMS_SOLD"  |"TOTAL_REVENUE"  |"TOTAL_PROFIT"  |
-----------------------------------------------------------------------------------------
|2025-12-24    |1200            |4991                |171300.4000      |25864.4000      |
|2022-11-01    |425886          |1765938             |59793523.2500    |8961352.6000    |
|2022-10-31    |412664          |1715915             |58254392.2500    |8705875.3000    |
|2022-10-30    |459105          |1904604             |64367132.7500    |9647234.4500    |
|2022-10-29    |465857          |1932896             |65379979.0000    |9797068.6500    |
-----------------------------------------------------------------------------------------



In [43]:
# Check updated product performance using Snowpark
# Append _sql to table name if you want to check those created with SQL queries

print("Updated product performance (top products by revenue):")
session.table("tasty_bytes_db.analytics.product_performance_metrics") \
    .select("menu_item_name", "item_category", "total_units_sold", "total_revenue", "total_profit") \
    .order_by(F.col("total_revenue").desc()) \
    .limit(10).show()

Updated product performance (top products by revenue):
--------------------------------------------------------------------------------------------------------
|"MENU_ITEM_NAME"            |"ITEM_CATEGORY"  |"TOTAL_UNITS_SOLD"  |"TOTAL_REVENUE"  |"TOTAL_PROFIT"  |
--------------------------------------------------------------------------------------------------------
|The King Combo              |Main             |21563442            |431268886.0000   |172507552.0000  |
|Tandoori Mixed Grill        |Main             |20659524            |371871477.0000   |144616729.2000  |
|Lean Chicken Tikka Masala   |Main             |20660242            |351224142.9000   |144621673.6000  |
|Spicy Miso Vegetable Ramen  |Main             |18897705            |325985430.2250   |193701398.6250  |
|Tonkotsu Ramen              |Main             |18896002            |325956029.3250   |193683989.4500  |
|Creamy Chicken Ramen        |Main             |18894956            |325938006.5250   |174778320.5750  |


### 📊 Refresh Summary

View the latest refresh operation for each dynamic table, including duration and status.

In [44]:
# Summary of latest refresh operations
print("Latest refresh operations for all dynamic tables:")
session.sql("""
    SELECT
        name,
        refresh_action,
        state,
        refresh_start_time,
        refresh_end_time,
        DATEDIFF('second', refresh_start_time, refresh_end_time) AS refresh_duration_seconds
    FROM (
        SELECT 
            name, 
            refresh_action, 
            state, 
            refresh_start_time, 
            refresh_end_time,
            ROW_NUMBER() OVER (PARTITION BY name ORDER BY refresh_start_time DESC) as rn
        FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
    )
    WHERE rn = 1
    ORDER BY name
""").show()

Latest refresh operations for all dynamic tables:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
|"NAME"                       |"REFRESH_ACTION"  |"STATE"    |"REFRESH_START_TIME"              |"REFRESH_END_TIME"                |"REFRESH_DURATION_SECONDS"  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
|DAILY_BUSINESS_METRICS       |INCREMENTAL       |SUCCEEDED  |2025-12-24 13:34:42.306000-08:00  |2025-12-24 13:34:43.662000-08:00  |1                           |
|ORDERS_ENRICHED              |NO_DATA           |SUCCEEDED  |2025-12-24 13:34:44.275000-08:00  |2025-12-24 13:34:44.774000-08:00  |0                           |
|ORDER_FACT                   |NO_DATA           |SUCCEEDED  |2025-12-24 13:34:45.008000-08:00  |2025-12-24 13:34:45.487000-08:00  |0       
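`DATEDIFF('second', ...)` counts the second boundaries crossed between the two timestamps, so a refresh that starts and ends within the same clock second is reported as 0. If sub-second durations matter, `DATEDIFF('millisecond', ...)` is one option. The Python sketch below reproduces both views for two rows of the sample output, using hypothetical helper names:

```python
from datetime import datetime, timedelta


def boundary_seconds(start: datetime, end: datetime) -> int:
    """Seconds between the timestamps after truncating both to whole seconds."""
    truncated = end.replace(microsecond=0) - start.replace(microsecond=0)
    return int(truncated.total_seconds())


def elapsed_ms(start: datetime, end: datetime) -> int:
    """Elapsed time in whole milliseconds."""
    return (end - start) // timedelta(milliseconds=1)


fmt = "%Y-%m-%d %H:%M:%S.%f"
# Start and end times copied from the sample output (time zone offset dropped)
refreshes = {
    "DAILY_BUSINESS_METRICS": ("2025-12-24 13:34:42.306000", "2025-12-24 13:34:43.662000"),
    "ORDERS_ENRICHED": ("2025-12-24 13:34:44.275000", "2025-12-24 13:34:44.774000"),
}

durations = {}
for name, (start_text, end_text) in refreshes.items():
    start = datetime.strptime(start_text, fmt)
    end = datetime.strptime(end_text, fmt)
    durations[name] = (boundary_seconds(start, end), elapsed_ms(start, end))

print(durations)
```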

---
# Part 5: AI-Powered Insights 🤖 *(Optional)*

Create an AI agent using Snowflake Intelligence to answer natural language questions about your data.

### 🏗️ Create Agent Infrastructure

Set up the database and schema for Snowflake Intelligence agents.

In [45]:
# Grant required privileges (requires ACCOUNTADMIN)
session.use_role("ACCOUNTADMIN")
session.sql("GRANT CREATE DATABASE ON ACCOUNT TO ROLE pydata_lab_role").collect()

# Switch back to pydata_lab_role
session.use_role("pydata_lab_role")
print("Privileges granted for Snowflake Intelligence")

# Create Intelligence database using Python API
intelligence_db = Database(name="snowflake_intelligence")
root.databases.create(intelligence_db, mode=CreateMode.or_replace)
print("Database 'snowflake_intelligence' created")

# Create agents schema using Python API
intelligence_db_ref = root.databases["snowflake_intelligence"]
agents_schema = Schema(name="agents")
intelligence_db_ref.schemas.create(agents_schema, mode=CreateMode.or_replace)
print("Schema 'agents' created")

# Grant permissions
session.sql("GRANT USAGE ON DATABASE snowflake_intelligence TO ROLE pydata_lab_role").collect()
session.sql("GRANT USAGE ON SCHEMA snowflake_intelligence.agents TO ROLE pydata_lab_role").collect()
session.sql("GRANT CREATE AGENT ON SCHEMA snowflake_intelligence.agents TO ROLE pydata_lab_role").collect()

print("Agent infrastructure created")

Privileges granted for Snowflake Intelligence
Database 'snowflake_intelligence' created
Schema 'agents' created
Agent infrastructure created
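
The three `GRANT` statements in the cell above follow a fixed pattern: usage on the database, usage on the schema, and `CREATE AGENT` on the schema. As a minimal sketch, assuming you want to reuse that pattern for another role, the statements can be generated from parameters. The helper name `agent_grant_statements` is hypothetical and not part of any Snowflake library; it only builds SQL strings and does not execute them.

```python
def agent_grant_statements(role, database="snowflake_intelligence", schema="agents"):
    """Build the GRANT statements used above for a given role (hypothetical helper)."""
    qualified_schema = f"{database}.{schema}"
    return [
        f"GRANT USAGE ON DATABASE {database} TO ROLE {role}",
        f"GRANT USAGE ON SCHEMA {qualified_schema} TO ROLE {role}",
        f"GRANT CREATE AGENT ON SCHEMA {qualified_schema} TO ROLE {role}",
    ]

# Reproduce the statements from the cell above for the lab role.
statements = agent_grant_statements("pydata_lab_role")
for statement in statements:
    print(statement)
```

Each string could then be passed to `session.sql(...).collect()`, as in the cell above.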


### 🖥️ Create Agent via UI

Follow the instructor's guidance to create the agent through the Snowflake UI, placing it in the `snowflake_intelligence.agents` schema created above.

### 💬 Sample Questions for AI Agent

Once your agent is connected to the semantic model, try asking:

| Category | Example Questions |
|----------|-------------------|
| **Revenue** | "What was the total revenue for the last 30 days?" |
| **Products** | "Which products have the highest profit margins?" |
| **Trends** | "Show me daily revenue trends as a line chart" |
| **Customers** | "How many unique customers did we have yesterday?" |
| **Discounts** | "What percentage of orders include discounts?" |
| **Categories** | "Compare revenue by product category" |
| **Performance** | "Show me the top 5 products by profit" |
| **Timing** | "What are the busiest hours for orders?" |
| **Patterns** | "How does revenue vary by day of week?" |
| **Brands** | "Which truck brands generate the most profit?" |

### ✅ Verify Agent Responses

Run these Snowpark queries and compare the results with the agent's answers:

In [46]:
# Get top 10 products by revenue using Snowpark
print("Top 10 products by revenue (using Snowpark):")
top_products = session.table("tasty_bytes_db.analytics.product_performance_metrics") \
    .select(
        F.col("menu_item_name"),
        F.col("item_category"),
        F.col("total_revenue"),
        F.col("total_profit"),
        F.col("avg_profit_margin_pct")
    ) \
    .order_by(F.col("total_revenue").desc()) \
    .limit(10)

top_products.show()

Top 10 products by revenue (using Snowpark):
-------------------------------------------------------------------------------------------------------------
|"MENU_ITEM_NAME"            |"ITEM_CATEGORY"  |"TOTAL_REVENUE"  |"TOTAL_PROFIT"  |"AVG_PROFIT_MARGIN_PCT"  |
-------------------------------------------------------------------------------------------------------------
|The King Combo              |Main             |431268886.0000   |172507552.0000  |40.00                    |
|Tandoori Mixed Grill        |Main             |371871477.0000   |144616729.2000  |38.89                    |
|Lean Chicken Tikka Masala   |Main             |351224142.9000   |144621673.6000  |41.18                    |
|Spicy Miso Vegetable Ramen  |Main             |325985430.2250   |193701398.6250  |59.42                    |
|Tonkotsu Ramen              |Main             |325956029.3250   |193683989.4500  |59.42                    |
|Creamy Chicken Ramen        |Main             |325938006.5250   |174778320

In [47]:
# Calculate revenue by category using Snowpark aggregation
print("Revenue by product category (using Snowpark):")
category_revenue = session.table("tasty_bytes_db.analytics.product_performance_metrics") \
    .group_by("item_category") \
    .agg(
        F.sum("total_revenue").alias("category_revenue"),
        F.sum("total_profit").alias("category_profit"),
        F.sum("total_units_sold").alias("units_sold")
    ) \
    .order_by(F.col("category_revenue").desc())

category_revenue.show()

Revenue by product category (using Snowpark):
---------------------------------------------------------------------------
|"ITEM_CATEGORY"  |"CATEGORY_REVENUE"  |"CATEGORY_PROFIT"  |"UNITS_SOLD"  |
---------------------------------------------------------------------------
|Main             |8684728559.6750     |4315773045.1000    |711451561     |
|Snack            |660730871.6000      |390646235.0500     |79299243      |
|Beverage         |506821161.1500      |396277469.7500     |189178359     |
|Dessert          |258586766.9000      |168916568.5500     |50050934      |
---------------------------------------------------------------------------
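
When checking an agent's answer to a question such as "Compare revenue by product category", it can help to derive shares and ratios from the reference figures and to allow a small tolerance for rounding. The following is a minimal sketch in plain Python, using the numbers printed above. The helper `agrees` and its 1% tolerance are assumptions chosen for illustration, and the profit-to-revenue ratio computed here is not necessarily the same quantity as the `avg_profit_margin_pct` column.

```python
import math

# Figures copied from the "Revenue by product category" output above.
categories = {
    "Main": {"revenue": 8684728559.6750, "profit": 4315773045.1000},
    "Snack": {"revenue": 660730871.6000, "profit": 390646235.0500},
    "Beverage": {"revenue": 506821161.1500, "profit": 396277469.7500},
    "Dessert": {"revenue": 258586766.9000, "profit": 168916568.5500},
}

total_revenue = sum(c["revenue"] for c in categories.values())

# Share of total revenue and profit-to-revenue ratio per category, in percent.
summary = {
    name: {
        "revenue_share_pct": 100 * c["revenue"] / total_revenue,
        "profit_ratio_pct": 100 * c["profit"] / c["revenue"],
    }
    for name, c in categories.items()
}

def agrees(agent_value, reference_value, rel_tol=0.01):
    """True if the agent's number is within rel_tol of the reference (hypothetical helper)."""
    return math.isclose(agent_value, reference_value, rel_tol=rel_tol)

for name, stats in summary.items():
    print(f"{name}: {stats['revenue_share_pct']:.1f}% of revenue, "
          f"{stats['profit_ratio_pct']:.1f}% profit-to-revenue")
```

For example, `agrees(8.68e9, categories["Main"]["revenue"])` treats an agent answer of "about 8.68 billion" as matching the reference value.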



In [48]:
# Get the 30 most recent dates in the table (not necessarily the last 30 calendar days)
print("Daily revenue trend (last 30 days):")
daily_trend = session.table("tasty_bytes_db.analytics.daily_business_metrics") \
    .select(
        F.col("order_date"),
        F.col("total_orders"),
        F.col("total_revenue"),
        F.col("total_profit"),
        F.col("unique_customers")
    ) \
    .order_by(F.col("order_date").desc()) \
    .limit(30)

daily_trend.show()

Daily revenue trend (last 30 days):
-----------------------------------------------------------------------------------------
|"ORDER_DATE"  |"TOTAL_ORDERS"  |"TOTAL_REVENUE"  |"TOTAL_PROFIT"  |"UNIQUE_CUSTOMERS"  |
-----------------------------------------------------------------------------------------
|2025-12-24    |1200            |171300.4000      |25864.4000      |66                  |
|2022-11-01    |425886          |59793523.2500    |8961352.6000    |18644               |
|2022-10-31    |412664          |58254392.2500    |8705875.3000    |15854               |
|2022-10-30    |459105          |64367132.7500    |9647234.4500    |18020               |
|2022-10-29    |465857          |65379979.0000    |9797068.6500    |18145               |
|2022-10-28    |448874          |63159728.0000    |9464734.6500    |21603               |
|2022-10-27    |439475          |61790017.0000    |9256160.4500    |18029               |
|2022-10-26    |432436          |60760310.2500    |9102695.2000 
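
Note that sorting by `order_date` descending and taking `.limit(30)` returns the 30 most recent dates present in the table, which is not necessarily a 30-day calendar window: the output above jumps from 2025-12-24 straight to 2022-11-01. The difference matters when comparing against an agent's answer to "What was the total revenue for the last 30 days?". The sketch below illustrates it in plain Python with a few dates copied from the output; the helper `within_calendar_window` is hypothetical, and anchoring the window on the latest date in the data (rather than on today's date) is an assumption.

```python
from datetime import date, timedelta

# A few order dates copied from the output above (most recent first).
order_dates = [
    date(2025, 12, 24),
    date(2022, 11, 1),
    date(2022, 10, 31),
    date(2022, 10, 30),
]

def within_calendar_window(dates, days=30):
    """Keep only dates within `days` days of the most recent date (hypothetical helper)."""
    latest = max(dates)
    cutoff = latest - timedelta(days=days)
    return [d for d in dates if d > cutoff]

# Mirrors order_by(order_date desc).limit(30): most recent rows, regardless of gaps.
recent_rows = sorted(order_dates, reverse=True)[:30]

# A calendar window: only dates in the 30 days up to the latest date.
calendar_window = within_calendar_window(order_dates)

print(f"most recent rows: {len(recent_rows)}, calendar window: {len(calendar_window)}")
```

In Snowpark, the calendar-window variant would correspond to filtering on `order_date` before sorting, rather than relying on `.limit(30)` alone.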

---
# 🧹 Cleanup (Optional)

> ⚠️ **Warning:** This will permanently delete all data and objects created in this notebook.

In [None]:
# Uncomment the following lines to perform cleanup

# # Drop databases
# root.databases["tasty_bytes_db"].delete()
# print("Database 'tasty_bytes_db' dropped")

# root.databases["snowflake_intelligence"].delete()
# print("Database 'snowflake_intelligence' dropped")

# # Drop warehouse
# root.warehouses["PYDATA_LAB_WH"].delete()
# print("Warehouse 'PYDATA_LAB_WH' dropped")

# # Optionally drop the role
# # session.use_role("ACCOUNTADMIN")
# # session.sql("DROP ROLE IF EXISTS pydata_lab_role").collect()
# # print("Role 'pydata_lab_role' dropped")

print("Cleanup section ready (uncomment to execute)")

In [None]:
# Close the session
session.close()
print("Session closed.")

---
# 🎉 Summary & Resources

## What We Built

| Component | Description |
|-----------|-------------|
| **End-to-End Pipeline** | Ingests raw data → transforms automatically → delivers fresh insights |
| **No Orchestration Code** | Dynamic Tables handle scheduling, dependencies, and incremental processing |
| **AI-Ready Data** | Semantic models enable natural language queries |

## Technologies Used

| Tool | Purpose |
|------|---------|
| **AWS S3** | Cloud storage for raw data files |
| **Snowflake Python APIs** | Database/schema/table management |
| **Snowpark DataFrames** | Data querying and transformation |
| **Dynamic Tables** | Declarative pipeline orchestration |
| **Snowflake Intelligence** | AI-powered insights |

---

## 📚 Additional Resources

### Documentation
- [Snowflake Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-intro)
- [Snowpark Python Developer Guide](https://docs.snowflake.com/en/developer-guide/snowpark/python/index)
- [Snowflake Python API Reference](https://docs.snowflake.com/developer-guide/snowflake-python-api/reference/latest/index)
- [Snowflake Intelligence](https://docs.snowflake.com/en/user-guide/snowflake-cortex/snowflake-intelligence)

### Learning
- [Coursera: Snowflake Data Engineering Professional Certificate](https://www.coursera.org/professional-certificates/snowflake-data-engineering)
- [Snowflake Developers YouTube](https://www.youtube.com/channel/UCxgY7r-o_ql8ADIdyiQr3Zw)
- [Snowflake Developer Hub](https://developers.snowflake.com)

### Community
- [Snowflake-Labs GitHub](https://github.com/Snowflake-Labs)