In [None]:
import pandas as pd
import numpy as np

from snowflake.snowpark import Session
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete

from typing import List
import os
import sys
import json
import time
import requests

# Grab the currently active Snowpark session; the bare `session` expression
# below displays it as the cell output.
session = get_active_session()
session

In [None]:
-- Peek at the 250 most recent rows whose SCOPE name is 'snow.cortex.agent',
-- with the record id attribute pulled out in front of the full row.
SELECT RECORD_ATTRIBUTES:"ai.observability.record_id", * FROM SNOWFLAKE.LOCAL.AI_OBSERVABILITY_EVENTS
    WHERE SCOPE:name = 'snow.cortex.agent'
    ORDER BY TIMESTAMP DESC
    LIMIT 250;

In [None]:
-- One row per agent event for GENOMICS_AGENT: planning, tool and feedback
-- attributes are pulled out of RECORD_ATTRIBUTES / VALUE into named columns.
-- Several expressions reference aliases defined earlier in this same SELECT
-- list (PARENT_MESSAGE_ID, THREAD_ID, TOOL_CALL, GENERATED_SQL, ...), and the
-- WHERE clause filters on the AGENT_NAME alias.
WITH RESULTS AS (SELECT 
    TIMESTAMP AS TS,
    RECORD_ATTRIBUTES:"snow.ai.observability.object.name" AS AGENT_NAME,
    RECORD_ATTRIBUTES:"ai.observability.record_id" AS RECORD_ID,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.thread_id" AS THREAD_ID,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.parent_message_id" AS PARENT_MESSAGE_ID,
    -- Carry the most recent non-null parent message id forward within each
    -- (agent, thread), ordered by event time.
    COALESCE(
        PARENT_MESSAGE_ID,
        LAST_VALUE (PARENT_MESSAGE_ID) IGNORE NULLS OVER (
          PARTITION BY AGENT_NAME,
          THREAD_ID
          ORDER BY
            TIMESTAMP ROWS BETWEEN UNBOUNDED PRECEDING
            AND CURRENT ROW
        )
      ) AS PARENT_MESSAGE,

    -- Composite key: thread id plus the carried-forward parent message id.
    CONCAT(THREAD_ID, '-', PARENT_MESSAGE) AS THREAD_MESSAGE_ID,

    -- VALUE:"snow.ai.observability.request_body"."messages"[0]."content"[0]."text" AS INPUT_QUERY,
    RECORD_ATTRIBUTES:"ai.observability.record_root.input" AS INPUT_QUERY,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.thinking_response" AS AGENT_PLANNING,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_analyst.sql_query" AS GENERATED_SQL,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.sql_execution.result" AS SQL_RESULT,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_search.results" AS CORTEX_SEARCH_RESULT,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_search.name" AS CORTEX_SEARCH_SERVICE,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.custom_tool.results" AS CUSTOM_TOOL_RESULT,
    RECORD_ATTRIBUTES:"ai.observability.record_root.output" AS AGENT_RESPONSE, 
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.model" AS REASONING_MODEL, 
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool.name" AS AVAILABLE_TOOLS, 
    -- CASE 
    --     WHEN RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool_selection.name" IS NOT NULL
    --     THEN RECORD:"name" 
    --     ELSE NULL END
    -- RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool_selection.name" AS TOOL_SELECTION,
    RECORD:"name" as TOOL_CALL,

    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool_selection.type" AS TOOL_TYPE,
    -- Bundle the tool name and its outputs into one object, but only for
    -- rows that carry a tool id attribute.
    CASE 
        WHEN RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.id" IS NOT NULL
        THEN OBJECT_CONSTRUCT (
            'TOOL_NAME',
            TOOL_CALL,
            'GENERATED_SQL',
            GENERATED_SQL,
            'CORTEX_SEARCH_RESULT',
            CORTEX_SEARCH_RESULT,
            'CUSTOM_TOOL_RESULT',
            CUSTOM_TOOL_RESULT
            )
        ELSE NULL
        END AS TOOL_ARRAY,

    -- Map the feedback flag to 1 / 0; rows without the flag stay NULL.
    CASE
        WHEN VALUE:"positive"='true' THEN 1
        WHEN VALUE:"positive"='false'THEN 0
        ELSE NULL
        END AS USER_FEEDBACK,
    VALUE:"feedback_message" AS USER_FEEDBACK_MESSAGE,
    RECORD_ATTRIBUTES
    FROM SNOWFLAKE.LOCAL.AI_OBSERVABILITY_EVENTS 
    WHERE SCOPE:name = 'snow.cortex.agent'
    AND RECORD:"name" NOT IN ('SqlExecution', 'SqlExecution_CortexAnalyst','CortexChartToolImpl-data_to_chart')
    AND AGENT_NAME = 'GENOMICS_AGENT'
    -- Optional filters kept for ad-hoc debugging:
    -- AND OPERATION !='Agent'
    -- AND THREAD_ID ='475131216317'
    -- AND RECORD_ID = '8f57c963-a79e-4788-8d32-ecfdf730c840'
    -- AND USER_FEEDBACK IS NOT NULL
    -- AND TOOL_SELECTION IS NOT NULL
    -- AND RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.id" IS NOT NULL
    -- 'toolu_bdrk_01WjbFWyvYw2jvxd9dn6irNV'
    -- NOTE(review): this ORDER BY sits inside the CTE and the outer SELECT has
    -- none; confirm the final row order is something you can rely on.
    ORDER BY TIMESTAMP DESC, START_TIMESTAMP DESC, THREAD_ID)

    SELECT * FROM RESULTS;

In [None]:
-- Roll agent events up to one row per RECORD_ID: first/last event time,
-- latency, the question and answer, and time-ordered arrays of the tools that
-- were selected and what they returned.
WITH RESULTS AS (SELECT
    TIMESTAMP AS TS,
    RECORD_ATTRIBUTES:"snow.ai.observability.object.name" AS AGENT_NAME,
    RECORD_ATTRIBUTES:"ai.observability.record_id" AS RECORD_ID,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.thread_id" AS THREAD_ID,
    RECORD_ATTRIBUTES:"ai.observability.record_root.input" AS INPUT_QUERY,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.thinking_response" AS AGENT_PLANNING,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_analyst.sql_query" AS GENERATED_SQL,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.sql_execution.result" AS SQL_RESULT,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_search.results" AS CORTEX_SEARCH_RESULT,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.custom_tool.results" AS CUSTOM_TOOL_RESULT,
    RECORD_ATTRIBUTES:"ai.observability.record_root.output" AS AGENT_RESPONSE,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.model" AS REASONING_MODEL,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool.name" AS AVAILABLE_TOOLS,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool_selection.name" AS TOOL_SELECTION,
    RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.cortex_search.name" AS CSS_NAME,

    RECORD:"name" as TOOL_CALL,

    RECORD_ATTRIBUTES:"snow.ai.observability.agent.planning.tool_selection.type" AS TOOL_TYPE,
    -- One {tool_name, tool_output} object per tool event; SQL-execution and
    -- chart rendering steps are left out of the tool list.
    CASE
        WHEN RECORD_ATTRIBUTES:"snow.ai.observability.agent.tool.id" IS NOT NULL
        AND RECORD:"name" NOT IN ('SqlExecution', 'SqlExecution_CortexAnalyst','CortexChartToolImpl-data_to_chart')

        THEN OBJECT_CONSTRUCT (
            'tool_name',
            TOOL_CALL,
            'tool_output',
            OBJECT_CONSTRUCT(
            'SQL',
            GENERATED_SQL,
            'search results',
            CORTEX_SEARCH_RESULT,
            'CUSTOM_TOOL_RESULT',
            CUSTOM_TOOL_RESULT
            ))
        ELSE NULL
        END AS TOOL_ARRAY,

    -- Map the feedback flag to 1 / 0; rows without the flag stay NULL.
    CASE
        WHEN VALUE:"positive"='true' THEN 1
        WHEN VALUE:"positive"='false'THEN 0
        ELSE NULL
        END AS USER_FEEDBACK,
    VALUE:"feedback_message" AS USER_FEEDBACK_MESSAGE,

    RECORD:"name" as OPERATION,
    *
    FROM SNOWFLAKE.LOCAL.AI_OBSERVABILITY_EVENTS
    WHERE SCOPE:name = 'snow.cortex.agent'
    -- AND AGENT_NAME = 'FINANCE_AGENT'
    AND OPERATION !='Agent'
    -- AND THREAD_ID = '475131216309'
    ORDER BY THREAD_ID, TIMESTAMP, START_TIMESTAMP ASC)

    SELECT
        RECORD_ID,
        MIN(TIMESTAMP) AS START_TS,
        MAX(TIMESTAMP) AS END_TS,
        -- DATEDIFF(SECOND, ...) returns a whole number of seconds, so casting
        -- it to FLOAT never carried a fraction. Measure in milliseconds and
        -- scale so sub-second latency is kept.
        (DATEDIFF(MILLISECOND, START_TS, END_TS) / 1000.0)::FLOAT AS LATENCY,
        MIN(AGENT_NAME) AS AGENT_NAME,
        MIN(INPUT_QUERY) AS INPUT_QUERY,
        MIN(AGENT_RESPONSE) AS AGENT_RESPONSE,
        MIN(AGENT_PLANNING) AS AGENT_PLANNING,
        -- Per-record arrays, each ordered by event time.
        ARRAY_AGG(TOOL_SELECTION)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS TOOL_CALLS,
        ARRAY_AGG(CSS_NAME)   WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS CSS_CALLS,
        ARRAY_AGG(TOOL_TYPE)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS TOOL_TYPES,
        ARRAY_AGG(GENERATED_SQL)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS GENERATED_SQLS,
        -- ARRAY_AGG(SQL_RESULT)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS SQL_RESULTS,
        ARRAY_AGG(CORTEX_SEARCH_RESULT)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS CORTEX_SEARCH_RESULTS,
        ARRAY_AGG(CUSTOM_TOOL_RESULT)  WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS CUSTOM_TOOL_RESULTS,
        ARRAY_AGG(TOOL_ARRAY) WITHIN GROUP (ORDER BY TIMESTAMP ASC) AS TOOL_ARRAY,
        MIN(USER_FEEDBACK) AS USER_FEEDBACKS,
        MIN(USER_FEEDBACK_MESSAGE) AS USER_FEEDBACK_MESSAGES

        FROM RESULTS
        -- WHERE RECORD_ID ilike '%8f57c963-a79e-4788-8d32-ecfdf730c840%'

        GROUP BY RECORD_ID
        -- HAVING USER_FEEDBACKS[0] IS NOT NULL
        ORDER BY START_TS DESC;

In [None]:
import json

# Pull the result held by `cell6` into a pandas DataFrame and display it.
# NOTE(review): `cell6` is not defined in any Python cell shown here —
# presumably it is the name of the SQL cell above; confirm before a fresh run.
df = cell6.to_pandas()
df

In [None]:
# df.drop_duplicates(subset=['AGENT_NAME', 'INPUT_QUERIES'], inplace=True)
# Show the (rows, columns) size of the loaded frame.
df.shape

In [None]:
# [print(idx, tool.update({'SEQ', idx})) for idx, tool in enumerate(b)]

In [None]:
# Parse one sample row's TOOL_ARRAY to try add_tool_sequence on. The value is
# parsed as JSON text: ast.literal_eval rejects JSON's null/true/false, and
# `ast` is not imported until a later cell, so a fresh kernel failed here.
# Only the sampled row is parsed instead of the whole column.
b = json.loads(df.TOOL_ARRAY[33])

def add_tool_sequence(tool_list):
    """Number each tool call by position and normalise its key order.

    Args:
        tool_list (list[dict]): Tool-call dicts in execution order. Only the
            'tool_name' and 'tool_output' keys are carried over.

    Returns:
        list[dict]: One new dict per input entry whose keys appear in the
        order 'tool_sequence' (1-based position), 'tool_name',
        'tool_output'. Keys missing from an entry are omitted. The input
        dicts are not modified.
    """
    carried_keys = ('tool_name', 'tool_output')

    # Build fresh dicts rather than calling tool.update(), so the caller's
    # parsed rows are left exactly as they were passed in.
    return [
        {
            'tool_sequence': position,
            **{key: tool[key] for key in carried_keys if key in tool},
        }
        for position, tool in enumerate(tool_list, start=1)
    ]
        

# Try the helper on the sample row parsed above; the result is the cell output.
add_tool_sequence(b)

In [None]:
# Parse each row's TOOL_ARRAY and store the numbered tool calls. json.loads is
# used because ast.literal_eval rejects JSON's null/true/false, and `ast` is
# not imported until a later cell.
df['TOOL_CALLING'] = df.TOOL_ARRAY.apply(lambda x: add_tool_sequence(json.loads(x)))

In [None]:
import ast 

def convert_tools_to_sequence(tool_list):
    """
    Pair the first tool name of every entry with its 1-based position.

    Args:
        tool_list (list): Entries in call order. Each entry is a string
            holding a Python literal sequence (e.g. '["my_tool"]'); only its
            first element is used.

    Returns:
        list: One dict per entry with 'tool_name' (the first element of the
            parsed entry) and 'tool_sequence' (the position as a string,
            starting at '1').
    """
    sequenced = []
    for position, raw_entry in enumerate(tool_list, start=1):
        parsed_entry = ast.literal_eval(raw_entry)
        sequenced.append({
            'tool_name': parsed_entry[0],
            'tool_sequence': str(position),
        })
    return sequenced

# Parse each row's TOOL_CALLS, number the selected tools, and store the result
# as a JSON string.
df['TOOL_SELECTION'] = df['TOOL_CALLS'].apply(
    lambda raw_calls: json.dumps(
        convert_tools_to_sequence(ast.literal_eval(raw_calls))
    )
)

In [None]:
-- List the tables visible in the current context.
SHOW TABLES;

In [None]:
# Keep only the columns that go into the persisted log table, in output order.
final_df = df[[
    'RECORD_ID',
    'START_TS',
    'AGENT_NAME',
    'INPUT_QUERY',
    'AGENT_RESPONSE',
    'TOOL_CALLING',
    'LATENCY',
    'USER_FEEDBACKS',
    'USER_FEEDBACK_MESSAGES',
]]

# Write the frame to PARSED_AI_OBS_LOGS, creating the table if it is missing.
session.write_pandas(
    df=final_df,
    table_name='PARSED_AI_OBS_LOGS',
    auto_create_table=True,
)

In [None]:
import ast 

def create_tool_execution_sequence(df):
    """
    Pair the first tool name of every entry with its 1-based position.

    The body previously iterated over an undefined name (`tool_list`), so
    every call raised NameError; it now iterates over the argument it is
    actually given.

    Args:
        df (list): Despite the parameter name, an iterable of tool entries in
            execution order. Each entry is a string holding a Python literal
            sequence (e.g. '["my_tool"]'); only its first element is used.

    Returns:
        list: One dict per entry with 'tool_name' (the first element of the
            parsed entry) and 'tool_sequence' (the position as a string,
            starting at '1').
    """
    return [
        {
            'tool_name': ast.literal_eval(tool)[0],
            'tool_sequence': str(idx + 1)
        }
        for idx, tool in enumerate(df)
    ]

# df['TOOL_SELECTION'] = df.TOOL_CALLS.apply(lambda x: json.dumps(convert_tools_to_sequence(ast.literal_eval(x))))

In [None]:
-- Dynamic table with one row per agent chat message: the request, the
-- response, its timing, and any user feedback joined on record_id.
CREATE OR REPLACE DYNAMIC TABLE ai_observability_processed
TARGET_LAG = '1 hour'
WAREHOUSE = 'MEDIUM'
AS
-- ai_flat: raw event columns plus a few commonly used scalar extractions.
WITH ai_flat AS (
    SELECT 
        TIMESTAMP,
        START_TIMESTAMP,
        OBSERVED_TIMESTAMP,
        TRACE,
        RESOURCE_ATTRIBUTES,
        SCOPE,
        RECORD_TYPE,
        RECORD,
        RECORD_ATTRIBUTES,
        VALUE,
        -- These attribute keys contain dots, so they must be quoted. An
        -- unquoted path (RESOURCE_ATTRIBUTES:snow.database.name) is read as
        -- nested keys snow -> database -> name and comes back NULL.
        RESOURCE_ATTRIBUTES:"snow.database.name"::STRING AS database_name,
        RESOURCE_ATTRIBUTES:"snow.schema.name"::STRING AS schema_name,
        RESOURCE_ATTRIBUTES:"snow.user.name"::STRING AS user_name,
        RESOURCE_ATTRIBUTES:"snow.warehouse.name"::STRING AS warehouse_name,
        TRACE:trace_id::STRING AS trace_id,
        TRACE:span_id::STRING AS span_id,
        RECORD:name::STRING AS event_name,
        RECORD:severity_text::STRING AS severity_level
    FROM SNOWFLAKE.LOCAL.AI_OBSERVABILITY_EVENTS
),
-- base_data: request and feedback events only, with typed columns.
base_data AS (
    SELECT
        event_name,
        observed_timestamp,
        start_timestamp,
        timestamp,
        RECORD_ATTRIBUTES['snow.ai.observability.object.name']::STRING AS agent_name,
        RECORD_ATTRIBUTES['snow.ai.observability.user.name']::STRING AS username,
        RECORD_ATTRIBUTES['snow.ai.observability.agent.thread_id']::STRING AS conversation_id,
        RECORD_ATTRIBUTES['ai.observability.record_id']::STRING AS record_id,
        VALUE['snow.ai.observability.response']::STRING AS ai_response,
        VALUE['snow.ai.observability.response_time_ms']::INTEGER AS response_time_ms,
        VALUE['snow.ai.observability.request_body']['messages'] AS messages_array,
        CASE
            WHEN VALUE['positive'] IS NOT NULL AND VALUE['positive'] = TRUE
            THEN 'positive'
            WHEN VALUE['positive'] IS NOT NULL AND VALUE['positive'] = FALSE
            THEN 'negative'
            ELSE 'no feedback'
        END AS user_rating,
        VALUE['feedback_message']::STRING AS user_feedback,
        VALUE['dropdown_selection'] AS dropdown_selection,
        VALUE
    FROM ai_flat 
    WHERE event_name IN ('CORTEX_AGENT_REQUEST', 'CORTEX_AGENT_FEEDBACK')
),
-- request_data: one row per message of each request; user_prompt is filled
-- only for messages whose role is 'user'.
-- NOTE(review): FLATTEN yields one row per message, so a request with several
-- messages appears several times. Non-user rows keep user_prompt NULL but
-- still pass the chat_level_data filter whenever ai_response is set — confirm
-- that is intended.
request_data AS (
    SELECT
        b.*,
        CASE 
            WHEN f.value['role']::STRING = 'user' 
            THEN f.value['content'][0]['text']::STRING 
        END AS user_prompt
    FROM base_data b,
    LATERAL FLATTEN(input => b.messages_array, outer => TRUE) f
    WHERE b.event_name = 'CORTEX_AGENT_REQUEST'
),
-- feedback_data: feedback events that carry an explicit rating.
feedback_data AS (
    SELECT
        record_id,
        user_rating,
        user_feedback,
        dropdown_selection,
        timestamp AS feedback_timestamp
    FROM base_data
    WHERE event_name = 'CORTEX_AGENT_FEEDBACK'
        AND user_rating != 'no feedback'
),
-- chat_level_data: requests with their feedback (if any) attached.
chat_level_data AS (
    SELECT
        r.conversation_id,
        r.record_id,
        r.agent_name,
        r.username,
        r.user_prompt,
        r.ai_response,
        r.timestamp AS chat_sent_time,
        -- Response time = send time plus the reported response duration.
        DATEADD('millisecond', r.response_time_ms, r.timestamp) AS ai_response_time,
        COALESCE(f.user_rating, 'no feedback') AS user_rating,
        f.user_feedback,
        f.dropdown_selection,
        r.response_time_ms,
        r.observed_timestamp
    FROM request_data r
    LEFT JOIN feedback_data f 
        ON r.record_id = f.record_id
    WHERE r.user_prompt IS NOT NULL 
        OR r.ai_response IS NOT NULL
)
SELECT
    conversation_id,
    record_id,
    username,
    agent_name,
    chat_sent_time,
    ai_response_time,
    user_prompt,
    ai_response,
    user_rating,
    user_feedback,
    dropdown_selection,
    response_time_ms,
    DATE(chat_sent_time) AS chat_date,
    HOUR(chat_sent_time) AS chat_hour,
    DAYOFWEEK(chat_sent_time) AS day_of_week,
    -- Numeric rating: +1 positive, -1 negative, 0 when no feedback.
    CASE 
        WHEN user_rating = 'positive' THEN 1 
        WHEN user_rating = 'negative' THEN -1 
        ELSE 0 
    END AS rating_score,
    -- Position of the message within its conversation, oldest first.
    ROW_NUMBER() OVER (PARTITION BY conversation_id ORDER BY chat_sent_time) AS message_sequence
FROM chat_level_data
ORDER BY conversation_id, chat_sent_time DESC;

In [None]:
-- Inspect the contents of the dynamic table.
SELECT * FROM AI_OBSERVABILITY_PROCESSED;

In [None]:
-- Sample of the target tool-call structure: an array of
-- {tool_name, tool_sequence, tool_input, tool_output} objects.
-- Wrapped in SELECT so the cell is a runnable statement; a bare PARSE_JSON(...)
-- expression is not.
SELECT PARSE_JSON(
        '[
    {
      "tool_name": "Sales_metrics_model",
      "tool_sequence": 1,
      "tool_input": {
        "query": "How many deals did Sarah Johnson win compared to deals she lost",
        "original_query": "How many deals did Sarah Johnson win compared to deals she lost?",
        "previous_related_tool_result_id": "",
        "check_metric_distribution": "include COUNT(*) as total_deals to verify we have complete data for Sarah Johnson",
        "check_missing_data": "include MIN(CLOSE_DATE), MAX(CLOSE_DATE) to check the time range of Sarah Johnson''s deals",
        "has_time_column": true,
        "queried_time_period": ""
      },
      "tool_output": {
        "analyst_latency_ms": 2818,
        "analyst_orchestration_path": "regular_sqlgen",
        "query_id": "01bfe03a-010b-963b-0000-8df925e1a17a",
        "sql": "WITH __sales_metrics AS ( SELECT win_status, sales_rep, close_date FROM sales_intelligence.data.sales_metrics ) SELECT SUM(CASE WHEN win_status = TRUE THEN 1 ELSE 0 END) AS deals_won, SUM(CASE WHEN win_status = FALSE THEN 1 ELSE 0 END) AS deals_lost, COUNT(*) AS total_deals, MIN(close_date) AS min_close_date, MAX(close_date) AS max_close_date FROM __sales_metrics WHERE sales_rep = ''Sarah Johnson'' -- Generated by Cortex Analyst ;",
        "sql_exec_result": {
          "data": [["2","1","3","2024-Feb-10","2024-Feb-15"]],
          "resultSetMetaData": {
            "format": "jsonv2",
            "numRows": 1,
            "partition": 0,
            "rowType": [
              {"name":"DEALS_WON","type":"fixed"},
              {"name":"DEALS_LOST","type":"fixed"},
              {"name":"TOTAL_DEALS","type":"fixed"},
              {"name":"MIN_CLOSE_DATE","type":"date"},
              {"name":"MAX_CLOSE_DATE","type":"date"}
            ]
          },
          "statementHandle": "01bfe03a-010b-963b-0000-8df925e1a17a"
        },
        "text": "The question is clear and I can answer it with the following SQL.",
        "tool_result_id": "toolu_bdrk_01GATA1Fd86aMgQKCMeNZyKF"
      }
    },
    {
      "tool_name": "Sales_metrics_model",
      "tool_sequence": 2,
      "tool_input": {
        "has_time_column": false,
        "original_query": "How many deals did Sarah Johnson win compared to deals she lost?",
        "previous_related_tool_result_id": "toolu_bdrk_01GATA1Fd86aMgQKCMeNZyKF",
        "queried_time_period": "",
        "query": "Show Sarah Johnson''s deals won vs deals lost in a format suitable for comparison chart with outcome type and count columns",
        "check_metric_distribution": "",
        "check_missing_data": ""
      },
      "tool_output": {
        "analyst_latency_ms": 2299,
        "analyst_orchestration_path": "regular_sqlgen",
        "query_id": "01bfe03b-010b-9651-0000-8df925e1954a",
        "sql": "WITH __sales_metrics AS ( SELECT win_status, sales_rep FROM sales_intelligence.data.sales_metrics ) SELECT CASE WHEN win_status = TRUE THEN ''Won'' ELSE ''Lost'' END AS outcome_type, COUNT(*) AS count FROM __sales_metrics WHERE sales_rep = ''Sarah Johnson'' GROUP BY win_status ORDER BY outcome_type -- Generated by Cortex Analyst ;",
        "sql_exec_result": {
          "data": [["Lost","1"],["Won","2"]],
          "resultSetMetaData": {
            "format": "jsonv2",
            "numRows": 2,
            "partition": 0,
            "rowType": [
              {"name":"OUTCOME_TYPE","type":"text"},
              {"name":"COUNT","type":"fixed"}
            ]
          },
          "statementHandle": "01bfe03b-010b-9651-0000-8df925e1954a"
        },
        "text": "The question is clear and I can answer it with the following SQL.",
        "tool_result_id": "toolu_bdrk_01TvXETytnwx2hyB4Fbsiy4g"
      }
    }
  ]'
    ) AS SAMPLE_TOOL_CALLS;

In [None]:
#!/usr/bin/env python3
"""
Fetch evaluation questions from AI Observability Event Table.

This script provides functions to query the event table and extract evaluation questions
from the record_attributes JSON field. Questions can be deduplicated, filtered, and
optionally include full trace data.

COMMAND-LINE USAGE:
    python fetch_events_from_event_table.py <agent_name> <database> <schema> [OPTIONS]

COMMAND-LINE OPTIONS:
    --connection CONNECTION     Snowflake connection name (default: snowhouse)
    --output OUTPUT            Path to save questions as JSON (default: print to console)
    --where WHERE_CLAUSE       Additional WHERE clause conditions
    --limit N                  Maximum number of records to return
    --unique                   Deduplicate questions by question text
    --include-trace            Fetch complete trace for each record (slower)
    --no-compress              Don't compress traces (only with --include-trace)

COMMAND-LINE EXAMPLES:
    # Fetch all events for an agent:
    python fetch_events_from_event_table.py RAVEN_SALES_ASSISTANT SNOWFLAKE_INTELLIGENCE AGENTS

    # Fetch last 10 unique questions with compressed traces:
    python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
        --where "ORDER BY timestamp DESC" --limit 10 --unique --include-trace

    # Fetch traces uncompressed and save to file:
    python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
        --include-trace --no-compress --output ./questions.json

    # Get 20 questions from last 30 days:
    python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
        --where "timestamp > dateadd(day, -30, current_timestamp())" --limit 20

LIBRARY USAGE:
    from fetch_events_from_event_table import fetch_events_from_event_table

    # Get all questions:
    questions = fetch_events_from_event_table('MY_AGENT', 'MY_DB', 'MY_SCHEMA')

    # Get last 10 unique questions with compressed traces (default):
    questions = fetch_events_from_event_table(
        'MY_AGENT', 'MY_DB', 'MY_SCHEMA',
        where_clause="ORDER BY timestamp DESC",
        limit=10,
        unique=True,
        include_trace=True
    )

    # Get traces uncompressed:
    questions = fetch_events_from_event_table(
        'MY_AGENT', 'MY_DB', 'MY_SCHEMA',
        include_trace=True,
        compress_trace=False
    )
"""

import json
import argparse
import snowflake.connector
from typing import List, Dict, Any, Optional

import pandas as pd
# from trulens.core.feedback.selector import Trace
# from trulens.otel.semconv.trace import SpanAttributes


# Event table constants: the pieces of the fully qualified names used when
# building queries against the AI Observability events.
EVENT_TABLE_DB = "SNOWFLAKE"  # database part of the qualified name
EVENT_TABLE_SCHEMA = "LOCAL"  # schema part of the qualified name
EVENT_TABLE_FUNCTION = "GET_AI_OBSERVABILITY_EVENTS"  # table function queried by fetch_trace_for_record
EVENT_TABLE_NAME = "AI_OBSERVABILITY_EVENTS"  # underlying table, for direct (non-function) queries


def escape_sql_string(value: str) -> str:
    """Escape a value for embedding inside a single-quoted SQL string literal.

    Backslashes are doubled first and single quotes second. Doubling quotes
    alone is not enough where the SQL dialect treats backslash as an escape
    character inside single-quoted literals: an input containing ``\\'``
    would otherwise turn into an escaped quote followed by a quote that
    closes the literal.

    Args:
        value (str): Raw text to embed.

    Returns:
        str: The text with every backslash and single quote doubled.
    """
    # Order matters: escape backslashes before quotes so the quotes that
    # were doubled are not preceded by a stray, unescaped backslash.
    return value.replace("\\", "\\\\").replace("'", "''")


def fetch_trace_for_record(
    cursor,
    database: str,
    schema: str,
    agent_name: str,
    record_id: str,
    use_udtf: bool = True,
) -> pd.DataFrame:
    """
    Fetch all events (trace) for a given record_id as a DataFrame.

    Args:
        cursor: Snowflake cursor
        database (str): Database where the agent is located
        schema (str): Schema where the agent is located
        agent_name (str): Name of the agent
        record_id (str): The record ID to fetch trace for
        use_udtf (bool): Must be True; the trace is read through the
            GET_AI_OBSERVABILITY_EVENTS table function. The direct-table
            path is disabled.

    Returns:
        pd.DataFrame: One row per event, ordered by START_TIMESTAMP, with
        lower-cased column names. The record, record_attributes, trace and
        resource_attributes columns are JSON-decoded when they arrive as
        valid JSON strings; start_timestamp and timestamp are converted to
        strings. When no events match, the frame is empty but still carries
        the query's column names.

    Raises:
        ValueError: If use_udtf is False.
    """
    if not use_udtf:
        # Direct-table variant, kept for reference only:
        # trace_sql = f"""
        # SELECT *
        # FROM {EVENT_TABLE_DB}.{EVENT_TABLE_SCHEMA}.{EVENT_TABLE_NAME}
        # WHERE RECORD_ATTRIBUTES:"snow.ai.observability.database.name" = '{escape_sql_string(database)}'
        # AND RECORD_ATTRIBUTES:"snow.ai.observability.schema.name" = '{escape_sql_string(schema)}'
        # AND RECORD_ATTRIBUTES:"snow.ai.observability.object.name" = '{escape_sql_string(agent_name)}'
        # AND RECORD_ATTRIBUTES:"ai.observability.record_id" = '{escape_sql_string(record_id)}'
        # ORDER BY START_TIMESTAMP ASC
        # """
        raise ValueError(
            "UDTF must be True, direct table query is experimental and not recommended"
        )

    # Every interpolated value goes through escape_sql_string before it is
    # placed inside a quoted literal.
    trace_sql = f"""
        SELECT *
        FROM TABLE({EVENT_TABLE_DB}.{EVENT_TABLE_SCHEMA}.{EVENT_TABLE_FUNCTION}(
            '{escape_sql_string(database)}',
            '{escape_sql_string(schema)}',
            '{escape_sql_string(agent_name)}',
            'CORTEX AGENT'
        ))
        WHERE RECORD_ATTRIBUTES:"ai.observability.record_id" = '{escape_sql_string(record_id)}'
        ORDER BY START_TIMESTAMP ASC
        """

    cursor.execute(trace_sql)

    # Column names come back from the cursor metadata; normalise to lower case.
    column_names = [desc[0].lower() for desc in cursor.description]

    json_fields = ("record", "record_attributes", "trace", "resource_attributes")
    timestamp_fields = ("start_timestamp", "timestamp")

    trace_spans = []
    for row in cursor.fetchall():
        row_dict = dict(zip(column_names, row))

        # Decode JSON columns that arrive as strings; anything that is not
        # valid JSON is deliberately left as the original string.
        for json_field in json_fields:
            raw_value = row_dict.get(json_field)
            if raw_value and isinstance(raw_value, str):
                try:
                    row_dict[json_field] = json.loads(raw_value)
                except json.JSONDecodeError:
                    pass

        # Render timestamps as strings.
        for ts_field in timestamp_fields:
            if row_dict.get(ts_field):
                row_dict[ts_field] = str(row_dict[ts_field])

        trace_spans.append(row_dict)

    # Pass the column list explicitly so an empty result still yields a frame
    # with the expected columns instead of a column-less DataFrame.
    # dict.fromkeys drops repeated names the same way dict(zip(...)) does.
    return pd.DataFrame(trace_spans, columns=list(dict.fromkeys(column_names)))


def fetch_events_from_event_table(
    agent_name: str,
    database: str,
    schema: str,
    connection_name: str = "snowhouse",
    where_clause: Optional[str] = None,
    limit: Optional[int] = None,
    unique: bool = False,
    include_trace: bool = False,
    compress_trace: bool = True,
    use_udtf: bool = True,
) -> List[Dict[str, Any]]:
    """
    Fetch evaluation questions from the AI Observability Event Table.

    Queries the event-table UDTF (``EVENT_TABLE_FUNCTION``) for the agent's
    ``record_root`` spans and extracts, from each row's ``record_attributes``
    JSON, the fields named by ``SpanAttributes.RECORD_ROOT.INPUT``,
    ``RECORD_ROOT.OUTPUT``, ``RECORD_ROOT.GROUND_TRUTH_OUTPUT`` and
    ``SpanAttributes.RECORD_ID``.

    Rows without a question, and rows whose ``record_attributes`` is NULL,
    invalid JSON or not a JSON object, are skipped (the latter with a printed
    warning) instead of aborting the whole fetch.

    Args:
        agent_name (str): Name of the agent to fetch events for
        database (str): Database where the agent is located
        schema (str): Schema where the agent is located
        connection_name (str): Snowflake connection name (default: "snowhouse")
        where_clause (Optional[str]): Additional conditions to filter results (joined with AND).
                                      May instead start with ORDER BY, in which case it is
                                      appended without AND. The text is spliced into the SQL
                                      verbatim, so it must come from a trusted caller.
                                      Examples:
                                        - "timestamp > '2025-01-01'"
                                        - "ORDER BY timestamp DESC"
                                      Note: Don't include "WHERE" or "AND" at the beginning of the string - they're added automatically
        limit (Optional[int]): Maximum number of records to return (ignored when None or <= 0).
                              When unique=False, applied as a SQL LIMIT.
                              When unique=True, applied after deduplication, so up to
                              `limit` unique questions are returned.
        unique (bool): If True, deduplicate questions by question text (default: False).
                      When True, keeps first occurrence of each unique question.
        include_trace (bool): If True, fetch complete trace (all spans) for each record and include
                             as JSON string in 'trace' field (default: False).
                             Warning: This makes N+1 queries and can be slow for many records.
        compress_trace (bool): If True (default), serialize the trace with
                              ``Trace.to_compressed_json``; if False, with the events
                              DataFrame's ``to_json``. Only used when include_trace=True.
        use_udtf (bool): Must be True (default). The direct-table query path is
                        currently disabled and passing False raises ValueError.

    Returns:
        list[dict]: List of question dictionaries, each containing:
                   - question: The question (RECORD_ROOT.INPUT)
                   - answer: The actual answer from the agent (RECORD_ROOT.OUTPUT)
                   - ground_truth: The expected answer, or None when absent
                   - record_id: The record ID for this question/answer
                   - trace (only if include_trace=True): JSON string of trace events,
                     "{}" when the record has no spans, or None when the record has
                     no record_id or fetching its trace failed.

    Raises:
        ValueError: If use_udtf is False.

    Example:
        # Basic usage:
        questions = fetch_events_from_event_table(
            agent_name="RAVEN_SALES_ASSISTANT",
            database="SNOWFLAKE_INTELLIGENCE",
            schema="AGENTS"
        )

        # Get last 10 unique questions with compressed traces (default):
        questions = fetch_events_from_event_table(
            agent_name="MY_AGENT",
            database="MY_DB",
            schema="MY_SCHEMA",
            where_clause="ORDER BY timestamp DESC",
            limit=10,
            unique=True,
            include_trace=True
        )

        # Get traces uncompressed:
        questions = fetch_events_from_event_table(
            agent_name="MY_AGENT",
            database="MY_DB",
            schema="MY_SCHEMA",
            include_trace=True,
            compress_trace=False
        )

        # Get 20 questions from last 30 days (may include duplicates):
        questions = fetch_events_from_event_table(
            agent_name="MY_AGENT",
            database="MY_DB",
            schema="MY_SCHEMA",
            where_clause="timestamp > dateadd(day, -30, current_timestamp())",
            limit=20
        )
    """
    # Fail fast, before a connection is opened: the direct-table path is disabled.
    if not use_udtf:
        raise ValueError(
            "UDTF must be True, direct table query is experimental and not recommended"
        )

    sql = _build_record_root_sql(
        database, schema, agent_name, where_clause, limit, unique
    )

    conn = snowflake.connector.connect(connection_name=connection_name)
    # Pre-bind so the cleanup below is safe even when conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor()

        print(f"Querying event table for agent: {database}.{schema}.{agent_name}")
        cursor.execute(sql)

        # Column names come from the cursor description, lower-cased for lookups.
        column_names = [desc[0].lower() for desc in cursor.description]
        rows = cursor.fetchall()

        print(f"Retrieved {len(rows)} event records")
        print(f"Columns: {', '.join(column_names)}")

        questions = _collect_questions(column_names, rows, unique)
        if unique:
            # LIMIT was deliberately left out of the SQL; apply it after dedup.
            if limit is not None and limit > 0:
                questions = questions[:limit]
            print(f"Extracted {len(questions)} unique questions from event table")
        else:
            print(f"Extracted {len(questions)} questions from event table")

        if include_trace:
            _attach_traces(
                cursor,
                questions,
                database,
                schema,
                agent_name,
                compress_trace,
                use_udtf,
            )

        return questions

    finally:
        # Always release the connection, even if closing the cursor fails.
        try:
            if cursor is not None:
                cursor.close()
        finally:
            conn.close()


def _build_record_root_sql(
    database: str,
    schema: str,
    agent_name: str,
    where_clause: Optional[str],
    limit: Optional[int],
    unique: bool,
) -> str:
    """Build the UDTF query selecting the agent's record_root spans."""
    sql = f"""
            SELECT *
            FROM TABLE({EVENT_TABLE_DB}.{EVENT_TABLE_SCHEMA}.{EVENT_TABLE_FUNCTION}(
                '{escape_sql_string(database)}',
                '{escape_sql_string(schema)}',
                '{escape_sql_string(agent_name)}',
                'CORTEX AGENT'
            ))
            WHERE RECORD_ATTRIBUTES:"ai.observability.span_type" = 'record_root'
            """

    # A blank/whitespace-only clause is treated as absent so no dangling AND is emitted.
    extra = where_clause.strip() if where_clause else ""
    if extra:
        # A bare ORDER BY is appended as-is; anything else is an extra predicate.
        if extra.upper().startswith("ORDER BY"):
            sql = f"{sql} {extra}"
        else:
            sql = f"{sql} AND {extra}"

    # With deduplication the limit is applied in Python after dedup, not in SQL.
    if limit is not None and limit > 0 and not unique:
        sql = f"{sql} LIMIT {int(limit)}"

    return sql


def _parse_event_row(column_names: List[str], row: Any) -> Optional[Dict[str, Any]]:
    """Convert one event row into a question dict; None when it has no question."""
    row_dict = dict(zip(column_names, row))

    # The column may arrive as a JSON string or as an already-parsed object.
    record_attributes = row_dict.get("record_attributes")
    if isinstance(record_attributes, str):
        record_attributes = json.loads(record_attributes)
    if not isinstance(record_attributes, dict):
        raise ValueError("record_attributes is missing or not a JSON object")

    question = record_attributes.get(SpanAttributes.RECORD_ROOT.INPUT)
    if not question:
        return None

    return {
        "question": question,
        "answer": record_attributes.get(SpanAttributes.RECORD_ROOT.OUTPUT),
        "ground_truth": record_attributes.get(
            SpanAttributes.RECORD_ROOT.GROUND_TRUTH_OUTPUT
        ),
        "record_id": record_attributes.get(SpanAttributes.RECORD_ID),
    }


def _dedup_key(question: Any) -> Any:
    """Return a hashable key for a question (non-hashable values are JSON-encoded)."""
    try:
        hash(question)
    except TypeError:
        return json.dumps(question, sort_keys=True, default=str)
    return question


def _collect_questions(
    column_names: List[str], rows: Any, unique: bool
) -> List[Dict[str, Any]]:
    """Parse all rows in order, optionally keeping only the first row per question."""
    questions: List[Dict[str, Any]] = []
    seen = set()
    for row in rows:
        try:
            question_data = _parse_event_row(column_names, row)
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            print(f"Warning: Failed to parse event record: {e}")
            continue

        if question_data is None:
            continue

        if unique:
            key = _dedup_key(question_data["question"])
            if key in seen:
                continue
            seen.add(key)

        questions.append(question_data)
    return questions


def _attach_traces(
    cursor: Any,
    questions: List[Dict[str, Any]],
    database: str,
    schema: str,
    agent_name: str,
    compress_trace: bool,
    use_udtf: bool,
) -> None:
    """Add a 'trace' entry to every question dict in place (one query per record)."""
    print(f"\nFetching traces for {len(questions)} records...")
    for position, question in enumerate(questions, start=1):
        record_id = question.get("record_id")
        if not record_id:
            question["trace"] = None
            continue

        try:
            events_df = fetch_trace_for_record(
                cursor=cursor,
                database=database,
                schema=schema,
                agent_name=agent_name,
                record_id=record_id,
                use_udtf=use_udtf,
            )

            if events_df.empty:
                question["trace"] = "{}"
            else:
                trace = Trace()
                trace.events = events_df

                if compress_trace:
                    trace_json = trace.to_compressed_json(default_handler=str)
                else:
                    trace_json = (
                        trace.events.to_json(default_handler=str)
                        if trace.events is not None
                        else "{}"
                    )
                question["trace"] = trace_json

            if position % 10 == 0:
                print(f"  Fetched {position}/{len(questions)} traces...")
        except Exception as e:
            # Best effort by design: one failing trace must not discard the batch.
            print(f"Warning: Failed to fetch trace for record_id {record_id}: {e}")
            question["trace"] = None
    print("✓ Completed fetching traces")


In [None]:
# Command-line entry point: parse arguments, fetch the questions, then either
# write them to a JSON file or print them to the console.
# NOTE(review): inside a notebook kernel `__name__` is also "__main__" and
# `sys.argv` carries the kernel's own arguments, so `parse_args()` would likely
# reject them — confirm this cell is only meant for the exported script.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Fetch evaluation questions from AI Observability Event Table",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Fetch all events:
  python fetch_events_from_event_table.py RAVEN_SALES_ASSISTANT SNOWFLAKE_INTELLIGENCE AGENTS

  # Fetch last 10 unique questions with traces (compressed by default):
  python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
      --where "ORDER BY timestamp DESC" --limit 10 --unique --include-trace

  # Fetch traces uncompressed:
  python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
      --include-trace --no-compress

  # Save to file:
  python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
      --connection snowhouse --output ./questions.json

  # Get 20 questions from last 30 days:
  python fetch_events_from_event_table.py MY_AGENT MY_DB MY_SCHEMA \\
      --where "timestamp > dateadd(day, -30, current_timestamp())" --limit 20
        """,
    )

    # Required arguments
    parser.add_argument("agent_name", help="Name of the agent to fetch events for")
    parser.add_argument("database", help="Database where agent is located")
    parser.add_argument("schema", help="Schema where agent is located")

    # Optional arguments
    parser.add_argument(
        "--connection",
        default="snowhouse",
        help="Snowflake connection name (default: snowhouse)",
    )
    parser.add_argument(
        "--output", help="Path to save questions as JSON (default: print to console)"
    )
    parser.add_argument(
        "--where",
        help="Additional WHERE clause conditions (don't include 'WHERE' or 'AND')",
    )
    parser.add_argument("--limit", type=int, help="Maximum number of records to return")
    parser.add_argument(
        "--unique", action="store_true", help="Deduplicate questions by question text"
    )
    parser.add_argument(
        "--include-trace",
        action="store_true",
        help="Fetch complete trace for each record (slower)",
    )
    parser.add_argument(
        "--no-compress",
        action="store_true",
        help="Don't compress traces (only with --include-trace)",
    )
    # store_false + dest: use_udtf defaults to True and the flag flips it to False.
    # The help text states the real outcome: fetch_events_from_event_table raises
    # ValueError whenever use_udtf is False, so this flag is not a usable workaround.
    parser.add_argument(
        "--no-udtf",
        action="store_false",
        dest="use_udtf",
        help="Query event table directly instead of using GET_AI_OBSERVABILITY_EVENTS UDTF (currently disabled: the direct-table path raises ValueError)",
    )

    args = parser.parse_args()

    # Echo the effective settings before running the query.
    print("\nFetching Events from AI Observability Event Table")
    print("=" * 80)
    print(f"Agent: {args.database}.{args.schema}.{args.agent_name}")
    print(f"Connection: {args.connection}")
    if args.where:
        print(f"Filter: {args.where}")
    if args.limit:
        print(f"Limit: {args.limit}")
    if args.unique:
        print("Mode: Unique questions only")
    if args.include_trace:
        compress_mode = "uncompressed" if args.no_compress else "compressed"
        print(f"Traces: Enabled ({compress_mode})")
    print("=" * 80 + "\n")

    # Fetch questions (--no-compress is inverted into compress_trace).
    questions = fetch_events_from_event_table(
        agent_name=args.agent_name,
        database=args.database,
        schema=args.schema,
        connection_name=args.connection,
        where_clause=args.where,
        limit=args.limit,
        unique=args.unique,
        include_trace=args.include_trace,
        compress_trace=not args.no_compress,
        use_udtf=args.use_udtf,
    )

    # Output results: JSON file when --output is given, otherwise stdout.
    if args.output:
        with open(args.output, "w") as f:
            json.dump(questions, f, indent=2)
        print(f"\n✓ Saved {len(questions)} questions to: {args.output}")
    else:
        print(f"\n{'='*80}")
        print("QUESTIONS")
        print("=" * 80 + "\n")
        print(json.dumps(questions, indent=2))

# ARCHIVE BELOW

In [None]:
-- Archived scratch query: number of events per SCOPE value in the
-- AI observability event table, displaying each scope's "name" field.
SELECT
    SCOPE:name,
    COUNT(*)
FROM SNOWFLAKE.LOCAL.AI_OBSERVABILITY_EVENTS
-- Optional clauses, left disabled:
-- WHERE
-- ORDER BY TIMESTAMP DESC
GROUP BY SCOPE
-- limit 10;