# Snowflake AI Cost Attribution Toolkit

**Comprehensive cost analysis for Cortex Analyst and Cortex Agents**

This notebook helps you:
- 📊 Track costs by semantic model and agent
- 💰 Combine Cortex feature credits with warehouse compute costs
- 📈 Analyze query patterns and performance
- 🎯 Identify optimization opportunities

**Deployment**: Designed to run as a Snowflake Notebook in Snowsight

## 1. Setup and Configuration

In [None]:
import pandas as pd
import json
import re
from snowflake.snowpark.context import get_active_session

# Reuse the Snowpark session that Snowsight attaches to the notebook.
session = get_active_session()
print("✅ Snowflake session initialized")

## 2. Utility Functions

In [None]:
# Column layout of the frame returned by fetch_semantic_model_paths.
_SEMANTIC_MODEL_COLUMNS = ["agent_name", "tool_name", "semantic_model_file"]


def _semantic_model_rows_for_agent(session, agent):
    """Return one row dict per tool of *agent* that declares a semantic_model_file."""
    # Double embedded quotes so any agent name forms a valid quoted identifier.
    quoted_agent = '"' + str(agent).replace('"', '""') + '"'
    describe_result = session.sql(
        f"DESCRIBE AGENT SNOWFLAKE_INTELLIGENCE.AGENTS.{quoted_agent}").collect()

    # The agent specification JSON is read from column 6 of the DESCRIBE output.
    if not describe_result or len(describe_result[0]) <= 6 or not describe_result[0][6]:
        return []

    try:
        spec = json.loads(describe_result[0][6])
    except json.JSONDecodeError as e:
        print(f"⚠️  Could not parse the specification of agent {agent}: {e}")
        return []

    tool_resources = spec.get("tool_resources") if isinstance(spec, dict) else None
    if not isinstance(tool_resources, dict):
        return []

    # Only tools that point at a semantic model are relevant for cost attribution.
    return [
        {
            "agent_name": agent,
            "tool_name": tool_name,
            "semantic_model_file": tool_data["semantic_model_file"],
        }
        for tool_name, tool_data in tool_resources.items()
        if isinstance(tool_data, dict) and tool_data.get("semantic_model_file")
    ]


def fetch_semantic_model_paths(session):
    """
    Fetch semantic model paths from all agents in SNOWFLAKE_INTELLIGENCE.AGENTS.

    Runs ``SHOW AGENTS`` on the schema, then ``DESCRIBE AGENT`` for each agent,
    parses the agent specification JSON and collects every entry of its
    ``tool_resources`` that declares a ``semantic_model_file``. Tools without a
    semantic model file are skipped. Agents that cannot be described or whose
    specification is not valid JSON are reported and skipped.

    Args:
        session: Active Snowpark session.

    Returns:
        DataFrame with columns ``agent_name``, ``tool_name`` and
        ``semantic_model_file``. It is empty (but still has these columns) when
        no agents are found or the schema cannot be accessed.
    """
    try:
        agents_result = session.sql(
            "SHOW AGENTS IN SCHEMA snowflake_intelligence.agents").collect()
    except Exception as e:
        print(f"⚠️  Could not access SNOWFLAKE_INTELLIGENCE.AGENTS schema: {e}")
        return pd.DataFrame(columns=_SEMANTIC_MODEL_COLUMNS)

    if not agents_result:
        print("⚠️  No agents found in SNOWFLAKE_INTELLIGENCE.AGENTS schema")
        return pd.DataFrame(columns=_SEMANTIC_MODEL_COLUMNS)

    results = []
    for row in agents_result:
        agent = row[1]  # the agent name is read from column 1 of SHOW AGENTS
        try:
            results.extend(_semantic_model_rows_for_agent(session, agent))
        except Exception as e:
            # Best effort: one broken agent must not stop the discovery.
            print(f"⚠️  Could not describe agent {agent}: {e}")

    return pd.DataFrame(results, columns=_SEMANTIC_MODEL_COLUMNS)


# Credits charged per Cortex Analyst message: 67 credits per 1,000 messages,
# the rate this notebook assumes. NOTE(review): confirm against the current
# Snowflake Service Consumption Table.
_CORTEX_ANALYST_CREDITS_PER_MESSAGE = 67 / 1000

# Projection shared by both log loaders so they return identical columns.
_CORTEX_ANALYST_LOG_SELECT = """SELECT
          timestamp::STRING as timestamp,
          request_id,
          semantic_model_name,
          tables_referenced,
          user_name,
          source,
          feedback,
          response_status_code,
          request_body:messages[0].content[0].text::STRING as user_question,
          response_body:response_metadata.analyst_latency_ms::NUMBER as latency_ms,
          generated_sql,
          response_body:response_metadata.analyst_orchestration_path::STRING as orchestration_path,
          response_body:response_metadata.question_category::STRING as question_category,
          response_body:message.content[1].confidence.verified_query_used.name::STRING as verified_query_name,
          response_body:message.content[1].confidence.verified_query_used.question::STRING as verified_query_question"""


def _annotate_cortex_analyst_logs(df, credits_per_message):
    """Add QUERY_TYPE and CORTEX_ANALYST_CREDITS columns to a raw log frame."""
    # Requests answered through the verified-query fast path are "Verified Query";
    # everything else (including a missing path) is "Non-Verified Query".
    df["QUERY_TYPE"] = (
        df["ORCHESTRATION_PATH"]
        .eq("vqr_fast_path")
        .map({True: "Verified Query", False: "Non-Verified Query"})
    )
    df["CORTEX_ANALYST_CREDITS"] = credits_per_message
    return df


def get_cortex_analyst_logs_for_all_semantic_models(
        session, credits_per_message=_CORTEX_ANALYST_CREDITS_PER_MESSAGE):
    """
    Retrieve the last 30 days of Cortex Analyst logs for all semantic models.

    Reads SNOWFLAKE.LOCAL.CORTEX_ANALYST_REQUESTS_V.

    Args:
        session: Active Snowpark session.
        credits_per_message: Credits attributed to each logged request.

    Returns:
        DataFrame with Cortex Analyst logs plus QUERY_TYPE and
        CORTEX_ANALYST_CREDITS columns (empty when there are no logs).
    """
    query = (
        f"{_CORTEX_ANALYST_LOG_SELECT}\n"
        "        FROM SNOWFLAKE.LOCAL.CORTEX_ANALYST_REQUESTS_V\n"
        "        WHERE timestamp >= DATEADD(DAY, -30, CURRENT_TIMESTAMP())"
    )
    df = session.sql(query).to_pandas()
    return _annotate_cortex_analyst_logs(df, credits_per_message)


def get_cortex_analyst_logs(session, semantic_model_file,
                            credits_per_message=_CORTEX_ANALYST_CREDITS_PER_MESSAGE):
    """
    Retrieve Cortex Analyst logs for a specific semantic model file.

    Args:
        session: Active Snowpark session.
        semantic_model_file: Stage path of the semantic model YAML, passed to
            CORTEX_ANALYST_REQUESTS('FILE_ON_STAGE', ...).
        credits_per_message: Credits attributed to each logged request.

    Returns:
        DataFrame with Cortex Analyst logs plus QUERY_TYPE and
        CORTEX_ANALYST_CREDITS columns.
    """
    # Escape backslashes and single quotes so the path stays one SQL string literal.
    escaped_file = str(semantic_model_file).replace("\\", "\\\\").replace("'", "''")
    query = (
        f"{_CORTEX_ANALYST_LOG_SELECT}\n"
        "        FROM TABLE(\n"
        f"          SNOWFLAKE.LOCAL.CORTEX_ANALYST_REQUESTS('FILE_ON_STAGE', '{escaped_file}'))"
    )
    df = session.sql(query).to_pandas()
    return _annotate_cortex_analyst_logs(df, credits_per_message)


# Text fragments that, besides the 'cortex-agent' query tag, identify Cortex
# Analyst SQL executed outside an agent. These are specific to the demo
# semantic models; pass query_text_fragments to use your own.
_DEFAULT_QUERY_TEXT_FRAGMENTS = (
    "__media_mix_modeling_agg",
    "__co_branding_agreements",
    "__sp500",
)


def _ilike_contains(fragment):
    """Return an ``ILIKE`` pattern (with ESCAPE clause) matching *fragment* literally.

    ``_`` and ``%`` are LIKE wildcards, so they and the escape character ``!``
    are escaped with ``!``. Backslashes and quotes are escaped for the SQL
    string literal itself.
    """
    escaped = (
        fragment.replace("!", "!!")
        .replace("%", "!%")
        .replace("_", "!_")
        .replace("\\", "\\\\")
        .replace("'", "''")
    )
    return f"'%{escaped}%' ESCAPE '!'"


def create_sf_intelligence_query_history(session, target_table="CORTEX_ANALYTICS.PUBLIC.SF_INTELLIGENCE_QUERY_HISTORY", force_refresh=False,
                                         query_text_fragments=_DEFAULT_QUERY_TEXT_FRAGMENTS):
    """
    Create or refresh query history table with compute credits.

    Captures the last 30 days of queries from Cortex Agents (query tag
    'cortex-agent') and manually executed Cortex Analyst SQLs (query text
    containing one of *query_text_fragments*). Warehouse credits come from
    ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY (0 when not attributed).

    Args:
        session: Active Snowpark session.
        target_table: Table to create, ideally DATABASE.SCHEMA.TABLE.
        force_refresh: If True, recreates the table with fresh data.
        query_text_fragments: Literal substrings (matched case-insensitively)
            that mark a query as Cortex Analyst SQL.
    """
    table_parts = target_table.split('.')
    table_name = table_parts[-1].upper()
    schema_name = table_parts[-2].upper() if len(table_parts) > 1 else 'PUBLIC'
    database_name = table_parts[-3].upper() if len(table_parts) > 2 else 'CORTEX_ANALYTICS'
    # The unqualified INFORMATION_SCHEMA only lists the *current* database, so
    # use the target database's own schema when the name includes a database.
    information_schema = (
        f"{database_name}.INFORMATION_SCHEMA" if len(table_parts) > 2 else "INFORMATION_SCHEMA")

    # Check if table exists
    table_exists = False
    try:
        result = session.sql(f"""
            SELECT COUNT(*) as table_count
            FROM {information_schema}.TABLES
            WHERE TABLE_SCHEMA = '{schema_name}'
            AND TABLE_NAME = '{table_name}'
            AND TABLE_CATALOG = '{database_name}'
        """).collect()
        table_exists = result[0]['TABLE_COUNT'] > 0
    except Exception as e:
        print(f"⚠️  Could not check table: {e}")

    if table_exists and not force_refresh:
        print(f"✅ Table {target_table} already exists (use force_refresh=True to recreate)")
        return

    if table_exists:
        print(f"🔄 Refreshing {target_table}...")
    # CREATE OR REPLACE is atomic, so a failed refresh keeps the old table
    # instead of leaving nothing behind as a separate DROP would.
    create_statement = "CREATE OR REPLACE TABLE" if table_exists else "CREATE TABLE"

    text_filters = "".join(
        f"\n          OR qh.query_text ILIKE {_ilike_contains(fragment)}"
        for fragment in query_text_fragments
    )

    # Snowflake processes backslash escapes in single-quoted literals before
    # the regex engine runs, so regex escapes need a doubled backslash in the
    # SQL text ('\\s+'); a single one would turn '\s+' into 's+' and strip
    # lowercase "s" characters from the query text. '\n' is meant as a
    # literal newline and stays single.
    query = f"""
    {create_statement} {target_table} AS
    SELECT
        qh.query_id,
        qh.query_text,
        qh.query_tag,
        qh.start_time,
        qh.total_elapsed_time,
        qh.warehouse_name,
        qh.user_name,
        COALESCE(qah.credits_attributed_compute, 0) as credits_attributed_compute,
        TRIM(REGEXP_REPLACE(
            REGEXP_REPLACE(
                REGEXP_REPLACE(qh.query_text, '--[^\\n]*\\n', '\\n'),
                '/\\\\*.*?\\\\*/', ' '
            ),
            '\\\\s+', ' '
        )) AS cleaned_query_text
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
    LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY qah
        ON qh.query_id = qah.query_id
    WHERE qh.start_time >= DATEADD(DAY, -30, CURRENT_TIMESTAMP())
      AND (
          qh.query_tag = 'cortex-agent'{text_filters}
      )
    """

    try:
        session.sql(query).collect()

        # Check how many records we got
        count_result = session.sql(f"SELECT COUNT(*) as cnt FROM {target_table}").collect()
        record_count = count_result[0]['CNT']

        print(f"✅ Table {target_table} created with {record_count:,} records")

        if record_count == 0:
            print("⚠️  Warning: No matching queries found in last 30 days")
            print("   This could mean:")
            print("   - No Cortex Analyst queries were executed via agents")
            print("   - Queries don't have 'cortex-agent' tag")
            print("   - ACCOUNT_USAGE.QUERY_HISTORY has a latency (up to 45 min)")
    except Exception as e:
        print(f"❌ Error creating table: {e}")
        raise


# Repairs for identifiers whose lowercase "s" was replaced by whitespace
# (e.g. 'doc_ai_schema' -> 'doc_ai_ chema'). That looks like the effect of
# Snowflake reading the query-history cleaning pattern '\s+' as 's+'.
# NOTE(review): confirm. Each repair is a no-op on uncorrupted text and is
# applied to both sides of the join.
_SQL_IDENTIFIER_REPAIRS = (
    (re.compile(r'doc_ai_q\s*_db\.doc_ai_\s*chema'), 'doc_ai_qs_db.doc_ai_schema'),
    (re.compile(r'co_branding_agreement\s+'), 'co_branding_agreements '),
    (re.compile(r'__co_branding_agreement\b'), '__co_branding_agreements'),
    (re.compile(r'have_renewal_option\s*_value'), 'have_renewal_options_value'),
    (re.compile(r'have_force_majeure\s*_value'), 'have_force_majeure_value'),
)
_WHITESPACE_RUN = re.compile(r'\s+')


def normalize_sql(sql_text):
    """
    Normalize SQL text for better matching between logs and query history.

    Applies the identifier repairs above, upper-cases the text, collapses
    every whitespace run into one space and drops trailing semicolons.

    Args:
        sql_text: SQL string; None/NaN are accepted.

    Returns:
        The normalized SQL, or "" for missing input.
    """
    if sql_text is None or pd.isna(sql_text):
        return ""

    sql_str = str(sql_text).strip()
    for pattern, replacement in _SQL_IDENTIFIER_REPAIRS:
        sql_str = pattern.sub(replacement, sql_str)

    sql_normalized = _WHITESPACE_RUN.sub(' ', sql_str.upper())
    # Strip semicolons together with any space in front of them, so that
    # "SELECT 1 ;" and "SELECT 1;" normalize to the same string.
    return sql_normalized.rstrip('; ')


# Helper column that numbers repeated SQL texts so they can be paired one-to-one.
_MATCH_RANK_COLUMN = "_MATCH_RANK"


def _occurrence_rank(frame, key, order_column):
    """Number the rows sharing each *key* value 0, 1, 2, ... in *order_column* order."""
    if order_column in frame.columns:
        frame = frame.sort_values(order_column, kind="stable")
    return frame.groupby(key, sort=False).cumcount()


def _pair_on_key(query_history, analyst_logs, key):
    """Inner-join the frames on *key*, matching the k-th occurrence on each side.

    A plain merge on a repeated key produces the cross product (n x m rows),
    which counts the same warehouse and Cortex Analyst credits several times.
    Pairing by occurrence rank keeps the match one-to-one: query history is
    ordered by START_TIME and logs by TIMESTAMP. Both frames must have a
    unique index.
    """
    left = query_history.assign(
        **{_MATCH_RANK_COLUMN: _occurrence_rank(query_history, key, "START_TIME")})
    right = analyst_logs.assign(
        **{_MATCH_RANK_COLUMN: _occurrence_rank(analyst_logs, key, "TIMESTAMP")})
    merged = pd.merge(left, right, on=[key, _MATCH_RANK_COLUMN], how="inner")
    return merged.drop(columns=_MATCH_RANK_COLUMN)


def create_cortex_analyst_query_history(session, cortex_analyst_df, sf_intelligence_table="CORTEX_ANALYTICS.PUBLIC.SF_INTELLIGENCE_QUERY_HISTORY"):
    """
    Join Cortex Analyst logs with query history to get full cost attribution.

    Both sides are keyed by ``normalize_sql`` of their SQL: CLEANED_QUERY_TEXT
    for the query history and GENERATED_SQL for the logs. Rows whose
    normalized SQL is empty are dropped. If nothing matches exactly, the join
    is retried on the first 100 characters of the normalized SQL. Repeated SQL
    texts are paired one-to-one rather than cross-joined.

    Args:
        session: Active Snowpark session.
        cortex_analyst_df: Cortex Analyst log rows. Needs GENERATED_SQL,
            LATENCY_MS and CORTEX_ANALYST_CREDITS. Not modified.
        sf_intelligence_table: Query-history table with CLEANED_QUERY_TEXT,
            START_TIME, TOTAL_ELAPSED_TIME and CREDITS_ATTRIBUTED_COMPUTE.

    Returns:
        DataFrame with combined Cortex credits and warehouse costs. It adds
        TOTAL_TIME (TOTAL_ELAPSED_TIME + LATENCY_MS) and
        TOTAL_CREDITS_WH_AND_CA. Columns present on both sides carry pandas'
        _x (query history) / _y (logs) suffixes.
    """
    query_history = session.sql(f"SELECT * FROM {sf_intelligence_table}").to_pandas()
    # Fresh, uniquely indexed frames: the caller's DataFrame stays untouched
    # and the rank columns in _pair_on_key align on the index.
    query_history = query_history.reset_index(drop=True)
    analyst_logs = cortex_analyst_df.reset_index(drop=True)

    query_history['NORMALIZED_SQL'] = query_history['CLEANED_QUERY_TEXT'].apply(normalize_sql)
    analyst_logs['NORMALIZED_SQL'] = analyst_logs['GENERATED_SQL'].apply(normalize_sql)

    query_history = query_history[query_history['NORMALIZED_SQL'] != ""].reset_index(drop=True)
    analyst_logs = analyst_logs[analyst_logs['NORMALIZED_SQL'] != ""].reset_index(drop=True)

    ca_query_history = _pair_on_key(query_history, analyst_logs, 'NORMALIZED_SQL')

    if ca_query_history.empty:
        # No exact match at all: fall back to the first 100 normalized characters.
        query_history['SQL_FRAGMENT'] = query_history['NORMALIZED_SQL'].apply(lambda sql: sql[:100])
        analyst_logs['SQL_FRAGMENT'] = analyst_logs['NORMALIZED_SQL'].apply(lambda sql: sql[:100])
        ca_query_history = _pair_on_key(query_history, analyst_logs, 'SQL_FRAGMENT')

    ca_query_history['TOTAL_TIME'] = (
        ca_query_history["TOTAL_ELAPSED_TIME"] + ca_query_history["LATENCY_MS"]
    )

    ca_query_history['TOTAL_CREDITS_WH_AND_CA'] = (
        ca_query_history["CREDITS_ATTRIBUTED_COMPUTE"] +
        ca_query_history["CORTEX_ANALYST_CREDITS"]
    )

    return ca_query_history.reset_index(drop=True)


def total_cost_by_semantic_model(ca_query_history_df):
    """
    Sum combined (Cortex Analyst + warehouse) credits per semantic model.

    Args:
        ca_query_history_df: Joined frame with at least SEMANTIC_MODEL_NAME
            and TOTAL_CREDITS_WH_AND_CA columns.

    Returns:
        DataFrame with columns SEMANTIC_MODEL_NAME and TOTAL_CREDITS_WH_AND_CA
        (rounded to 4 decimals), most expensive model first, with a fresh
        0..n-1 index.
    """
    per_model = ca_query_history_df.groupby(
        'SEMANTIC_MODEL_NAME', as_index=False
    )['TOTAL_CREDITS_WH_AND_CA'].sum()
    per_model = per_model.round(4)
    return per_model.sort_values(
        'TOTAL_CREDITS_WH_AND_CA', ascending=False, ignore_index=True
    )


def cost_breakdown_by_semantic_model(ca_query_history_df):
    """
    Detailed cost breakdown showing separate Cortex and warehouse costs.

    Args:
        ca_query_history_df: Joined frame with SEMANTIC_MODEL_NAME,
            CORTEX_ANALYST_CREDITS, CREDITS_ATTRIBUTED_COMPUTE,
            TOTAL_CREDITS_WH_AND_CA and REQUEST_ID columns.

    Returns:
        DataFrame with one row per semantic model and the columns
        cortex_analyst_credits, warehouse_credits, total_credits,
        query_count, percentage_of_total_cost and avg_credits_per_query.
        Rows are sorted by total_credits, descending.
    """
    cost_breakdown = ca_query_history_df.groupby('SEMANTIC_MODEL_NAME').agg({
        'CORTEX_ANALYST_CREDITS': 'sum',
        'CREDITS_ATTRIBUTED_COMPUTE': 'sum',
        'TOTAL_CREDITS_WH_AND_CA': 'sum',
        'REQUEST_ID': 'count'
    }).rename(columns={
        'CORTEX_ANALYST_CREDITS': 'cortex_analyst_credits',
        'CREDITS_ATTRIBUTED_COMPUTE': 'warehouse_credits',
        'TOTAL_CREDITS_WH_AND_CA': 'total_credits',
        'REQUEST_ID': 'query_count'
    }).round(4).reset_index()

    total_all_credits = cost_breakdown['total_credits'].sum()
    if total_all_credits:
        cost_breakdown['percentage_of_total_cost'] = (
            (cost_breakdown['total_credits'] / total_all_credits) * 100
        ).round(2)
    else:
        # Nothing charged (or no rows): report 0% instead of 0/0 = NaN.
        cost_breakdown['percentage_of_total_cost'] = 0.0

    cost_breakdown['avg_credits_per_query'] = (
        cost_breakdown['total_credits'] / cost_breakdown['query_count']
    ).round(4)

    cost_breakdown = cost_breakdown.sort_values(
        'total_credits',
        ascending=False
    ).reset_index(drop=True)

    return cost_breakdown


def cost_by_agent(semantic_model_paths_df, cost_by_model_df):
    """
    Aggregate costs by agent (multiple semantic models per agent).

    Each agent's tools are matched to model costs via
    semantic_model_file == SEMANTIC_MODEL_NAME (inner join, so unmatched tools
    and models are ignored). A semantic model is counted once per agent even
    when several of that agent's tools point at it. A model shared by several
    agents is still charged in full to each of them.

    Args:
        semantic_model_paths_df: Frame with agent_name, tool_name and
            semantic_model_file columns.
        cost_by_model_df: Frame with SEMANTIC_MODEL_NAME and
            TOTAL_CREDITS_WH_AND_CA columns.

    Returns:
        DataFrame with agent_name, total_credits and tool_count (matched
        tools), sorted by total_credits, descending.
    """
    merged = semantic_model_paths_df.merge(
        cost_by_model_df,
        left_on='semantic_model_file',
        right_on='SEMANTIC_MODEL_NAME',
        how='inner'
    )

    tool_count = merged.groupby('agent_name')['tool_name'].count()
    # Several tools of one agent can share a model; count its cost once per agent.
    total_credits = (
        merged.drop_duplicates(['agent_name', 'semantic_model_file'])
        .groupby('agent_name')['TOTAL_CREDITS_WH_AND_CA']
        .sum()
    )

    agent_costs = (
        pd.DataFrame({'total_credits': total_credits, 'tool_count': tool_count})
        .rename_axis('agent_name')
        .round(4)
        .reset_index()
    )

    agent_costs = agent_costs.sort_values(
        'total_credits',
        ascending=False
    ).reset_index(drop=True)

    return agent_costs


print("✅ Utility functions loaded")

## 3. Discover Semantic Models and Agents

In [None]:
print("🔍 Discovering semantic models and agents...\n")

df_semantic_models = fetch_semantic_model_paths(session)

if not df_semantic_models.empty:
    print(f"✅ Found {len(df_semantic_models)} semantic model configurations\n")
    display(df_semantic_models)

    unique_agents = df_semantic_models['agent_name'].nunique()
    unique_models = df_semantic_models['semantic_model_file'].nunique()
    print(f"\n📊 Summary: {unique_agents} agents, {unique_models} unique semantic models")
    print("\nℹ️  Note: All Cortex Analyst logs will be loaded from CORTEX_ANALYST_REQUESTS_V")
else:
    print("⚠️  No agents found - will still load all Cortex Analyst logs")


## 4. Setup Tables

In [None]:
print("📋 Setting up cost tracking tables...\n")

# Both tables live in the notebook's current database and schema.
database_name = session.get_current_database()
schema_name = session.get_current_schema()
if not database_name or not schema_name:
    # Without this check the table names would silently become "None.None.<TABLE>".
    raise RuntimeError(
        "No current database/schema is set for this session; "
        "run USE DATABASE ... and USE SCHEMA ... before running this cell."
    )
target_table = f"{database_name}.{schema_name}.CORTEX_ANALYST_LOGS"
query_history_table = f"{database_name}.{schema_name}.SF_INTELLIGENCE_QUERY_HISTORY"

session.sql(f"""
CREATE TABLE IF NOT EXISTS {target_table} (
    TIMESTAMP                STRING,
    REQUEST_ID               STRING,
    SEMANTIC_MODEL_NAME      STRING,
    TABLES_REFERENCED        STRING,
    USER_NAME                STRING,
    SOURCE                   STRING,
    FEEDBACK                 STRING,
    RESPONSE_STATUS_CODE     INTEGER,
    USER_QUESTION            STRING,
    LATENCY_MS               NUMBER,
    GENERATED_SQL            STRING,
    ORCHESTRATION_PATH       STRING,
    QUESTION_CATEGORY        STRING,
    VERIFIED_QUERY_NAME      STRING,
    VERIFIED_QUERY_QUESTION  STRING,
    QUERY_TYPE               STRING,
    CORTEX_ANALYST_CREDITS   FLOAT
)
""").collect()
print(f"✅ Created/verified {target_table}")

# Always rebuild the query history so warehouse costs reflect the latest 30 days.
create_sf_intelligence_query_history(session, query_history_table, force_refresh=True)

print("\n✅ All tables ready")

## 5. Populate Cortex Analyst Logs

In [None]:
print("📊 Populating Cortex Analyst logs...\n")

session.sql(f"TRUNCATE TABLE {target_table}").collect()
print("🗑️  Cleared existing logs\n")

total_logs = 0

# The loader reads all semantic models from CORTEX_ANALYST_REQUESTS_V and does
# not depend on agent discovery, so logs are loaded even when no agents were
# found (as section 3 announces).
try:
    df = get_cortex_analyst_logs_for_all_semantic_models(session)

    if not df.empty:
        session.write_pandas(
            df,
            table_name=target_table.split('.')[-1],
            database=database_name,
            schema=schema_name,
            auto_create_table=False,
            overwrite=False
        )
        total_logs = len(df)
        print(f'Found {total_logs} logs')
except Exception as e:
    print(f"   ❌ Error loading Cortex Analyst logs: {e}")

# Report the outcome on success as well as on failure.
print(f"\n✅ Loaded {total_logs:,} total log entries")

## 6. Cost Analysis

In [None]:
print("💰 Analyzing costs...\n")

cortex_logs_df = session.sql(f"SELECT * FROM {target_table}").to_pandas()

if not cortex_logs_df.empty:
    print(f"📊 Retrieved {len(cortex_logs_df)} Cortex Analyst log records\n")

    query_history_count = session.sql(
        f"SELECT COUNT(*) as count FROM {query_history_table}").collect()[0]['COUNT']
    print(f"📋 Found {query_history_count:,} query history records\n")

    if query_history_count > 0:
        print("🔗 Joining Cortex logs with query history...")
        ca_query_history = create_cortex_analyst_query_history(
            session,
            cortex_logs_df,
            query_history_table
        )

        if not ca_query_history.empty:
            # Use distinct requests for the rate, so a request joined to several
            # query-history rows cannot push it above 100%.
            matched_requests = ca_query_history['REQUEST_ID'].nunique()
            match_rate = matched_requests / len(cortex_logs_df) * 100
            print(f"✅ Matched {len(ca_query_history)} records "
                  f"({matched_requests} of {len(cortex_logs_df)} requests, "
                  f"{match_rate:.1f}% match rate)\n")

            print("\n" + "="*60)
            print("💸 TOTAL COST BY SEMANTIC MODEL")
            print("="*60)
            total_costs = total_cost_by_semantic_model(ca_query_history)
            display(total_costs)

            print("\n" + "="*60)
            print("📊 DETAILED COST BREAKDOWN")
            print("="*60)
            cost_breakdown = cost_breakdown_by_semantic_model(ca_query_history)
            display(cost_breakdown)

            if not df_semantic_models.empty:
                print("\n" + "="*60)
                print("🤖 COST BY AGENT")
                print("="*60)
                agent_costs = cost_by_agent(df_semantic_models, total_costs)
                display(agent_costs)

                total_agent_cost = agent_costs['total_credits'].sum()
                print(f"\n💰 Total cost across all agents: {total_agent_cost:.4f} credits")

                # cost_by_agent charges a shared model in full to every agent using it.
                agents_per_model = df_semantic_models.groupby('semantic_model_file')['agent_name'].nunique()
                if (agents_per_model > 1).any():
                    print("ℹ️  Some semantic models are used by more than one agent; their cost is "
                          "included in each agent's total, so this sum double-counts them.")
        else:
            print("⚠️  No matching records found between Cortex logs and query history")
    else:
        print("⚠️  No query history data available for cost analysis")
else:
    print("⚠️  No Cortex Analyst logs found")

## 7. Summary

### 📊 What This Notebook Provides:

1. **Cost by Semantic Model**: Detailed attribution for each semantic model file
2. **Cost by Agent**: Aggregated costs across all tools within an agent
3. **Full Cost Visibility**: Combines:
   - Cortex Analyst feature credits (text-to-SQL)
   - Warehouse compute credits (SQL execution)

### 🎯 Use Cases:

- Track which agents/models are most expensive
- Optimize high-cost semantic models
- Budget planning and forecasting
- Chargeback to business units

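### 🔄 Refresh Data:

Re-run sections 4–6 to refresh the data: section 4 rebuilds the query-history table, section 5 reloads the Cortex Analyst logs, and section 6 recomputes the costs.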