# AI Observability for Cortex Analyst with Semantic Views

This notebook registers a Cortex Analyst application with Snowflake's native AI Observability, using TruLens OTel-style instrumentation, and evaluates the SQL it generates against a ground-truth test dataset.

**Sections:**
1. Setup & Configuration
2. Cortex Analyst Application Class
3. Initialize App and Test
4. TruLens Evaluation Setup
5. Load Test Dataset from Snowflake
6. Run Evaluation
7. Compute Metrics
8. View Results

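**Required Packages:** `trulens-core`, `trulens-connectors-snowflake`, `trulens-providers-cortex` (plus `tabulate` if you use `CortexAnalystApp.execute_sql`, which relies on `DataFrame.to_markdown`)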

## 1. Setup & Configuration

**Update the values below to match your environment.**

In [None]:
import json
import time
import pandas as pd
from typing import List, Optional

from snowflake.snowpark.context import get_active_session

# Reuse the Snowpark session that the Snowflake Notebook already provides.
session = get_active_session()
print("Snowflake Notebook session active")

In [None]:
# =============================================================================
# USER CONFIGURATION - Update these values
# =============================================================================

# Semantic View to evaluate
SEMANTIC_VIEW = "<DATABASE>.<SCHEMA>.<SEMANTIC_VIEW_NAME>"  # e.g., "HAEBI_DEMO.HAEBI_SCHEMA.DUMMY_SEMANTIC_VIEW"

# Test Dataset Location (must have columns: INPUT_QUERY, OUTPUT_SQL)
TEST_DATASET_DATABASE = "<DATABASE>"      # e.g., "HAEBI_DEMO"
TEST_DATASET_SCHEMA = "<SCHEMA>"          # e.g., "HAEBI_SCHEMA"
TEST_DATASET_TABLE = "<TABLE_NAME>"       # e.g., "CORTEX_ANALYST_TEST_DATA"

# LLM Configuration
JUDGE_MODEL = "mistral-large2"
SUMMARIZATION_MODEL = "mistral-large2"

# =============================================================================
# Derived values (no need to change)
# =============================================================================
TEST_DATASET_FQN = f"{TEST_DATASET_DATABASE}.{TEST_DATASET_SCHEMA}.{TEST_DATASET_TABLE}"

# API Endpoints (for internal Snowflake API)
ANALYST_API_ENDPOINT = "/api/v2/cortex/analyst/message"
COMPLETE_API_ENDPOINT = "/api/v2/cortex/inference:complete"


def find_unset_placeholders(settings: dict) -> list:
    """
    Return the names of settings whose value still contains a ``<...>`` placeholder.

    Args:
        settings: Mapping of setting name -> configured value.

    Returns:
        Names (in insertion order) whose string value contains both '<' and '>'.
    """
    return [
        name
        for name, value in settings.items()
        if isinstance(value, str) and "<" in value and ">" in value
    ]


# Warn early: placeholder identifiers would otherwise only fail later with
# less obvious API / SQL errors.
unset_settings = find_unset_placeholders({
    "SEMANTIC_VIEW": SEMANTIC_VIEW,
    "TEST_DATASET_DATABASE": TEST_DATASET_DATABASE,
    "TEST_DATASET_SCHEMA": TEST_DATASET_SCHEMA,
    "TEST_DATASET_TABLE": TEST_DATASET_TABLE,
})
if unset_settings:
    print(f"WARNING: replace the <...> placeholders before running the evaluation: {', '.join(unset_settings)}")

print(f"Semantic View: {SEMANTIC_VIEW}")
print(f"Test Dataset: {TEST_DATASET_FQN}")
print(f"Judge Model: {JUDGE_MODEL}")

## 2. Cortex Analyst Application Class

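Instrumented for Snowflake AI Observability using OTel-style SpanAttributes: `call_cortex_analyst` is a RETRIEVAL span, `generate_summary` is a GENERATION span, and `answer_query` is the RECORD_ROOT whose output (the generated SQL) is evaluated.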

In [None]:
import _snowflake
import re

from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes


class CortexAnalystApp:
    """
    Cortex Analyst application instrumented for Snowflake AI Observability.

    Uses ``_snowflake.send_snow_api_request()`` for internal API calls.
    ``answer_query`` is the evaluated entry point (RECORD_ROOT span): it
    returns the normalized SQL that Cortex Analyst generated for a question,
    or the analyst's interpretation text when no SQL was produced.
    """

    # Trailing marker comment that Cortex Analyst may append to generated SQL;
    # stripped before comparing against ground truth.
    _GENERATED_COMMENT_RE = re.compile(r'\s*--\s*Generated by Cortex Analyst.*')

    def __init__(self, session, semantic_view: str):
        self.session = session  # Snowpark session, used by execute_sql
        self.semantic_view = semantic_view  # Semantic view passed to Cortex Analyst
        self.messages = []  # Conversation history sent to Cortex Analyst (multi-turn)

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_body(resp: dict) -> str:
        """Return the ``content`` of an API response as text (bytes decoded as UTF-8)."""
        body = resp.get("content") or ""
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")
        return body

    @staticmethod
    def _content_items(api_response: dict) -> list:
        """Return the dict items of ``message.content`` in an analyst response ([] if absent)."""
        message = api_response.get("message") or {}
        return [item for item in (message.get("content") or []) if isinstance(item, dict)]

    # ------------------------------------------------------------------
    # Retrieval: natural language -> SQL
    # ------------------------------------------------------------------

    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            # NOTE(review): "return" is the whole dict built below, not a list
            # of strings; the list of contexts is under its "context" key.
            # Confirm how TruLens records a dict here before relying on
            # context-based metrics (e.g. context relevance).
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return",
        }
    )
    def call_cortex_analyst(self, query: str) -> dict:
        """
        Call the Cortex Analyst API with the semantic view to get a SQL interpretation.

        This is the RETRIEVAL step - converting natural language to SQL. The user
        turn is appended to ``self.messages`` before the request and removed
        again if the request or response parsing fails, so a failed call never
        leaves an unanswered user turn in the conversation history.

        Args:
            query: Natural-language question.

        Returns:
            dict with ``raw_response`` (parsed JSON body), ``request_id``
            (``X-Snowflake-Request-Id`` response header, or "") and ``context``
            (list of "Interpretation: ..." / "SQL: ..." strings).

        Raises:
            Exception: if the API returns HTTP status >= 400 (message includes
                the response body), or if the call itself fails.
            json.JSONDecodeError: if the response body is not valid JSON.
        """
        self.messages.append({
            "role": "user",
            "content": [{"type": "text", "text": query}]
        })

        request_body = {
            "messages": self.messages,
            "semantic_view": self.semantic_view,
        }

        try:
            resp = _snowflake.send_snow_api_request(
                "POST",
                ANALYST_API_ENDPOINT,
                {}, {}, request_body, {}, 60000
            )

            status = resp.get("status")
            body = self._decode_body(resp)
            if status and int(status) >= 400:
                # The body carries the API's error detail; keep it (truncated).
                raise Exception(f"Cortex Analyst API error {status}: {body[:1000]}")

            result = json.loads(body)
        except Exception:
            self.messages.pop()  # Roll back the user turn that was never answered
            raise

        request_id = (resp.get("headers") or {}).get("X-Snowflake-Request-Id", "")

        # Store analyst response in conversation history
        if "message" in result:
            self.messages.append({**result["message"], "request_id": request_id})

        return {
            "raw_response": result,
            "request_id": request_id,
            "context": self._extract_context(result),
        }

    def _extract_context(self, api_response: dict) -> List[str]:
        """Extract interpretation and SQL items as retrieval context strings."""
        contexts = []
        for item in self._content_items(api_response):
            if item.get("type") == "text":
                contexts.append(f"Interpretation: {item.get('text', '')}")
            elif item.get("type") == "sql":
                contexts.append(f"SQL: {item.get('statement', '')}")
        return contexts or ["No context generated"]

    def _extract_sql(self, api_response: dict) -> Optional[str]:
        """Return the first SQL statement in the response, or None if there is none."""
        for item in self._content_items(api_response):
            if item.get("type") == "sql":
                return item.get("statement")
        return None

    def _extract_interpretation(self, api_response: dict) -> str:
        """Return the first text item in the response, or "No interpretation"."""
        for item in self._content_items(api_response):
            if item.get("type") == "text":
                return item.get("text", "")
        return "No interpretation"

    def execute_sql(self, sql: str) -> str:
        """
        Execute SQL and return the results as a markdown table.

        Errors are returned as text rather than raised, so a bad statement does
        not abort the caller. ``DataFrame.to_markdown`` needs ``tabulate``.
        """
        if not sql:
            return "No SQL to execute"

        try:
            df = self.session.sql(sql).to_pandas()
            return df.to_markdown(index=False)
        except Exception as e:
            return f"SQL execution error: {str(e)}"

    # ------------------------------------------------------------------
    # Generation: LLM summary of results
    # ------------------------------------------------------------------

    @instrument(span_type=SpanAttributes.SpanType.GENERATION)
    def generate_summary(self, query: str, sql_results: str) -> str:
        """
        Generate human-readable summary from SQL results using Cortex Complete.
        This is the GENERATION step - LLM summarization.

        Returns the summary text, "Summary generation error: <status>" for a
        non-200 status, or "Unable to generate summary" if nothing was parsed.
        """
        prompt = f"""Summarize the following SQL query results into a clear, human-readable response.

Original Question: {query}

SQL Results:
{sql_results}

Provide a concise summary that directly answers the original question."""

        payload = {
            "model": SUMMARIZATION_MODEL,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant that summarizes data query results clearly and concisely."},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.3,
            "max_tokens": 500,
        }

        resp = _snowflake.send_snow_api_request(
            "POST",
            COMPLETE_API_ENDPOINT,
            {}, {}, payload, {}, 60000
        )

        status = resp.get("status")
        if status and int(status) != 200:
            return f"Summary generation error: {status}"

        full_content = self._parse_completion(self._decode_body(resp))
        return full_content or "Unable to generate summary"

    @staticmethod
    def _choice_text(event, key: str) -> str:
        """Return ``event["choices"][0][key]["content"]`` as a string, or "" if absent."""
        if not isinstance(event, dict):
            return ""
        choices = event.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            return ""
        part = choices[0].get(key) or {}
        content = part.get("content") if isinstance(part, dict) else None
        return content if isinstance(content, str) else ""

    @classmethod
    def _parse_sse_text(cls, body: str) -> str:
        """Concatenate delta content from raw server-sent-event text ("data: {...}" lines)."""
        pieces = []
        for raw_line in body.splitlines():
            line = raw_line.strip()
            if not line.startswith("data:"):
                continue
            data = line[len("data:"):].strip()
            if not data or data == "[DONE]":
                continue
            try:
                event = json.loads(data)
            except json.JSONDecodeError:
                continue
            pieces.append(cls._choice_text(event, "delta"))
        return "".join(pieces)

    @classmethod
    def _parse_completion(cls, body: str) -> str:
        """
        Extract the completion text from a Cortex Complete response body.

        Handles a JSON array of streaming events (``choices[0].delta.content``),
        a single JSON response (``choices[0].message.content``), and raw SSE
        text. If none of these yield text, a non-JSON body is returned verbatim.
        """
        try:
            events = json.loads(body)
        except json.JSONDecodeError:
            return cls._parse_sse_text(body) or body

        if isinstance(events, list):
            return "".join(cls._choice_text(event, "delta") for event in events)
        return cls._choice_text(events, "message")

    # ------------------------------------------------------------------
    # Record root
    # ------------------------------------------------------------------

    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        }
    )
    def answer_query(self, query: str) -> str:
        """
        Main entry point - returns the generated SQL for comparison with ground truth.

        For SQL comparison evaluation:
        - INPUT: User's natural language question
        - OUTPUT: Generated SQL query (to compare with expected SQL), normalized
          by ``_normalize_sql``; the interpretation text if no SQL was produced.
        """
        # Reset conversation so each evaluated question is independent
        self.reset_conversation()

        analyst_result = self.call_cortex_analyst(query)
        sql = self._extract_sql(analyst_result["raw_response"])

        if sql:
            return self._normalize_sql(sql)
        return self._extract_interpretation(analyst_result["raw_response"])

    def _normalize_sql(self, sql: str) -> str:
        """Normalize SQL for comparison: drop the Cortex marker comment, collapse whitespace, strip ';'."""
        sql = self._GENERATED_COMMENT_RE.sub('', sql)
        sql = ' '.join(sql.split())
        return sql.strip().rstrip(';').strip()

    def reset_conversation(self):
        """Reset conversation history for a new session."""
        self.messages = []


print("CortexAnalystApp class defined")

## 3. Initialize App and Test

In [None]:
# Build the app against the configured semantic view.
app = CortexAnalystApp(semantic_view=SEMANTIC_VIEW, session=session)
print(f"Cortex Analyst app initialized with semantic view: {SEMANTIC_VIEW}")

In [None]:
# Optional smoke test: replace the question with one your semantic view can answer.
smoke_test_question = "What is the total count?"
app.reset_conversation()
test_response = app.answer_query(smoke_test_question)
print(f"Test response: {test_response}")

## 4. TruLens Evaluation Setup

In [None]:
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.apps.app import TruApp
from trulens.core.run import Run, RunConfig


def setup_evaluation(
    session,
    app: CortexAnalystApp,
    app_name: str = "CORTEX_ANALYST_SEMANTIC_VIEW",
    app_version: str = "v1.0",
):
    """
    Register the app with TruLens for Snowflake AI Observability.

    The registered app appears under AI & ML -> Evaluations in Snowsight.

    Args:
        session: Active Snowpark session used to build the SnowflakeConnector.
        app: Instrumented app; ``app.answer_query`` is registered as the main method.
        app_name: Application name shown in Snowsight.
        app_version: Version label for this registration.

    Returns:
        Tuple ``(tru_app, connector)``.
    """
    connector = SnowflakeConnector(snowpark_session=session)
    # NOTE(review): the TruSession object is not used afterwards; it is
    # presumably constructed for its side effects (TruLens session setup on
    # this connector) - confirm before removing.
    TruSession(connector=connector)

    tru_app = TruApp(
        app,
        app_name=app_name,
        app_version=app_version,
        connector=connector,
        main_method=app.answer_query,
    )

    return tru_app, connector


print("setup_evaluation function defined")

In [None]:
# Register the instrumented app; keep the connector for later use.
tru_app, connector = setup_evaluation(session=session, app=app)
print("App registered with TruLens")

## 5. Load Test Dataset from Snowflake

The test dataset table must have these columns:
- `INPUT_QUERY`: User questions (natural language)
- `OUTPUT_SQL`: Expected SQL queries (ground truth)

In [None]:
# Pull the test table into pandas so it can be inspected and validated here.
print(f"Loading test dataset from: {TEST_DATASET_FQN}")
print("")

test_df = session.table(TEST_DATASET_FQN).to_pandas()
for _preview_line in (
    f"Loaded {len(test_df)} test cases",
    f"Columns: {list(test_df.columns)}",
    "",
    "Preview:",
):
    print(_preview_line)
test_df.head()

In [None]:
# Validate the loaded dataset before creating an evaluation run against it.
required_columns = ["INPUT_QUERY", "OUTPUT_SQL"]
missing_columns = [col for col in required_columns if col not in test_df.columns]

if missing_columns:
    raise ValueError(f"Missing required columns in test dataset: {missing_columns}")
if test_df.empty:
    # A run over an empty table would have no records to evaluate.
    raise ValueError(f"Test dataset {TEST_DATASET_FQN} has no rows")

print("Test dataset has all required columns")

# Rows without a question cannot be evaluated meaningfully; report them.
blank_inputs = int(test_df["INPUT_QUERY"].isna().sum())
if blank_inputs:
    print(f"WARNING: {blank_inputs} row(s) have an empty INPUT_QUERY")

## 6. Run Evaluation

In [None]:
def create_evaluation_run(
    tru_app: TruApp,
    dataset_table: str,
    run_name: str = "semantic_view_eval",
    llm_judge_name: Optional[str] = None,
    description: str = "SQL comparison evaluation for Cortex Analyst",
    label: str = "cortex_analyst_sql_eval",
) -> Run:
    """
    Create an evaluation run over a test dataset table in Snowflake.

    The dataset table must have columns:
    - INPUT_QUERY: user questions (natural language), mapped to RECORD_ROOT.INPUT
    - OUTPUT_SQL: expected SQL (ground truth), mapped to RECORD_ROOT.GROUND_TRUTH_OUTPUT

    Args:
        tru_app: App registered via TruApp.
        dataset_table: Fully qualified table name (DATABASE.SCHEMA.TABLE).
        run_name: Run name shown in Snowsight.
        llm_judge_name: LLM used as judge; defaults to JUDGE_MODEL, read at call time.
        description: Run description.
        label: Run label.

    Returns:
        The Run created by ``tru_app.add_run``. It is not started here; call
        ``run.start()`` to invoke the app on the dataset.
    """
    run_config = RunConfig(
        run_name=run_name,
        description=description,
        label=label,
        source_type="TABLE",
        dataset_name=dataset_table,
        dataset_spec={
            "RECORD_ROOT.INPUT": "INPUT_QUERY",
            "RECORD_ROOT.GROUND_TRUTH_OUTPUT": "OUTPUT_SQL",
        },
        # Resolved here, not in the signature, so later edits to JUDGE_MODEL apply.
        llm_judge_name=llm_judge_name or JUDGE_MODEL,
    )

    return tru_app.add_run(run_config=run_config)


print("create_evaluation_run function defined")

In [None]:
# Timestamped names keep repeated executions of this notebook distinct.
timestamp = time.strftime("%Y%m%d_%H%M%S")
run_name = "sql_eval_" + timestamp
run = create_evaluation_run(tru_app, dataset_table=TEST_DATASET_FQN, run_name=run_name)
for _run_message in (f"Created evaluation run: {run_name}", f"Using test dataset: {TEST_DATASET_FQN}"):
    print(_run_message)

In [None]:
# Kick off the run: the app is invoked for each dataset row.
print("Starting evaluation run (this may take a few minutes)...")
run.start()

In [None]:
# Show run metadata (last expression of the cell, so the notebook displays it).
print("Checking invocation status...")
run.describe()

In [None]:
# Poll the run status until invocation finishes or the time budget runs out,
# instead of always sleeping for the full budget. (A bare run.describe() inside
# a loop is never displayed by the notebook, so print the status explicitly.)
print("Waiting for invocation to complete...")
max_invocation_wait = 120  # 2 minutes
poll_interval = 10

# Statuses meaning the invocation phase is over (successfully or not).
# NOTE(review): names follow TruLens' RunStatus enum - confirm against the installed version.
invocation_done_statuses = {
    "INVOCATION_COMPLETED",
    "INVOCATION_PARTIALLY_COMPLETED",
    "COMPUTATION_IN_PROGRESS",
    "COMPLETED",
    "PARTIALLY_COMPLETED",
    "FAILED",
    "CANCELLED",
}

for i in range(max_invocation_wait // poll_interval):
    time.sleep(poll_interval)
    print(f"\n--- Invocation status check {i+1} ---")
    try:
        status = run.get_status()
    except Exception as e:
        print(f"Status check error: {e}")
        continue
    status_name = str(getattr(status, "value", status)).upper()
    print(f"Run status: {status_name}")
    if status_name in invocation_done_statuses:
        break
else:
    print(f"\nInvocation not finished after {max_invocation_wait}s; re-run this cell to keep waiting.")

## 7. Compute Metrics

In [None]:
# Request LLM-judged metrics for the run; Snowflake computes them asynchronously.
metric_names = ["correctness", "answer_relevance", "coherence"]
metric_requirements = {
    "correctness": "RECORD_ROOT.INPUT, RECORD_ROOT.OUTPUT, RECORD_ROOT.GROUND_TRUTH_OUTPUT",
    "answer_relevance": "RECORD_ROOT.INPUT, RECORD_ROOT.OUTPUT",
    "coherence": "RECORD_ROOT.OUTPUT",
}

print("Computing evaluation metrics...")
print(f"Requesting metrics: {', '.join(metric_names)}")
print("\nRequired attributes for these metrics:")
for metric_name in metric_names:
    print(f"  - {metric_name}: {metric_requirements[metric_name]}")

try:
    # correctness: generated SQL vs. ground-truth SQL
    # answer_relevance: does the SQL address the question?
    # coherence: is the output well-structured?
    result = run.compute_metrics(metric_names)
    print(f"\ncompute_metrics() returned: {result}")
except Exception as e:
    import traceback
    print(f"\nERROR calling compute_metrics(): {e}")
    traceback.print_exc()

In [None]:
# Metrics are computed asynchronously: poll the run status until computation
# reaches a terminal state or the time budget runs out. (A bare run.describe()
# inside a loop is never displayed by the notebook, so print the status.)
print("Metrics are computed asynchronously by Snowflake.")
print("This can take 2-5 minutes depending on dataset size.")
print("\nPolling for completion...")

max_wait_minutes = 5
poll_interval_seconds = 15
max_polls = (max_wait_minutes * 60) // poll_interval_seconds

# NOTE(review): names follow TruLens' RunStatus enum - confirm against the installed version.
metrics_done_statuses = {"COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "CANCELLED"}

for i in range(max_polls):
    time.sleep(poll_interval_seconds)
    print(f"\n--- Metrics status check {i+1}/{max_polls} (waited {(i+1)*poll_interval_seconds}s) ---")
    try:
        status = run.get_status()
    except Exception as e:
        print(f"Status check error: {e}")
        continue
    status_name = str(getattr(status, "value", status)).upper()
    print(f"Run status: {status_name}")
    if status_name in metrics_done_statuses:
        break
else:
    print(f"\nMetrics not finished after {max_wait_minutes} minutes; check Snowsight or re-run this cell.")

## 8. View Results

In [None]:
banner = "=" * 60
print(banner)
print("DONE!")
print(banner)
print("\nView results in Snowsight:")
print("  AI & ML -> Evaluations -> CORTEX_ANALYST_SEMANTIC_VIEW")
print(f"\nRun name: {run_name}")
print("\nIf metrics are still not visible:")
troubleshooting_steps = [
    "Refresh the Evaluations page in Snowsight",
    "Click on the specific run to see metric details",
    "Check that LLM judge model has access permissions",
]
for step_number, step in enumerate(troubleshooting_steps, start=1):
    print(f"  {step_number}. {step}")
print("\nFinal run status:")
# Last expression of the cell, so the notebook displays the run metadata.
run.describe()

## Summary

| Step | Description |
|------|-------------|
| 1. Setup | Configure semantic view and test dataset location |
| 2. App Class | CortexAnalystApp with TruLens instrumentation |
| 3. Initialize | Create the app and run an optional smoke-test query |
| 4. TruLens | Register app with SnowflakeConnector |
| 5. Test Data | Load and validate the existing Snowflake table |
| 6. Run | Create and start the evaluation run |
| 7. Metrics | Compute correctness, answer_relevance, coherence |
| 8. Results | View in Snowsight AI & ML -> Evaluations |