# Automated Least-Permissions Role Generation for Snowflake Cortex Agents

This notebook automates the process of generating least-privilege SQL scripts for Snowflake Cortex Agents by:

1. **REST API Call**: Making REST API calls to DESCRIBE agent objects and extracting relevant analyst + search tools + agent database.schema information
2. **SQL Query Generation**: From the curl output, generating SQL queries for individual semantic views from the agent object to identify tables that need permissions
3. **Permission Script Output**: Returning comprehensive SQL output for permissions that an admin can execute

## Prerequisites
- Snowflake connection configured
- Appropriate permissions to query agent objects and semantic views
- Python packages: requests, pyyaml, snowflake-snowpark-python


In [None]:
# Import required libraries
import json
import yaml
import requests
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional
import os
import tempfile
from dotenv import load_dotenv

# Snowflake imports
import snowflake.connector
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import col, lit
from snowflake.snowpark.types import StringType

# Load environment variables
load_dotenv()

print("Libraries imported successfully!")


## Step 1: REST API Call to DESCRIBE Agent Object

This section handles making REST API calls to Snowflake to retrieve agent specifications and extract relevant information about analyst tools, search tools, and agent database/schema details.


In [None]:
def make_agent_describe_api_call(account_url: str, agent_database: str, agent_schema: str, agent_name: str, bearer_token: str) -> Optional[Dict]:
    """
    Make REST API call to DESCRIBE agent object in Snowflake.
    
    Args:
        account_url: Snowflake account URL (e.g., 'https://orgname-accountname.snowflakecomputing.com')
        agent_database: Database containing the agent
        agent_schema: Schema containing the agent
        agent_name: Name of the agent
        bearer_token: Bearer token for authentication
    
    Returns:
        Dictionary containing agent specification or None if failed
    """
    try:
        # Construct the API endpoint URL
        api_url = f"{account_url}/api/v2/databases/{agent_database.lower()}/schemas/{agent_schema.lower()}/agents/{agent_name}"
        
        # Set up headers
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': f'Bearer {bearer_token}'
        }
        
        # Make the API call
        response = requests.get(api_url, headers=headers)
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"API call failed with status code: {response.status_code}")
            print(f"Response: {response.text}")
            return None
            
    except Exception as e:
        print(f"Error making API call: {e}")
        return None


### Using the REST API Function (Real Implementation)

To use the `make_agent_describe_api_call()` function with actual Snowflake REST API calls, you'll need:

1. **Snowflake Account URL**: Your Snowflake account URL
2. **Bearer Token**: A valid authentication token
3. **Agent Details**: Database, schema, and agent name

Here's how to use it:


In [None]:
# Example: Using the REST API function with real Snowflake API calls
# Uncomment and modify the following code to make actual API calls

def make_real_api_call_example(SNOWFLAKE_ACCOUNT_URL, AGENT_DATABASE, AGENT_SCHEMA, AGENT_NAME):
    """
    Example function showing how to make a real API call.
    Modify the parameters below for your environment.
    """
    
    # Configuration - MODIFY THESE VALUES FOR YOUR ENVIRONMENT
    # SNOWFLAKE_ACCOUNT_URL = "https://orgname-accountname.snowflakecomputing.com"
    # AGENT_DATABASE = "yourDB"
    # AGENT_SCHEMA = "yourSchema"
    # AGENT_NAME = "agentName"
    
    bearer_token = os.getenv("SNOWFLAKE_BEARER_TOKEN")
    
    if not bearer_token:
        print("No bearer token available. Please set SNOWFLAKE_BEARER_TOKEN environment variable")
        print("or ensure you have an active Snowflake session.")
        return None
    
    # Make the API call
    print(f"Making API call for agent: {AGENT_DATABASE}.{AGENT_SCHEMA}.{AGENT_NAME}")
    agent_data = make_agent_describe_api_call(
        account_url=SNOWFLAKE_ACCOUNT_URL,
        agent_database=AGENT_DATABASE,
        agent_schema=AGENT_SCHEMA,
        agent_name=AGENT_NAME,
        bearer_token=bearer_token
    )
    
    if agent_data:
        print("✅ API call successful!")
        print(f"Agent Name: {agent_data.get('name')}")
        print(f"Database: {agent_data.get('database_name')}")
        print(f"Schema: {agent_data.get('schema_name')}")
        return agent_data
    else:
        print("❌ API call failed")
        return None



In [None]:
#Modify the inputs to the function call to point at the agent object you want to generate permissions for

example_agent_data = make_real_api_call_example(
    "https://orgname-accountname.snowflakecomputing.com",
    "yourDB",
    "yourSchema",
    "agentName")
example_agent_data

## Step 2: Parse Agent Specification and Extract Tool Information

This section parses the agent specification JSON to extract information about:
- Cortex Analyst (Text-to-SQL) tools and their semantic views
- Cortex Search tools and their search services
- Database and schema usage requirements


In [None]:
def parse_agent_tools(agent_data: Dict) -> Dict:
    """
    Parse agent specification to extract tool information.
    
    Args:
        agent_data: Agent specification dictionary from REST API
    
    Returns:
        Dictionary containing parsed tool information
    """
    try:
        # Parse the nested agent_spec JSON string
        agent_spec = json.loads(agent_data["agent_spec"])
        tools = agent_spec.get("tools", [])
        tool_resources = agent_spec.get("tool_resources", {})
        
        # Initialize collections
        semantic_views = set()
        semantic_model_files = set()  # Track semantic model files
        search_services = set()
        procedures = set()  # Track procedures
        databases = set()
        schemas = set()
        tool_details = []
        tool_warehouses = {}  # Track warehouses for specific tools
        
        # Process each tool
        for tool in tools:
            tool_spec = tool.get("tool_spec", {})
            tool_type = tool_spec.get("type")
            tool_name = tool_spec.get("name")
            tool_description = tool_spec.get("description", "")
            
            if not tool_name or tool_name not in tool_resources:
                continue
                
            resource_details = tool_resources[tool_name]
            
            tool_info = {
                "name": tool_name,
                "type": tool_type,
                "description": tool_description,
                "resources": resource_details
            }
            
            # Extract warehouse information from execution_environment
            execution_env = resource_details.get("execution_environment", {})
            if execution_env and execution_env.get("type") == "warehouse":
                warehouse_name = execution_env.get("warehouse", "")
                if warehouse_name:  # Only track non-empty warehouses
                    tool_warehouses[tool_name] = warehouse_name
                    tool_info["warehouse"] = warehouse_name
            
            if tool_type == "cortex_analyst_text_to_sql":
                semantic_view = resource_details.get("semantic_view")
                semantic_model_file = resource_details.get("semantic_model_file")
                
                if semantic_view:
                    semantic_views.add(semantic_view)
                    tool_info["semantic_view"] = semantic_view
                    
                    # Extract database and schema from semantic view
                    parts = semantic_view.split('.')
                    if len(parts) == 3:
                        databases.add(parts[0])
                        schemas.add(f"{parts[0]}.{parts[1]}")
                
                if semantic_model_file:
                    semantic_model_files.add(semantic_model_file)
                    tool_info["semantic_model_file"] = semantic_model_file
                    
                    # Extract stage path from semantic model file
                    # Format: @DATABASE.SCHEMA.STAGE_NAME/file.yaml
                    if semantic_model_file.startswith('@'):
                        stage_path = semantic_model_file[1:]  # Remove @ prefix
                        path_parts = stage_path.split('/')
                        if len(path_parts) >= 1:
                            stage_identifier = path_parts[0]  # DATABASE.SCHEMA.STAGE_NAME
                            stage_parts = stage_identifier.split('.')
                            if len(stage_parts) >= 3:
                                databases.add(stage_parts[0])
                                schemas.add(f"{stage_parts[0]}.{stage_parts[1]}")
                                tool_info["stage"] = stage_identifier
                        
            elif tool_type == "cortex_search":
                # search_service = resource_details.get("name")
                search_service = resource_details.get("search_service") or resource_details.get("name")
                if search_service:
                    search_services.add(search_service)
                    tool_info["search_service"] = search_service
                    
                    # Extract database and schema from search service
                    parts = search_service.split('.')
                    if len(parts) == 3:
                        databases.add(parts[0])
                        schemas.add(f"{parts[0]}.{parts[1]}")
            
            elif tool_type == "generic":
                # Handle generic tools that might be procedures
                procedure_type = resource_details.get("type")
                if procedure_type == "procedure":
                    procedure_identifier = resource_details.get("identifier")
                    procedure_name = resource_details.get("name")
                    
                    if procedure_identifier and procedure_name:
                        # Combine identifier and name to get full procedure signature
                        # identifier: "db.schema.procedure"
                        # name: "procedure(VARCHAR, VARCHAR, VARCHAR)"
                        # result: "db.schema.procedure(VARCHAR, VARCHAR, VARCHAR)"
                        full_procedure_signature = f"{procedure_identifier}({procedure_name.split('(')[1]}"
                        procedures.add(full_procedure_signature)
                        tool_info["procedure"] = full_procedure_signature
                        
                        # Extract database and schema from procedure identifier
                        # Format: DATABASE.SCHEMA.PROCEDURE_NAME
                        parts = procedure_identifier.split('.')
                        if len(parts) >= 3:
                            databases.add(parts[0])
                            schemas.add(f"{parts[0]}.{parts[1]}")
                        print(f"Tool {tool_name} requires procedure: {full_procedure_signature}")
            
            tool_details.append(tool_info)
        
        return {
            "semantic_views": list(semantic_views),
            "semantic_model_files": list(semantic_model_files),  # Include semantic model files
            "search_services": list(search_services),
            "procedures": list(procedures),  # Include procedures
            "databases": list(databases),
            "schemas": list(schemas),
            "tool_details": tool_details,
            "tool_warehouses": tool_warehouses,  # Include warehouse mapping
            "agent_name": agent_data.get("name"),
            "agent_database": agent_data.get("database_name"),
            "agent_schema": agent_data.get("schema_name")
        }
        
    except Exception as e:
        print(f"Error parsing agent tools: {e}")
        return {
            "semantic_views": [],
            "semantic_model_files": [],  # Include semantic model files
            "search_services": [],
            "procedures": [],
            "databases": [],
            "schemas": [],
            "tool_details": [],
            "tool_warehouses": {},
            "agent_name": "UNKNOWN",
            "agent_database": "UNKNOWN",
            "agent_schema": "UNKNOWN"
        }

print("Agent parsing functions defined successfully!")


In [None]:
# Parse the example agent data
if example_agent_data:
    parsed_tools = parse_agent_tools(example_agent_data)
    
    print("Parsed Tool Information:")
    print(f"Agent: {parsed_tools['agent_name']}")
    print(f"Agent Location: {parsed_tools['agent_database']}.{parsed_tools['agent_schema']}")
    print(f"\nSemantic Views: {parsed_tools['semantic_views']}")
    print(f"Search Services: {parsed_tools['search_services']}")
    print(f"Databases: {parsed_tools['databases']}")
    print(f"Schemas: {parsed_tools['schemas']}")
    
    print("\nDetailed Tool Information:")
    for tool in parsed_tools['tool_details']:
        print(f"\nTool: {tool['name']}")
        print(f"  Type: {tool['type']}")
        print(f"  Description: {tool['description'][:100]}..." if len(tool['description']) > 100 else f"  Description: {tool['description']}")
        if 'semantic_view' in tool:
            print(f"  Semantic View: {tool['semantic_view']}")
        if 'search_service' in tool:
            print(f"  Search Service: {tool['search_service']}")
else:
    print("No agent data available to parse")


In [None]:
def generate_semantic_view_queries(semantic_views: List[str]) -> List[str]:
    """
    Generate SQL queries to extract YAML definitions from semantic views.
    
    Args:
        semantic_views: List of semantic view names
    
    Returns:
        List of SQL queries
    """
    queries = []
    
    for semantic_view in semantic_views:
        query = f"SELECT SYSTEM$READ_YAML_FROM_SEMANTIC_VIEW('{semantic_view}') as yaml_content, '{semantic_view}' as semantic_view_name;"
        queries.append(query)
    
    return queries

def extract_stage_info_from_semantic_model_file(semantic_model_file: str) -> Tuple[str, str, str]:
    """
    Extract stage information from semantic model file path.
    
    Args:
        semantic_model_file: Path like @DATABASE.SCHEMA.STAGE_NAME/file.yaml
    
    Returns:
        Tuple of (database, schema, stage_name)
    """
    if not semantic_model_file.startswith('@'):
        return None, None, None
    
    stage_path = semantic_model_file[1:]  # Remove @ prefix
    path_parts = stage_path.split('/')
    
    if len(path_parts) >= 1:
        stage_identifier = path_parts[0]  # DATABASE.SCHEMA.STAGE_NAME
        stage_parts = stage_identifier.split('.')
        
        if len(stage_parts) >= 3:
            return stage_parts[0], stage_parts[1], stage_parts[2]
    
    return None, None, None

def read_yaml_from_stage_connector(semantic_model_file: str) -> Optional[Dict]:
    """
    Read YAML content from a stage using snowflake.connector (your approach).
    
    Args:
        semantic_model_file: Path to semantic model file (e.g., @DATABASE.SCHEMA.STAGE/file.yaml)
    
    Returns:
        Parsed YAML content as dictionary, or None if failed
    """
    try:
        # Extract stage information
        database, schema, stage_name = extract_stage_info_from_semantic_model_file(semantic_model_file)
        
        if not all([database, schema, stage_name]):
            print(f"  ⚠️  Could not parse stage information from {semantic_model_file}")
            return None
        
        file_name = semantic_model_file.split('/')[-1]
        
        # Snowflake Connection Details
        SNOWFLAKE_CONFIG = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_USER_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE")
        }
        
        # Setup Temporary Download Location
        local_temp_dir = tempfile.mkdtemp()
        local_file_path = os.path.join(local_temp_dir, file_name)
        
        print(f"  📁 Temporary download location: {local_file_path}")
        
        yaml_data = None
        conn = None
        
        try:
            # Connect to Snowflake
            print(f"  🔌 Connecting to Snowflake...")
            conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
            
            # Format the GET command
            stage_name_full = f"@{database}.{schema}.{stage_name}"
            sql_get_command = f"GET {stage_name_full}/{file_name} file://{os.path.normpath(local_temp_dir)}"
            
            print(f"  📥 Executing: {sql_get_command}")
            conn.cursor().execute(sql_get_command)
            print(f"  ✅ File downloaded successfully.")
            
            # Read and Parse the local YAML file
            print(f"  🔍 Parsing YAML file...")
            with open(local_file_path, 'r') as f:
                yaml_data = yaml.safe_load(f)
            
            print(f"  ✅ YAML file parsed successfully!")
            return yaml_data
            
        except Exception as e:
            print(f"  ❌ Error during YAML processing: {e}")
            return None
            
        finally:
            # Clean Up
            if conn:
                conn.close()
                print(f"  🔌 Snowflake connection closed.")
                
            # Remove the temporary file and directory
            try:
                if os.path.exists(local_file_path):
                    os.remove(local_file_path)
                if os.path.exists(local_temp_dir):
                    os.rmdir(local_temp_dir)
                print(f"  🧹 Temporary files cleaned up.")
            except Exception as e:
                print(f"  ⚠️  Error during cleanup: {e}")
                
    except Exception as e:
        print(f"  ❌ Error reading YAML from stage: {e}")
        return None

def extract_table_permissions_from_yaml(yaml_content: Dict) -> List[Tuple[str, str, str]]:
    """
    Extract table permissions from parsed YAML content.
    Enhanced to handle different YAML formats for semantic models.
    
    Args:
        yaml_content: Parsed YAML content from semantic view or semantic model file
    
    Returns:
        List of tuples containing (database, schema, table) for each table that needs SELECT permission
    """
    table_permissions = []
    
    if not yaml_content:
        return table_permissions
    
    # Method 1: Standard semantic view format
    if "tables" in yaml_content:
        for table in yaml_content["tables"]:
            if "base_table" in table:
                base_table = table["base_table"]
                database = base_table.get("database")
                schema = base_table.get("schema")
                table_name = base_table.get("table")
                
                if database and schema and table_name:
                    table_permissions.append((database, schema, table_name))
    
    # Method 2: Alternative semantic model format
    if "semantic_model" in yaml_content:
        semantic_model = yaml_content["semantic_model"]
        
        # Look for tables in different possible locations
        if "tables" in semantic_model:
            for table in semantic_model["tables"]:
                if isinstance(table, dict):
                    # Extract table information
                    database = table.get("database") or table.get("db")
                    schema = table.get("schema") or table.get("schema_name")
                    table_name = table.get("table") or table.get("table_name") or table.get("name")
                    
                    if database and schema and table_name:
                        table_permissions.append((database, schema, table_name))
    
    # Method 3: Look for table references in any nested structure
    def find_table_references(obj, path=""):
        """Recursively find table references in YAML structure"""
        if isinstance(obj, dict):
            for key, value in obj.items():
                if key.lower() in ['table', 'base_table', 'source_table'] and isinstance(value, dict):
                    database = value.get("database") or value.get("db")
                    schema = value.get("schema") or value.get("schema_name")
                    table_name = value.get("table") or value.get("table_name") or value.get("name")
                    
                    if database and schema and table_name:
                        table_permissions.append((database, schema, table_name))
                elif isinstance(value, (dict, list)):
                    find_table_references(value, f"{path}.{key}")
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                if isinstance(item, (dict, list)):
                    find_table_references(item, f"{path}[{i}]")
    
    # Run the recursive search
    find_table_references(yaml_content)
    
    # Remove duplicates while preserving order
    seen = set()
    unique_permissions = []
    for perm in table_permissions:
        if perm not in seen:
            seen.add(perm)
            unique_permissions.append(perm)
    
    return unique_permissions

print("Semantic view and semantic model file query functions defined successfully!")


In [None]:
# Generate SQL queries for semantic views
if example_agent_data:
    parsed_tools = parse_agent_tools(example_agent_data)
    semantic_view_queries = generate_semantic_view_queries(parsed_tools['semantic_views'])
    
    print("Generated SQL Queries for Semantic Views:")
    for i, query in enumerate(semantic_view_queries, 1):
        print(f"\nQuery {i}:")
        print(query)
else:
    print("No agent data available to generate queries")


In [None]:
def setup_snowflake_connection() -> Optional[Session]:
    """
    Set up Snowflake connection using environment variables.
    
    Returns:
        Snowpark session or None if connection failed
    """
    try:
        connection_params = {
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_USER_PASSWORD"),
            "role": os.getenv("SNOWFLAKE_ROLE"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE")
        }
        
        # Validate required parameters
        required_params = ["account", "user", "password", "role", "warehouse"]
        missing_params = [param for param in required_params if not connection_params[param]]
        
        if missing_params:
            print(f"Missing required environment variables: {missing_params}")
            return None
        
        session = Session.builder.configs(connection_params).create()
        print("Successfully connected to Snowflake!")
        return session
        
    except Exception as e:
        print(f"Error connecting to Snowflake: {e}")
        return None

def execute_semantic_view_queries(session: Session, semantic_views: List[str]) -> Dict[str, List[Tuple[str, str, str]]]:
    """
    Execute semantic view queries and extract table permissions.
    
    Args:
        session: Snowpark session
        semantic_views: List of semantic view names
    
    Returns:
        Dictionary mapping semantic view names to their table permissions
    """
    results = {}
    
    for semantic_view in semantic_views:
        try:
            print(f"Processing semantic view: {semantic_view}")
            
            # Execute the query
            query = f"SELECT SYSTEM$READ_YAML_FROM_SEMANTIC_VIEW('{semantic_view}') as yaml_content"
            result = session.sql(query).collect()
            
            if result and result[0]['YAML_CONTENT']:
                # Parse YAML content
                yaml_content = yaml.safe_load(result[0]['YAML_CONTENT'])
                
                # Extract table permissions
                table_permissions = extract_table_permissions_from_yaml(yaml_content)
                results[semantic_view] = table_permissions
                
                print(f"  Found {len(table_permissions)} tables: {[f'{db}.{schema}.{table}' for db, schema, table in table_permissions]}")
                
            else:
                print(f"  No YAML content found for {semantic_view}")
                results[semantic_view] = []
                
        except Exception as e:
            print(f"  Error processing {semantic_view}: {e}")
            results[semantic_view] = []
    
    return results

def execute_semantic_model_file_queries(semantic_model_files: List[str]) -> Dict[str, List[Tuple[str, str, str]]]:
    """
    Execute semantic model file queries and extract table permissions using snowflake.connector.
    
    Args:
        semantic_model_files: List of semantic model file paths
    
    Returns:
        Dictionary mapping semantic model file names to their table permissions
    """
    results = {}
    
    for semantic_model_file in semantic_model_files:
        try:
            print(f"Processing semantic model file: {semantic_model_file}")
            
            # Use the connector-based YAML reading method
            yaml_content = read_yaml_from_stage_connector(semantic_model_file)
            
            if yaml_content:
                print(f"  ✅ Successfully read and parsed YAML file")
                
                # Extract table permissions
                table_permissions = extract_table_permissions_from_yaml(yaml_content)
                results[semantic_model_file] = table_permissions
                
                print(f"  ✅ Found {len(table_permissions)} tables: {[f'{db}.{schema}.{table}' for db, schema, table in table_permissions]}")
                
                # Print YAML structure for debugging
                print(f"  🔍 YAML structure keys: {list(yaml_content.keys())}")
                
            else:
                print(f"  ⚠️  Could not read YAML content from {semantic_model_file}")
                results[semantic_model_file] = []
                
        except Exception as e:
            print(f"  Error processing {semantic_model_file}: {e}")
            results[semantic_model_file] = []
    
    return results

print("Snowflake connection and execution functions defined successfully!")


In [None]:
# Set up Snowflake connection
session = setup_snowflake_connection()

# Execute semantic view queries if we have a connection and agent data
if session and example_agent_data:
    parsed_tools = parse_agent_tools(example_agent_data)
    
    # Process semantic views
    if parsed_tools['semantic_views']:
        print("\nExecuting semantic view queries...")
        table_permissions_results = execute_semantic_view_queries(session, parsed_tools['semantic_views'])
    else:
        print("No semantic views found in agent specification")
        table_permissions_results = {}
    
    # Process semantic model files
    if parsed_tools.get('semantic_model_files'):
        print("\nExecuting semantic model file queries...")
        semantic_model_results = execute_semantic_model_file_queries(parsed_tools['semantic_model_files'])
        # Merge results
        table_permissions_results.update(semantic_model_results)
    else:
        print("No semantic model files found in agent specification")
    
    print("\nTable Permissions Summary:")
    for resource_name, tables in table_permissions_results.items():
        print(f"\n{resource_name}:")
        for db, schema, table in tables:
            print(f"  - {db}.{schema}.{table}")
else:
    print("Skipping semantic view/model file execution - no connection or agent data available")
    table_permissions_results = {}


In [None]:
# Updated function that includes agent location database/schema USAGE permissions, tool-specific warehouse permissions, and stage permissions
def generate_comprehensive_permission_script_with_agent_location(
    parsed_tools: Dict,
    table_permissions_results: Dict[str, List[Tuple[str, str, str]]],
    user_password: Optional[str] = None,
    warehouse_name: str = "COMPUTE_WH"
) -> str:
    """
    Generate comprehensive SQL permission script including agent location permissions, tool-specific warehouse permissions, and stage permissions.
    
    Args:
        parsed_tools: Parsed tool information
        table_permissions_results: Results from semantic view queries
        user_password: Password for the created user
        warehouse_name: Warehouse name for the user
    
    Returns:
        Complete SQL script as string
    """
    agent_name = parsed_tools["agent_name"]
    agent_database = parsed_tools["agent_database"]
    agent_schema = parsed_tools["agent_schema"]
    fully_qualified_agent = f"{agent_database}.{agent_schema}.{agent_name}"
    
    # Collect all unique table permissions
    all_table_permissions = set()
    for tables in table_permissions_results.values():
        for db, schema, table in tables:
            all_table_permissions.add(f"{db}.{schema}.{table}")
    
    # Generate database and schema USAGE grants specifically for each semantic view
    semantic_view_db_grants = set()
    semantic_view_schema_grants = set()
    
    for semantic_view in parsed_tools["semantic_views"]:
        parts = semantic_view.split('.')
        if len(parts) == 3:
            db_name, schema_name, view_name = parts
            semantic_view_db_grants.add(db_name)
            semantic_view_schema_grants.add(f"{db_name}.{schema_name}")
    
    # Generate database and schema USAGE grants for semantic model files (stages)
    semantic_model_db_grants = set()
    semantic_model_schema_grants = set()
    stage_grants = set()
    
    for semantic_model_file in parsed_tools.get("semantic_model_files", []):
        database, schema, stage_name = extract_stage_info_from_semantic_model_file(semantic_model_file)
        if all([database, schema, stage_name]):
            semantic_model_db_grants.add(database)
            semantic_model_schema_grants.add(f"{database}.{schema}")
            stage_grants.add(f"{database}.{schema}.{stage_name}")
    
    # Generate database and schema USAGE grants for search services
    search_db_grants = set()
    search_schema_grants = set()
    
    for search_service in parsed_tools["search_services"]:
        parts = search_service.split('.')
        if len(parts) == 3:
            db_name, schema_name, service_name = parts
            search_db_grants.add(db_name)
            search_schema_grants.add(f"{db_name}.{schema_name}")
    
    # Generate database and schema USAGE grants for procedures
    procedure_db_grants = set()
    procedure_schema_grants = set()
    
    for procedure in parsed_tools.get("procedures", []):
        # Extract database and schema from procedure identifier
        # Format: DATABASE.SCHEMA.PROCEDURE_NAME(PARAM_TYPES)
        procedure_parts = procedure.split('(')[0]  # Remove parameter types
        parts = procedure_parts.split('.')
        if len(parts) >= 3:
            db_name, schema_name, proc_name = parts
            procedure_db_grants.add(db_name)
            procedure_schema_grants.add(f"{db_name}.{schema_name}")
    
    # Add agent's own database and schema USAGE grants
    agent_db_grants = {agent_database}
    agent_schema_grants = {f"{agent_database}.{agent_schema}"}
    
    # CRITICAL FIX: Add database and schema grants for tables discovered in semantic view YAML
    # that are not already covered by the agent tool specifications
    table_db_grants = set()
    table_schema_grants = set()
    
    for tables in table_permissions_results.values():
        for db, schema, table in tables:
            table_db_grants.add(db)
            table_schema_grants.add(f"{db}.{schema}")
    
    # Combine all database and schema grants
    all_db_grants = semantic_view_db_grants.union(search_db_grants).union(agent_db_grants).union(table_db_grants).union(procedure_db_grants).union(semantic_model_db_grants)
    all_schema_grants = semantic_view_schema_grants.union(search_schema_grants).union(agent_schema_grants).union(table_schema_grants).union(procedure_schema_grants).union(semantic_model_schema_grants)
    
    # Generate permission grants
    db_grants = "\n".join([f"GRANT USAGE ON DATABASE {db} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                          for db in sorted(all_db_grants)])
    
    schema_grants = "\n".join([f"GRANT USAGE ON SCHEMA {schema} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                              for schema in sorted(all_schema_grants)])
    
    view_grants = "\n".join([f"GRANT SELECT ON VIEW {view} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                            for view in sorted(parsed_tools["semantic_views"])])
    
    table_grants = "\n".join([f"GRANT SELECT ON TABLE {table} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                            for table in sorted(all_table_permissions)])
    
    search_grants = "\n".join([f"GRANT USAGE ON CORTEX SEARCH SERVICE {service} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                              for service in sorted(parsed_tools["search_services"])])
    
    # Generate procedure grants
    procedure_grants = "\n".join([f"GRANT USAGE ON PROCEDURE {procedure} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                                for procedure in sorted(parsed_tools.get("procedures", []))])
    
    # Generate stage grants for semantic model files
    stage_grants_sql = ""
    if stage_grants:
        stage_grants_sql = "\n".join([f"GRANT READ ON STAGE {stage} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);" 
                                     for stage in sorted(stage_grants)])
        stage_grants_sql = f"\n{stage_grants_sql}"
    
    # Generate tool-specific warehouse grants
    tool_warehouse_grants = ""
    if parsed_tools.get("tool_warehouses"):
        tool_warehouse_grants = "\n".join([
            f"GRANT USAGE ON WAREHOUSE IDENTIFIER('{warehouse}') TO ROLE IDENTIFIER($AGENT_ROLE_NAME); -- Required for tool: {tool_name}"
            for tool_name, warehouse in parsed_tools["tool_warehouses"].items()
        ])
        if tool_warehouse_grants:
            tool_warehouse_grants = f"\n-- Tool-specific warehouse permissions\n{tool_warehouse_grants}"
    
    # Assemble the complete script
    script = f"""-- =========================================================================================
-- AUTO-GENERATED LEAST-PRIVILEGE SCRIPT FOR AGENT: {fully_qualified_agent}
-- Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
-- Generated by: Automated Agent Permissions Notebook
-- =========================================================================================

-- IMPORTANT: Review and adjust the placeholder variables below for your environment.
SET AGENT_ROLE_NAME = '{agent_name}_USER_ROLE';
SET WAREHOUSE_NAME = '{warehouse_name}';

-- Create a dedicated role for the agent's permissions.
USE ROLE SECURITYADMIN; -- Or your own privileged role to assign permissions
CREATE ROLE IF NOT EXISTS IDENTIFIER($AGENT_ROLE_NAME);
GRANT ROLE IDENTIFIER($AGENT_ROLE_NAME) TO ROLE SYSADMIN; -- Optional: Allows SYSADMIN to manage the role.

-- Grant core permission to use the agent object itself.
GRANT USAGE ON AGENT {fully_qualified_agent} TO ROLE IDENTIFIER($AGENT_ROLE_NAME);

-- Grant permissions on the underlying database objects required by the agent's tools.
-- NOTE: These permissions are derived from the agent's tool specification and semantic view YAML definitions.

-- Database and Schema USAGE grants (including agent location, tool-specific locations, tables from semantic views, procedures, and stages)
{db_grants}
{schema_grants}

-- Permissions for 'cortex_analyst_text_to_sql' tools
-- Semantic view permissions
{view_grants}

-- Base table permissions (from semantic view YAML)
{table_grants}

-- Permissions for 'cortex_search' tools
{search_grants}

-- Permissions for 'generic' tools (procedures)
{procedure_grants}

-- Stage permissions for semantic model files
{stage_grants_sql}

{tool_warehouse_grants}

-- Grant warehouse usage to the role for the user's session.
GRANT USAGE ON WAREHOUSE IDENTIFIER($WAREHOUSE_NAME) TO ROLE IDENTIFIER($AGENT_ROLE_NAME);

-- =========================================================================================
SELECT 'Setup complete for role ' || $AGENT_ROLE_NAME AS "Status";
-- =========================================================================================
"""
    
    return script


In [None]:
# Test the updated function with agent location permissions and tool-specific warehouse permissions
if example_agent_data:
    parsed_tools = parse_agent_tools(example_agent_data)
    
    # Use actual table permissions if available, otherwise use empty results
    if 'table_permissions_results' in locals():
        final_table_permissions = table_permissions_results
    else:
        final_table_permissions = {}
    
    # Generate script with the updated function that includes agent location and tool-specific warehouses
    updated_permission_script = generate_comprehensive_permission_script_with_agent_location(
        parsed_tools=parsed_tools,
        table_permissions_results=final_table_permissions,
        warehouse_name="COMPUTE_WH"
    )
    
    #print("Updated Permission Script (with Agent Location and Tool-Specific Warehouses):")
    print("--","=" * 80)
    print(updated_permission_script)
    
        
else:
    print("No agent data available to test updated function")
