# Trading data pipeline demo - data pipeline setup

This notebook sets up a data pipeline for trading data using Snowflake Dynamic Tables.

In the Packages menu, add the `snowflake-snowpark-python` package, then select **Start** to begin the notebook session.


In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import lit, col, upper, split, when, sum, count, count_distinct, avg, min, max
import snowflake.snowpark.functions as F


In [None]:
# Snowflake Notebooks already run inside an authenticated session, so we simply
# reuse it instead of building connection parameters.
session = get_active_session()

print("🔗 Connected to Snowflake using active session!")
# Report where unqualified object names will resolve for the rest of the notebook.
for context_label, context_value in (
    ("database", session.get_current_database()),
    ("schema", session.get_current_schema()),
):
    print(f"Current {context_label}: {context_value}")


Now let's use Dynamic Tables to incrementally transform the raw data, starting with the USER_PROFILES table. This dynamic table keeps only the most recently loaded record for each USER_ID (the row with the latest `_LOADED_AT`).

In [None]:
-- Deduplicated user profiles: keep only the most recently loaded row per USER_ID.
-- No dynamic table in this pipeline reads USER_PROFILES_DT (only the
-- USER_TRADING_SUMMARY view does). Under Snowflake's target-lag semantics,
-- TARGET_LAG = DOWNSTREAM would therefore leave it without scheduled refreshes
-- after the initial load. An explicit lag, matching DAILY_TRADING_METRICS, keeps
-- it fresh.
CREATE OR REPLACE DYNAMIC TABLE STAGING.USER_PROFILES_DT
TARGET_LAG = '3 minutes'
WAREHOUSE = TRADING_ANALYTICS_WH
INITIALIZE = ON_CREATE
AS
SELECT * FROM (
    -- ROW_NUM stays in the output columns.
    -- NOTE(review): rows sharing the same _LOADED_AT for one USER_ID are ranked
    -- arbitrarily; add a tie-breaker column to ORDER BY if one exists.
    SELECT *, ROW_NUMBER() OVER (PARTITION BY USER_ID ORDER BY _LOADED_AT DESC) as row_num
    FROM RAW_DATA.USER_PROFILES
) WHERE row_num = 1

Next, let's create another dynamic table that normalises the user trades data, this time using Snowpark Python.

In [None]:
# Build the cleaned/normalised trades dataset with Snowpark DataFrame operations
# and materialise it as the STAGING.USER_TRADES_DT dynamic table.
raw_trades_df = session.table("RAW_DATA.USER_TRADES")

# Shared column expressions. Every derived field uses the same normalised value
# as the column it is derived from, so derived fields cannot disagree with their
# source column.
trade_ts = col("timestamp").cast("timestamp")
status_upper = upper(col("status"))
# Assumes symbols look like "BASE-QUOTE" (e.g. "BTC-USD") — TODO confirm.
symbol_parts = split(col("symbol"), lit("-"))

trades_clean_df = raw_trades_df.select(
    col("trade_id"),
    col("user_id"),
    col("symbol"),
    upper(col("side")).alias("side"),
    col("quantity"),
    col("price"),
    (col("quantity") * col("price")).alias("notional_value"),
    trade_ts.alias("trade_timestamp"),
    status_upper.alias("status"),
    upper(col("exchange")).alias("exchange"),
    upper(col("order_type")).alias("order_type"),
    col("fees"),
    col("settlement_date").cast("date").alias("settlement_date"),
    col("_loaded_at").alias("_loaded_at"),
    col("_file_name").alias("_file_name"),
    # Derived fields
    symbol_parts[0].alias("base_currency"),
    symbol_parts[1].alias("quote_currency"),
    F.date_trunc("day", trade_ts).alias("trade_date"),
    F.hour(trade_ts).alias("trade_hour"),
    # Compare against the upper-cased status, the same value exposed as STATUS.
    # The raw value is normalised above, so it may not already be upper-case, and a
    # case mismatch would silently exclude completed trades from every metric.
    when(status_upper == "COMPLETED", 1).otherwise(0).alias("is_completed"),
).filter(col("trade_id").isNotNull())


trades_clean_df.create_or_replace_dynamic_table(
    name="STAGING.USER_TRADES_DT",
    warehouse=session.get_current_warehouse(),
    # Refresh cadence is driven by the dynamic tables that read this one.
    lag="DOWNSTREAM",
    refresh_mode="INCREMENTAL",
    initialize="ON_CREATE",
)

trades_clean_df



Lastly, a third dynamic table aggregates completed trades into daily metrics. The views in the analytics layer are built on top of it.

In [None]:
# Aggregate completed trades per day / symbol / exchange and materialise the result
# as the ANALYTICS.DAILY_TRADING_METRICS dynamic table read by the analytics views.
completed_trades_df = session.table("STAGING.USER_TRADES_DT").filter(col("IS_COMPLETED") == 1)

daily_metrics_df = completed_trades_df.group_by(
    col("TRADE_DATE"),
    col("SYMBOL"),
    col("BASE_CURRENCY"),
    col("QUOTE_CURRENCY"),
    col("EXCHANGE")
).agg(
    count(lit(1)).alias("TOTAL_TRADES"),  # COUNT(*)
    sum(when(col("SIDE") == 'BUY', 1).otherwise(0)).alias("BUY_TRADES"),  # COUNT(CASE WHEN side = 'BUY' THEN 1 END)
    sum(when(col("SIDE") == 'SELL', 1).otherwise(0)).alias("SELL_TRADES"),  # COUNT(CASE WHEN side = 'SELL' THEN 1 END)
    # The input is already filtered to IS_COMPLETED = 1, so this equals TOTAL_TRADES;
    # it is kept as an explicit column for downstream clarity.
    count(when(col("IS_COMPLETED") == 1, 1)).alias("COMPLETED_TRADES"),
    sum(col("QUANTITY")).alias("TOTAL_VOLUME"),
    sum(col("NOTIONAL_VALUE")).alias("TOTAL_NOTIONAL"),
    avg(col("PRICE")).alias("AVG_PRICE"),
    min(col("PRICE")).alias("MIN_PRICE"),
    max(col("PRICE")).alias("MAX_PRICE"),
    sum(col("FEES")).alias("TOTAL_FEES"),
    count_distinct(col("USER_ID")).alias("UNIQUE_TRADERS"),
    # Volume-weighted average price (VWAP).
    # NOTE(review): assumes quantities in a group never sum to 0; otherwise the
    # division fails the refresh — consider a guarded division if that can happen.
    (sum(col("NOTIONAL_VALUE")) / sum(col("QUANTITY"))).alias("VWAP")
)

# Schema-qualify the target: the analytics views query ANALYTICS.DAILY_TRADING_METRICS,
# whereas an unqualified name would be created in the session's current schema.
# The DataFrame is not reassigned, because this call returns status rows, not a DataFrame.
daily_metrics_df.create_or_replace_dynamic_table(
    name="ANALYTICS.DAILY_TRADING_METRICS",
    warehouse=session.get_current_warehouse(),
    lag="3 minute",
    refresh_mode="INCREMENTAL",
    initialize="ON_CREATE"
)

In [None]:
-- Create analytics views for the Streamlit dashboard
-- TOP_PERFORMING_ASSETS: per-asset totals rolled up from the daily metrics table.
CREATE OR REPLACE VIEW ANALYTICS.TOP_PERFORMING_ASSETS AS
WITH asset_traders AS (
    -- Distinct traders must be counted from trade-level rows. Summing the per-day,
    -- per-exchange UNIQUE_TRADERS values would count a user once for every
    -- day/exchange they traded on. The filter mirrors the one used to build
    -- DAILY_TRADING_METRICS.
    SELECT
        symbol,
        base_currency,
        COUNT(DISTINCT user_id) AS total_unique_traders
    FROM STAGING.USER_TRADES_DT
    WHERE is_completed = 1
    GROUP BY symbol, base_currency
)
SELECT 
    m.symbol,
    m.base_currency,
    SUM(m.total_notional) as total_volume,
    -- Unweighted mean of the per-group daily average prices.
    AVG(m.avg_price) as avg_price,
    MAX(m.max_price) as high_price,
    MIN(m.min_price) as low_price,
    SUM(m.total_trades) as total_trades,
    -- asset_traders has exactly one row per (symbol, base_currency), so MAX just
    -- carries that single value through the GROUP BY.
    MAX(t.total_unique_traders) as total_unique_traders
FROM ANALYTICS.DAILY_TRADING_METRICS m
LEFT JOIN asset_traders t
    ON m.symbol IS NOT DISTINCT FROM t.symbol
   AND m.base_currency IS NOT DISTINCT FROM t.base_currency
GROUP BY m.symbol, m.base_currency
ORDER BY total_volume DESC;

-- USER_TRADING_SUMMARY: per-user activity over completed trades, enriched with
-- profile attributes. Users without a matching profile row are excluded by the
-- inner join.
CREATE OR REPLACE VIEW ANALYTICS.USER_TRADING_SUMMARY AS
SELECT
    trades.user_id,
    profiles.first_name || ' ' || profiles.last_name AS full_name,
    profiles.tier,
    profiles.country,
    COUNT(*)                      AS total_trades,
    SUM(trades.notional_value)    AS total_volume,
    AVG(trades.notional_value)    AS avg_trade_size,
    COUNT(DISTINCT trades.symbol) AS unique_symbols
FROM STAGING.USER_TRADES_DT AS trades
INNER JOIN STAGING.USER_PROFILES_DT AS profiles
    ON profiles.user_id = trades.user_id
WHERE trades.is_completed = 1
GROUP BY
    trades.user_id,
    profiles.first_name,
    profiles.last_name,
    profiles.tier,
    profiles.country
ORDER BY total_volume DESC;


