# Load Daily Transactions and Summaries

* Author: Dexter Stephens
* Last Updated: 12/1/2025

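This notebook builds the `CLIENT_PORTFOLIO_METRICS`, `PRESS_SENTIMENT_STOCK_IMPACT`, and `CLIENT_SECTOR_RISK` tables from the `TRANSACTIONS`, `COMPANY_DATA`, and `PRESS_AND_EARNINGS` source tables. On the first run each table is created; on later runs the recomputed results are merged (upserted) into the existing table.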

In [None]:
-- Report the notebook's current database and schema so the Python setup cell
-- can target the same location. Downstream code relies on these aliases and
-- on this column order, so keep both stable.
-- This won't be needed when we can pass variables to Notebooks!
SELECT
    CURRENT_DATABASE() AS DATABASE_NAME,
    CURRENT_SCHEMA()   AS SCHEMA_NAME

In [None]:
# Import python packages
import logging

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session

logger = logging.getLogger("demo_logger")

# Resolve the target database and schema from the result of the SQL context
# cell (cells.sql_get_context). Columns are read by their aliases rather than by
# position so this does not silently break if the SELECT's column order changes.
# This won't be needed when we can pass variables to Notebooks!
current_context_df = cells.sql_get_context.to_pandas()
database_name = current_context_df.loc[0, "DATABASE_NAME"]
schema_name = current_context_df.loc[0, "SCHEMA_NAME"]

# CURRENT_DATABASE()/CURRENT_SCHEMA() return NULL when no database/schema is in
# use; fail here with a clear message instead of deep inside table_exists().
if not database_name or not schema_name:
    raise RuntimeError(
        "Could not determine the current database/schema from sql_get_context; "
        "set a database and schema for this notebook before running it."
    )

# Snowpark session used by every pipeline cell to read sources and write targets.
session = get_active_session()

logger.info("load_daily_transactions_and_summaries")

## Create a function to check if a table exists

This function uses the [Snowflake Python Management API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview).

In [None]:
def table_exists(session, database_name='', schema_name='', table_name=''):
    """Return True if a table called ``table_name`` exists in ``database_name.schema_name``.

    Lists the schema's tables through the Snowflake Python Management API
    (``snowflake.core.Root``) with a ``like`` filter. That filter is a pattern
    match rather than an exact comparison, so each candidate's name is still
    compared to ``table_name`` exactly.

    NOTE(review): the final comparison is case-sensitive; callers presumably
    pass names as Snowflake stores them (upper case for unquoted identifiers).
    """
    schema = Root(session).databases[database_name].schemas[schema_name]
    candidates = schema.tables.iter(like=table_name)
    return any(candidate.name == table_name for candidate in candidates)

# Not used: SQL (INFORMATION_SCHEMA) alternative to the Python version above.
def table_exists2(session, database_name='', schema_name='', table_name=''):
    """Return the TABLE_EXISTS flag for ``schema_name.table_name`` in ``database_name``.

    Runs a single ``SELECT EXISTS (...)`` against the database's
    INFORMATION_SCHEMA.TABLES view and returns the first row's value.

    NOTE(review): the names are interpolated directly into the SQL text, so
    they must come from trusted sources (a quote in a name breaks the query).
    """
    query = (
        f"SELECT EXISTS (SELECT * FROM {database_name}.INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}') AS TABLE_EXISTS"
    )
    first_row = session.sql(query).collect()[0]
    return first_row['TABLE_EXISTS']

## Pipelines to build the client portfolio, press sentiment, and sector risk tables

In [None]:
import snowflake.snowpark.functions as F

# Build CLIENT_PORTFOLIO_METRICS: one row per (CLIENT, TICKER) summarizing the
# client's trades in that ticker, enriched with COMPANY_DATA attributes.
table_name = "CLIENT_PORTFOLIO_METRICS"

transactions = session.table("TRANSACTIONS")
company_data = session.table("COMPANY_DATA")

# Left join: transactions whose TICKER has no COMPANY_DATA row are kept, with
# NULL sector / industry / market cap / P-E values.
portfolio = transactions.join(company_data, transactions['TICKER'] == company_data['company_ticker'], 'left')

# NET_POSITION: BUY quantities count positive, SELL quantities negative, and any
# other POSITION value contributes 0.
# AVG_PRICE is a plain (unweighted) average of PRICE over the group's trades.
portfolio_agg = portfolio.group_by(F.col('CLIENT'), F.col('TICKER'), F.col('sector'), F.col('industry')) \
    .agg(
        F.sum(F.when(F.col('POSITION') == 'BUY', F.col('QUANTITY')).otherwise(0) -
              F.when(F.col('POSITION') == 'SELL', F.col('QUANTITY')).otherwise(0)).alias('NET_POSITION'),
        F.avg('PRICE').alias('AVG_PRICE'),
        F.count('*').alias('TRANSACTION_COUNT'),
        F.max('LASTUPDATED').alias('LAST_TRADE_DATE'),
        F.avg('market_cap_billions').alias('AVG_MARKET_CAP'),
        F.avg('pe_ratio').alias('AVG_PE_RATIO')
    ) \
    .select(
        F.col('CLIENT'),
        F.col('TICKER'),
        F.col('sector'),
        F.col('industry'),
        F.col('NET_POSITION'),
        F.round(F.col('AVG_PRICE'), 2).alias('AVG_PRICE'),
        F.col('TRANSACTION_COUNT'),
        F.col('LAST_TRADE_DATE'),
        F.round(F.col('AVG_MARKET_CAP'), 2).alias('AVG_MARKET_CAP'),
        F.round(F.col('AVG_PE_RATIO'), 2).alias('AVG_PE_RATIO')
    )

if not table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name):
    # First run: create the table from the full aggregation.
    portfolio_agg.write.mode("overwrite").save_as_table(table_name)
    logger.info(f"Successfully created {table_name}")
else:
    # Later runs: upsert the recomputed aggregation keyed on (CLIENT, TICKER).
    # equal_null (null-safe equality) lets a key containing NULL match its
    # existing row; with plain `==`, NULL never equals NULL, so such rows would
    # be re-inserted as duplicates on every run.
    cols_to_update = {c: portfolio_agg[c] for c in portfolio_agg.schema.names}
    target = session.table(table_name)
    merge_result = target.merge(
        portfolio_agg,
        target['CLIENT'].equal_null(portfolio_agg['CLIENT'])
        & target['TICKER'].equal_null(portfolio_agg['TICKER']),
        [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)],
    )
    logger.info(f"Successfully updated {table_name}")
    logger.info(f"{table_name}: {merge_result.rows_inserted} rows inserted, {merge_result.rows_updated} rows updated")

In [None]:
import snowflake.snowpark.functions as F
from snowflake.cortex import sentiment
from snowflake.snowpark.functions import lit

# Build PRESS_SENTIMENT_STOCK_IMPACT: one row per (DATE, company_ticker, source)
# combining the day's average press sentiment with that day's trading activity.
table_name = "PRESS_SENTIMENT_STOCK_IMPACT"

press = session.table("PRESS_AND_EARNINGS")
company = session.table("COMPANY_DATA")
transactions = session.table("TRANSACTIONS")

# Score every press item's raw_text with Cortex sentiment.
# NOTE(review): this re-scores the whole PRESS_AND_EARNINGS table on every run;
# if Cortex cost matters, consider restricting to rows newer than the target's
# latest DATE.
press_with_sentiment = press.with_column('SENTIMENT_SCORE', sentiment(F.col('raw_text')))

# Inner join to COMPANY_DATA to attach the sector, then aggregate per calendar
# day / ticker / source (/ sector).
daily_sentiment = press_with_sentiment.join(company, press_with_sentiment['company_ticker'] == company['company_ticker']) \
    .select(
        press_with_sentiment['timestamp'],
        press_with_sentiment['company_ticker'].alias('company_ticker'),
        press_with_sentiment['source'],
        company['sector'],
        F.col('SENTIMENT_SCORE')
    ) \
    .group_by(
        F.to_date(F.col('timestamp')).alias('DATE'),
        F.col('company_ticker'),
        F.col('source'),
        F.col('sector')
    ) \
    .agg(
        F.avg('SENTIMENT_SCORE').alias('AVG_SENTIMENT'),
        F.count('*').alias('PRESS_VOLUME')
    )

# Daily trading activity per ticker: total QUANTITY traded and average PRICE.
trading_volume = transactions.group_by(F.to_date(F.col('LASTUPDATED')).alias('DATE'), F.col('TICKER')) \
    .agg(
        F.sum('QUANTITY').alias('TOTAL_VOLUME'),
        F.avg('PRICE').alias('AVG_STOCK_PRICE')
    )

# Left join keeps sentiment rows for days with no trades; their trading volume
# becomes 0 (ZEROIFNULL) and their stock price stays NULL.
final_analysis = daily_sentiment.join(trading_volume,
    (daily_sentiment['DATE'] == trading_volume['DATE']) &
    (daily_sentiment['company_ticker'] == trading_volume['TICKER']), 'left') \
    .select(
        daily_sentiment['DATE'].alias('DATE'),
        daily_sentiment['company_ticker'],
        daily_sentiment['source'],
        daily_sentiment['sector'],
        F.round(daily_sentiment['AVG_SENTIMENT'], 3).alias('SENTIMENT_SCORE'),
        daily_sentiment['PRESS_VOLUME'],
        F.builtin("ZEROIFNULL")(trading_volume['TOTAL_VOLUME']).alias('TRADING_VOLUME'),
        F.round(trading_volume['AVG_STOCK_PRICE'], 2).alias('STOCK_PRICE')
    )

if not table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name):
    # First run: create the table from the full result.
    final_analysis.write.mode("overwrite").save_as_table(table_name)
    logger.info(f"Successfully created {table_name}")
else:
    # Later runs: upsert keyed on (DATE, company_ticker, source).
    # NOTE(review): sector is grouped on but not part of the key; this assumes
    # COMPANY_DATA has one row per company_ticker, otherwise the source has
    # duplicate keys and the MERGE is nondeterministic.
    # equal_null (null-safe equality) lets a key containing NULL (e.g. a missing
    # source) match its existing row instead of being re-inserted every run.
    cols_to_update = {c: final_analysis[c] for c in final_analysis.schema.names}
    target = session.table(table_name)
    merge_result = target.merge(
        final_analysis,
        target['DATE'].equal_null(final_analysis['DATE'])
        & target['company_ticker'].equal_null(final_analysis['company_ticker'])
        & target['source'].equal_null(final_analysis['source']),
        [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)],
    )
    logger.info(f"Successfully updated {table_name}")
    logger.info(f"{table_name}: {merge_result.rows_inserted} rows inserted, {merge_result.rows_updated} rows updated")

In [None]:
import snowflake.snowpark.functions as F

# Build CLIENT_SECTOR_RISK: one row per (CLIENT, sector) with the client's signed
# exposure to the sector and that exposure as a percentage of the client's total
# signed portfolio value.
table_name = "CLIENT_SECTOR_RISK"

transactions = session.table("TRANSACTIONS")
company = session.table("COMPANY_DATA")

# Signed notional per trade: BUY adds PRICE * QUANTITY, any other POSITION value
# subtracts it.
# NOTE(review): the CLIENT_PORTFOLIO_METRICS pipeline treats only 'SELL' as
# negative (other values count 0); confirm which convention is intended.
signed_notional = F.when(F.col('POSITION') == 'BUY', F.col('PRICE') * F.col('QUANTITY')) \
    .otherwise(F.col('PRICE') * F.col('QUANTITY') * -1)

# Per-sector aggregates (inner join: trades whose ticker is missing from
# COMPANY_DATA are excluded here). TOTAL_SHARES sums QUANTITY over buys and
# sells alike, i.e. gross shares traded.
client_positions = transactions.join(company, transactions['TICKER'] == company['company_ticker']) \
    .group_by(F.col('CLIENT'), F.col('sector')) \
    .agg(
        F.sum(signed_notional).alias('TOTAL_EXPOSURE'),
        F.count_distinct('TICKER').alias('UNIQUE_TICKERS'),
        F.sum('QUANTITY').alias('TOTAL_SHARES')
    )

# Per-client total over ALL transactions (no COMPANY_DATA join).
# NOTE(review): because of that asymmetry, a client's sector percentages need not
# sum to 100 when some tickers are missing from COMPANY_DATA.
client_totals = transactions.group_by(F.col('CLIENT')) \
    .agg(F.sum(signed_notional).alias('TOTAL_PORTFOLIO_VALUE'))

# Snowflake raises a "Division by zero" error rather than returning NULL, so a
# client whose buys and sells net to exactly 0 would abort the whole query.
# Map a zero total to NULL so that client's concentration is NULL instead.
nonzero_portfolio_value = F.when(F.col('TOTAL_PORTFOLIO_VALUE') != 0, F.col('TOTAL_PORTFOLIO_VALUE'))

risk_analysis = client_positions.join(client_totals, 'CLIENT') \
    .select(
        F.col('CLIENT'),
        F.col('sector'),
        F.round(F.col('TOTAL_EXPOSURE'), 2).alias('SECTOR_EXPOSURE'),
        F.round((F.col('TOTAL_EXPOSURE') / nonzero_portfolio_value) * 100, 2).alias('SECTOR_CONCENTRATION_PCT'),
        F.col('UNIQUE_TICKERS'),
        F.col('TOTAL_SHARES')
    )

if not table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name):
    # First run: create the table from the full result.
    risk_analysis.write.mode("overwrite").save_as_table(table_name)
    logger.info(f"Successfully created {table_name}")
else:
    # Later runs: upsert keyed on (CLIENT, sector). equal_null (null-safe
    # equality) lets a NULL sector match its existing row instead of a duplicate
    # being inserted on every run.
    cols_to_update = {c: risk_analysis[c] for c in risk_analysis.schema.names}
    target = session.table(table_name)
    merge_result = target.merge(
        risk_analysis,
        target['CLIENT'].equal_null(risk_analysis['CLIENT'])
        & target['sector'].equal_null(risk_analysis['sector']),
        [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)],
    )
    logger.info(f"Successfully updated {table_name}")
    logger.info(f"{table_name}: {merge_result.rows_inserted} rows inserted, {merge_result.rows_updated} rows updated")

## Debugging

In [None]:
-- SELECT * FROM CLIENT_PORTFOLIO_METRICS LIMIT 10;
-- SELECT * FROM PRESS_SENTIMENT_STOCK_IMPACT LIMIT 10;
-- SELECT * FROM CLIENT_SECTOR_RISK LIMIT 10;