# 🚀 Cortex Analyst Interactive Tutorial
## Learn by Doing: Snowflake Cortex Analyst Service

**Author:** Li Ma  
**Date:** February 24, 2026  
**Project:** DIA v2.0 - Direct Marketing Analytics Intelligence

---

## 📚 What You'll Learn

This interactive notebook teaches you how to:
1. ✅ Connect to Snowflake with Python
2. ✅ Use Cortex Analyst to convert natural language to SQL
3. ✅ Execute queries and process results
4. ✅ Build production-ready service wrappers
5. ✅ Handle errors and log activities

## 🎯 Prerequisites

- Docker containers running (`docker-compose up`)
- Snowflake credentials configured in `.env` file
- Semantic model deployed to Snowflake stage
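
Before running the notebook, it can help to confirm that the `.env` values are actually visible to Python. The sketch below is a minimal check, assuming the seven `SNOWFLAKE_*` variable names that the `CortexAnalyst` class in this notebook reads; the helper name `missing_env_vars` is hypothetical.

```python
import os
from typing import List, Mapping, Sequence

# Variable names assumed from the CortexAnalyst constructor in this notebook.
REQUIRED_VARS = [
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_ROLE",
]


def missing_env_vars(
    env: Mapping[str, str],
    required: Sequence[str] = tuple(REQUIRED_VARS),
) -> List[str]:
    """Return the required names that are unset or empty (hypothetical helper)."""
    return [name for name in required if not env.get(name)]


missing = missing_env_vars(os.environ)
if missing:
    print(f"Missing settings: {', '.join(missing)}")
else:
    print("All Snowflake settings are present.")
```

If you load the `.env` file with `load_dotenv()` first, the same check covers values defined there.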

---

**💡 Tip:** Run each cell with `Shift + Enter` and experiment with the code!

In [5]:
# Install required packages for this notebook
# Run this cell once to install dependencies
import sys
import subprocess

packages = [
    'structlog',
    'python-dotenv',
    'snowflake-snowpark-python'
]

print("📦 Installing required packages...")
for package in packages:
    print(f"   Installing {package}...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
        print(f"   ✅ {package} installed")
    except subprocess.CalledProcessError as e:
        print(f"   ❌ Failed to install {package}: {e}")

print("\n✅ Installation complete!")
print("⚠️  If this is the first install, please RESTART THE KERNEL:")
print("   Jupyter menu: Kernel → Restart Kernel")

📦 Installing required packages...
   Installing structlog...
   ✅ structlog installed
   Installing python-dotenv...
   ✅ python-dotenv installed
   Installing snowflake-snowpark-python...
   ✅ snowflake-snowpark-python installed

✅ Installation complete!
⚠️  If this is the first install, please RESTART THE KERNEL:
   Jupyter menu: Kernel → Restart Kernel


## 📦 Step 0: Install Dependencies (Run Once)

**Important:** Run this cell first to install required packages in your Jupyter environment.

In [20]:
# Add the parent directory to path so we can import modules
import sys
sys.path.insert(0, '/app')  # For Docker environment

# Core Python libraries
import os
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

# Snowflake libraries
from snowflake.snowpark import Session

# Environment and logging
from dotenv import load_dotenv
from utils.logging import get_logger

# Load environment variables from .env file
load_dotenv()

# Initialize logger
logger = get_logger(__name__)

print("✅ All libraries imported successfully!")
print(f"   Python version: {sys.version}")

✅ All libraries imported successfully!
   Python version: 3.12.10 (tags/v3.12.10:0cc8128, Apr  8 2025, 12:21:36) [MSC v.1943 64 bit (AMD64)]


In [10]:
@dataclass
class AnalystResponse:
    """
    Container for Cortex Analyst responses.
    
    Attributes:
        query (str): The natural language question
        sql (str): Generated SQL query
        results (List[Dict]): Query results as list of dictionaries
        metadata (Dict): Additional information (row count, execution time)
        error (str): Error message if something went wrong
    """
    query: str
    sql: Optional[str] = None
    results: Optional[List[Dict[str, Any]]] = None
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (useful for JSON APIs)"""
        return {
            "query": self.query,
            "sql": self.sql,
            "results": self.results,
            "metadata": self.metadata,
            "error": self.error
        }
    
# Test it out!
sample_response = AnalystResponse(
    query="What is the average open rate?",
    sql="SELECT AVG(open_rate) FROM email_campaigns",
    results=[{"AVG(open_rate)": 22.5}],
    metadata={"row_count": 1}
)

print("✅ AnalystResponse dataclass defined successfully!")
print(f"   Query: {sample_response.query}")
print(f"   SQL: {sample_response.sql}")
print(f"   Results: {sample_response.results}")
print(f"   Metadata: {sample_response.metadata}")
print(f"   Error: {sample_response.error}")


✅ AnalystResponse dataclass defined successfully!
   Query: What is the average open rate?
   SQL: SELECT AVG(open_rate) FROM email_campaigns
   Results: [{'AVG(open_rate)': 22.5}]
   Metadata: {'row_count': 1}
   Error: None


In [18]:
# MAIN SERVICE CLASS FOR CORTEX ANALYST
class CortexAnalyst:
    """
    Python wrapper for Snowflake Cortex Analyst service.

    This class handles all interactions with Cortex Analyst:
    - Connecting to Snowflake
    - Sending natural language questions
    - Receiving SQL and results
    - Error handling and logging

    Usage Example:
      # Create an instance
      analyst = CortexAnalyst()

      # Ask a question
      response = analyst.send_message("What was the average open rate last week?")
    """
    
    def __init__(
        self,
        semantic_model_file: str = "semantic.yaml",
        stage_name: str = "SEMANTIC_MODELS"
    ):
        """
        Initialize the CortexAnalyst instance.

        Args:
            semantic_model_file (str): Name of the semantic model file in Snowflake stage.
            stage_name (str): Name of the Snowflake stage where the semantic model is stored.

        What happens here:
        1. Load configuration from environment variables (.env file)
        2. Create a connection session to Snowflake
        3. Prepare the path to the semantic model
        """

        # Store configuration
        self.semantic_model_file = semantic_model_file
        self.stage_name = stage_name

        # Get Snowflake credentials from environment variables
        # These were set in the .env file and loaded by load_dotenv()
        self.account = os.getenv("SNOWFLAKE_ACCOUNT")
        self.user = os.getenv("SNOWFLAKE_USER")
        self.password = os.getenv("SNOWFLAKE_PASSWORD")
        self.database = os.getenv("SNOWFLAKE_DATABASE")
        self.schema = os.getenv("SNOWFLAKE_SCHEMA")
        self.warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
        self.role = os.getenv("SNOWFLAKE_ROLE")

        # Initialize Snowflake session (connection will be created on demand)
        self._session: Optional[Session] = None

        # Log the initialization
        logger.info(
            "CortexAnalyst initialized",
            database=self.database,
            schema=self.schema,
            semantic_model_file=f"@{stage_name}/{semantic_model_file}"
        )

    # CONNECTION MANAGEMENT
    def _get_session(self) -> Session:
        """
        Get or create a Snowflake session (lazy loading pattern).
        
        Why "lazy loading"?
        Instead of connecting immediately when we create the CortexAnalyst object,
        we wait until we actually need the connection. This saves resources!
        
        Returns:
            Session: Active Snowflake session
        
        Raises:
            Exception: If connection fails
        """
        # If we already have a session, reuse it. Otherwise, create a new one.
        if self._session is not None:
            return self._session
        
        try:
            # Create connection parameters dictionary
            connection_params = {
                "account": self.account,
                "user": self.user,
                "password": self.password,  
                "database": self.database,
                "schema": self.schema,
                "warehouse": self.warehouse,
                "role": self.role
            }

            # create the session (connect to Snowflake)
            logger.info("Creating new Snowflake session...")
            self._session = Session.builder.configs(connection_params).create()

            logger.info(
                "Snowflake session created successfully!",
                account=self.account,
                database=self.database
            )

            return self._session
        
        except Exception as e:
            logger.error(f"Failed to create Snowflake session: {e}")
            raise Exception(f"Snowflake connection error: {e}")
        
    def close(self):
        """
        Close the Snowflake session if it exists.
        
        Always call this when you're done to free up resources. 
        Good practice: Use "with" statement or try/finally block.

        Example:
            analyst = CortexAnalyst()
            try:
                response = analyst.send_message("What is the total sales?")
            finally:
                analyst.close()   # Always cleanup!
        """

        if self._session:
            self._session.close()
            self._session = None
            logger.info("Snowflake session closed.")


    def send_message(
        self,
        query: str,
        conversation_id: Optional[str] = None
    ) -> AnalystResponse:
        """
        Send a natural language question to Cortex Analyst.
        
        This is the main method you'll use! It:
        1. Takes your natural language question
        2. Sends it to Cortex Analyst
        3. Gets back SQL and executes it
        4. Returns the results in a nice format
        
        Args:
            query (str): Your natural language question
                Examples:
                - "What was the average open rate last month?"
                - "Show me click rates by market"
                - "Which campaigns had bounce rate above 5%?"
            
            conversation_id (Optional[str]): ID for multi-turn conversations
                (useful for follow-up questions - Cortex remembers context!)
        
        Returns:
            AnalystResponse: Contains SQL, results, and metadata
        
        Example:
            analyst = CortexAnalyst()
            
            # Ask a question
            response = analyst.send_message(
                "What was the total number of emails sent in January 2026?"
            )
            
            # Check if successful
            if response.error:
                print(f"Error: {response.error}")
            else:
                print(f"SQL Generated: {response.sql}")
                print(f"Results: {response.results}")
        """
        # Log the incoming query
        logger.info("Received query", query=query, conversation_id=conversation_id)

        try:
            # Get Snowflake session
            session = self._get_session()

            # Build the semantic model reference
            # Format: @DATABASE.SCHEMA.STAGE_NAME/file.yaml
            semantic_model_ref = (
                f"@{self.database}.{self.schema}.{self.stage_name}/"
                f"{self.semantic_model_file}"
            )

            # CORTEX ANALYST API CALL
            # This is where we interact with the Cortex Analyst service.
            #
            # NOTE: This tutorial assumes a SQL-callable
            # SNOWFLAKE.CORTEX.ANALYST() function is available in your account.
            # Snowflake's documentation describes Cortex Analyst as a REST API,
            # so verify this call against your account before relying on it.
            #
            # Parameters as used here:
            # - First parameter: Your question (natural language)
            # - Second parameter: Path to semantic model
            # - Third parameter (optional): Conversation ID for context

            if conversation_id:
                # Multi-turn conversation (remembers previous questions)
                sql_query = f"""
                    SELECT SNOWFLAKE.CORTEX.ANALYST(
                        '{self._escape_quotes(query)}',
                        '{semantic_model_ref}',
                        '{self._escape_quotes(conversation_id)}'
                    ) AS response
                """
            else:
                # Single-turn question (no context)
                sql_query = f"""
                    SELECT SNOWFLAKE.CORTEX.ANALYST(
                        '{self._escape_quotes(query)}',
                        '{semantic_model_ref}'
                    ) AS response
                """
            logger.debug("Executing Cortex Analyst query", sql=sql_query)

            # Execute the query
            result = session.sql(sql_query).collect()

            # Parse the response from Cortex Analyst
            # The result comes back as JSON, so we need to parse it
            if result and len(result) > 0:
                response_json = result[0]['RESPONSE']

                # Convert from JSON string to Python dict
                if isinstance(response_json, str):
                    response_data = json.loads(response_json)
                else:
                    response_data = response_json  # Already a dict
                
                logger.debug("Parsed Cortex Analyst response", response=response_data)

                # Extract the components from the response
                # Cortex Analyst returns: SQL query, results, and metadata
                generated_sql = response_data.get("sql", None)
                query_results = response_data.get("results", [])
                metadata =  response_data.get("metadata", {})
                
                # If SQL was generated, we can execute it to get fresh results
                # (or use the results that Cortex already executed for us)
                if generated_sql and not query_results:
                    logger.info("Executing generated SQL", sql=generated_sql)
                    query_results = self._execute_sql(generated_sql)
                
                # Create successful response
                response = AnalystResponse(
                    query=query,
                    sql=generated_sql,
                    results=query_results,
                    metadata=metadata or {"row_count": len(query_results)}
                )

                logger.info(
                    "Query processed successfully",
                    query=query,
                    row_count=len(query_results) if query_results else 0
                )
                return response
            
            else:
                # No results returned - something went wrong
                error_msg = "No results returned from Cortex Analyst."
                logger.warning(error_msg, query=query)
                
                return AnalystResponse(
                    query=query,
                    error=error_msg
                )

        except Exception as e:
            # Handle any errors that occurred
            error_msg = f"Error processing query: {str(e)}"
            logger.error(error_msg, query=query, exception=str(e))

            return AnalystResponse(
                query=query,
                error=error_msg
            )

    # HELPER METHODS

    def _escape_quotes(self, text: str) -> str:
        """
        Escape single quotes so the text can be embedded in a SQL string literal.
        
        Why is this important?
        If a user asks: "Show me John's campaigns"
        We need to escape the apostrophe so SQL doesn't break:
        "Show me John''s campaigns" (the single quote is doubled in SQL)

        Note: doubling quotes is a basic safeguard, not complete protection
        against SQL injection. Prefer bind parameters where they are supported.
        
        Args:
            text (str): Text that might contain quotes
        
        Returns:
            str: Text with quotes properly escaped
        """
        return text.replace("'", "''")
    
    def _execute_sql(self, sql: str) -> List[Dict[str, Any]]:
        """
        Execute a SQL query and return results as list of dictionaries.
        
        This is useful when you have a SQL query and want to run it
        against your Snowflake database.
        
        Args:
            sql (str): The SQL query to execute
        
        Returns:
            List[Dict[str, Any]]: Results as list of row dictionaries
        
        Example Result:
            [
                {"MARKET": "UK", "OPEN_RATE": 22.5},
                {"MARKET": "Germany", "OPEN_RATE": 19.8}
            ]
        """
        try:
            session = self._get_session()
            # Execute the query
            result = session.sql(sql).collect()

            # Convert Snowflake Row objects to dictionaries
            results_as_dicts = [row.asDict() for row in result]

            logger.debug("SQL executed successfully", sql=sql, row_count=len(results_as_dicts))
            return results_as_dicts
        
        except Exception as e:
            logger.error(f"Error executing SQL: {e}", sql=sql)
            raise Exception(f"SQL execution error: {e}")
    
    def verify_semantic_model(self) -> Dict[str, Any]:
        """
        Verify that the semantic model file exists in the Snowflake stage.
        
        This is a useful diagnostic method to check your setup!
        Call this if you're having issues to verify everything is configured correctly.
        
        Returns:
            Dict[str, Any]: Verification results, for example:
            {
                "exists": True/False,
                "file_name": "semantic.yaml",
                "stage_path": "@DATABASE.SCHEMA.STAGE",
                "file_size": 12345,
                "last_modified": "2026-02-22..."
            }
        
        Example:
            analyst = CortexAnalyst()
            verification = analyst.verify_semantic_model()
        
            if verification["exists"]:
                print("Semantic model file found!")      
            else:
                print("Semantic model file NOT found. Please check your stage and file name.")
        """

        try:
            session = self._get_session()

            # List files in the stage
            stage_path = f"@{self.database}.{self.schema}.{self.stage_name}"
            list_sql = f"LIST {stage_path}"
            result = session.sql(list_sql).collect()

            # Look for our semantic model file
            for row in result:
                file_name = row['name']
                if self.semantic_model_file in file_name:
                    verification = {
                        "exists": True,
                        "file_name": row['name'],
                        "stage_path": stage_path,
                        "file_size": row['size'],
                        "last_modified": row['last_modified']
                    }
                    logger.info("Semantic model file found", file_info=verification)
                    return verification
            
            #File not found
            logger.warning(
                "Semantic model file NOT found in stage", 
                stage_path=stage_path, 
                expected_file=self.semantic_model_file
            )
            
            return {
                "exists": False,
                "file_name": self.semantic_model_file,
                "stage_path": stage_path,
                "file_size": None,
                "last_modified": None
            }
        
        except Exception as e:
            logger.error(f"Failed to verify semantic model: {e}")
            return {
                "exists": False,
                "error": str(e)
            }
    
    # CONTEXT MANAGER SUPPORT (FOR "WITH" STATEMENT)
    def __enter__(self):
        """Enable use with 'with' statement (context manager)"""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Automatically close connection when exiting 'with' block"""
        self.close()


In [None]:
# Import the CortexAnalyst service
from services.cortex_analyst import CortexAnalyst

print("✅ CortexAnalyst class imported successfully!")
print("   You can now create instances: analyst = CortexAnalyst()")

## 🎓 Summary: What You Learned

Congratulations! You've learned:

✅ **Python OOP Concepts**
- Classes and objects  
- Instance methods and attributes
- Context managers (`with` statement)
- Type hints and dataclasses

✅ **Snowflake Integration**
- Connecting with Snowpark
- Executing SQL queries
- Handling results

✅ **Service Design Patterns**
- Lazy loading (efficient resource usage)
- Error handling and logging
- Structured responses

✅ **Production Best Practices**
- Environment-based configuration
- Comprehensive logging
- Clean code with documentation
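
Lazy loading and the `with` statement are easiest to see together in a stripped-down form. The sketch below is illustrative only: `FakeConnection` and `LazyService` are hypothetical stand-ins, so it runs without a Snowflake account.

```python
from typing import Optional


class FakeConnection:
    """Stand-in for a real database session (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class LazyService:
    """Opens its connection on first use and closes it when the 'with' block exits."""

    def __init__(self) -> None:
        self._conn: Optional[FakeConnection] = None
        self.connect_count = 0  # how many times a connection was actually opened

    def _get_conn(self) -> FakeConnection:
        # Reuse the existing connection if there is one; otherwise create it now.
        if self._conn is None:
            self._conn = FakeConnection()
            self.connect_count += 1
        return self._conn

    def run(self, text: str) -> str:
        self._get_conn()  # the connection is only created when work is requested
        return text.upper()

    def close(self) -> None:
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "LazyService":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


with LazyService() as service:
    print(f"Opened before first call: {service.connect_count}")
    service.run("first call")
    service.run("second call")
    print(f"Opened after two calls: {service.connect_count}")
```

The counter stays at zero until the first call and at one afterwards, which is the behaviour the lazy `_get_session` method aims for.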

---

## 🚀 Next Steps

1. **Practice More**: Try different questions in the practice exercise cell  
2. **Build Other Services**: Apply this pattern to `cortex_complete.py`, `cortex_search.py`
3. **Enhance Features**: Add caching, retry logic, rate limiting
4. **Integration**: Use in your FastAPI endpoints
5. **Testing**: Write pytest tests for edge cases
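
As one possible starting point for the retry logic mentioned in the list, here is a minimal sketch of retrying with exponential backoff. The helper `call_with_retry` and its parameters are hypothetical. It retries functions that raise exceptions; since `send_message` in this notebook returns errors inside `AnalystResponse` instead of raising, wrapping it would also need a check on `response.error`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on any exception with exponential backoff (hypothetical helper)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Demo: a function that fails twice, then succeeds.
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Pass a no-op sleep so the demo does not actually wait.
result = call_with_retry(flaky, sleep=lambda seconds: None)
print(result, attempts["count"])
```

Injecting `sleep` as a parameter keeps the helper easy to test with pytest, because tests can record the delays instead of waiting for them.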

---

## 📚 Resources

- [Snowflake Cortex Documentation](https://docs.snowflake.com/en/user-guide/snowflake-cortex)
- [CORTEX_ANALYST_LEARNING_GUIDE.md](../orchestrator/services/CORTEX_ANALYST_LEARNING_GUIDE.md)
- [Python Dataclasses](https://docs.python.org/3/library/dataclasses.html)
- [Structlog](https://www.structlog.org/)

---

**Happy Coding! 🎉**

## 🎯 Practice Exercise: Ask Your Own Question

**Your Turn!** Try asking different questions about your email data.

**Example Questions:**
- "What was the total number of emails sent last month?"
- "Show me click rate by market"
- "Which campaigns had bounce rate above 5%?"
- "What is the average open rate by business unit?"

**Instructions:**
1. Change the `my_question` variable below
2. Run the cell
3. See if Cortex Analyst can answer it!

In [None]:
# 🏋️ Exercise: Write your own question!

with CortexAnalyst() as analyst:
    # TODO: Change this question to something you want to know!
    my_question = "CHANGE THIS TO YOUR QUESTION"
    
    response = analyst.send_message(my_question)
    
    if response.error:
        print(f"❌ Error: {response.error}")
    else:
        print(f"✅ Question: {response.query}")
        print(f"\n📊 SQL: {response.sql}")
        print(f"\n📈 Results:")
        for i, row in enumerate(response.results[:10], 1):
            print(f"   {i}. {row}")

## 🧪 Test 3: Try Cortex Analyst (Natural Language Query)

Now let's try asking a question in natural language!

**Note:** This requires Cortex Analyst to be enabled in your Snowflake account. If not enabled yet, you'll see an error message (that's expected!).

In [None]:
with CortexAnalyst() as analyst:
    # Ask a question in natural language
    question = "What is the average open rate?"
    
    print(f"🤔 Asking: '{question}'")
    print("   Processing...")
    
    response = analyst.send_message(question)
    
    if response.error:
        print(f"\n⚠️  Query Failed (Expected if Cortex Analyst not enabled)")
        print(f"   Error: {response.error}")
        print("\n💡 To enable Cortex Analyst:")
        print("   1. Contact Snowflake support or your account admin")
        print("   2. Request 'Cortex Analyst' feature activation")
    else:
        print("\n✅ Query Successful!")
        print(f"\n📝 Question: {response.query}")
        print(f"\n🔍 Generated SQL:")
        print(f"   {response.sql}")
        print(f"\n📊 Results:")
        for i, row in enumerate(response.results[:5], 1):  # First 5 rows
            print(f"   {i}. {row}")
        print(f"\nℹ️  Metadata: {response.metadata}")

## 🧪 Test 2: Execute Simple SQL Query

Before trying Cortex Analyst, let's test basic SQL execution against your data.

In [None]:
with CortexAnalyst() as analyst:
    try:
        # Simple count query
        sql = "SELECT COUNT(*) AS ROW_COUNT FROM VW_SFMC_EMAIL_PERFORMANCE LIMIT 1"
        results = analyst._execute_sql(sql)
        
        print("✅ SQL Execution Test Passed!")
        print(f"   Rows in VW_SFMC_EMAIL_PERFORMANCE: {results[0]['ROW_COUNT']:,}")
        
        # Get sample data
        sample_sql = "SELECT MARKET, OPEN_RATE, CLICK_RATE FROM VW_SFMC_EMAIL_PERFORMANCE LIMIT 5"
        sample_data = analyst._execute_sql(sample_sql)
        
        print("\n📊 Sample Data:")
        for i, row in enumerate(sample_data, 1):
            print(f"   {i}. Market: {row['MARKET']}, Open Rate: {row['OPEN_RATE']}%, Click Rate: {row['CLICK_RATE']}%")
        
    except Exception as e:
        print(f"❌ SQL Execution Failed: {e}")

## 🧪 Test 1: Verify Semantic Model

Let's verify your semantic model is deployed correctly in Snowflake.
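
The verification amounts to scanning the stage listing for the model file. The sketch below shows that lookup on plain dictionaries, assuming each row exposes `name`, `size`, and `last_modified` keys as `verify_semantic_model` in this notebook expects. The function name `find_stage_file` and the sample rows are made up for illustration. It compares the final path segment rather than using a substring test, so `semantic.yaml` does not match a file such as `old_semantic.yaml.bak`.

```python
from typing import Any, Dict, List, Optional


def find_stage_file(rows: List[Dict[str, Any]], file_name: str) -> Optional[Dict[str, Any]]:
    """Return the first row whose path ends in exactly file_name, else None."""
    for row in rows:
        # Listings report paths such as "stage/dir/file.yaml"; compare the last segment.
        base_name = str(row["name"]).rsplit("/", 1)[-1]
        if base_name == file_name:
            return row
    return None


# Made-up listing rows for illustration only.
listing = [
    {"name": "semantic_models/old_semantic.yaml.bak", "size": 100, "last_modified": "n/a"},
    {"name": "semantic_models/semantic.yaml", "size": 200, "last_modified": "n/a"},
]

match = find_stage_file(listing, "semantic.yaml")
print(match["name"] if match else "not found")
```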

In [None]:
# Create analyst instance
with CortexAnalyst() as analyst:
    # verify_semantic_model() returns a dict, so check its "exists" flag
    verification = analyst.verify_semantic_model()
    
    if verification["exists"]:
        print("✅ Semantic Model Found!")
        print("   Your semantic model is deployed and ready to use.")
    else:
        print("❌ Semantic Model Not Found")
        print("   Please deploy your semantic model first.")

## 3️⃣ CortexAnalyst Class: Complete Implementation

Now let's import the complete CortexAnalyst class from our service module.

This class includes:
- Snowflake connection management
- Cortex Analyst API calls
- Error handling and logging
- Helper methods
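
The error-handling approach in this class is to catch exceptions, log them, and return them inside the response object instead of letting them propagate. A minimal sketch of that pattern follows. `Outcome` and `run_safely` are hypothetical names, and the standard `logging` module is substituted for the notebook's structlog-based logger to keep the example self-contained.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("sketch")


@dataclass
class Outcome:
    """Either a value or an error message (hypothetical stand-in for AnalystResponse)."""
    value: Optional[Any] = None
    error: Optional[str] = None


def run_safely(action: Callable[[], Any]) -> Outcome:
    """Run action; on failure, log the problem and return it instead of raising."""
    try:
        return Outcome(value=action())
    except Exception as exc:
        log.error("Action failed: %s", exc)
        return Outcome(error=f"Error processing query: {exc}")


ok = run_safely(lambda: 10 / 2)
bad = run_safely(lambda: 10 / 0)

print(ok)
print(bad.error)
```

Callers then check `error` before using the value, the same way the test cells in this notebook check `response.error`.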

## 2️⃣ Data Models: AnalystResponse

Before we build the service, let's create a data structure to hold responses from Cortex Analyst.

**Why dataclasses?**
- Clean, readable code
- Type hints for better error checking
- Built-in methods like `__repr__`
- Less boilerplate than regular classes
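
A small, self-contained illustration of these points. The `Point` class is hypothetical and unrelated to the tutorial's data model.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Point:
    """Tiny illustrative dataclass (hypothetical, not part of the tutorial code)."""
    x: int
    y: int = 0
    label: Optional[str] = None


p = Point(x=1, y=2)
print(p)                      # generated __repr__
print(p == Point(x=1, y=2))   # generated __eq__ compares field values
print(asdict(p))              # standard-library conversion to a plain dict
```

`dataclasses.asdict` could also stand in for a hand-written `to_dict` method when every field should be exported unchanged.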

## 1️⃣ Import Required Libraries

First, we need to import all the Python libraries we'll use:
- **snowflake.snowpark**: For connecting to Snowflake
- **dotenv**: For loading environment variables
- **dataclasses**: For creating data structures
- **json**: For parsing JSON responses
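
To illustrate the JSON step, here is a minimal sketch that accepts a payload either as a JSON string or as an already-parsed dictionary, mirroring the `isinstance` check in `send_message`. The helper name `as_dict` and the sample payload are assumptions made for the example.

```python
import json
from typing import Any, Dict, Union


def as_dict(payload: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Accept either a JSON string or an already-parsed dict (hypothetical helper)."""
    if isinstance(payload, str):
        return json.loads(payload)
    return payload


# Made-up payload for illustration only.
raw = '{"sql": "SELECT 1", "results": [{"N": 1}]}'
parsed = as_dict(raw)

print(parsed["sql"])
print(parsed.get("metadata", {}))  # .get() supplies a default for missing keys
```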