In [None]:
import warnings
warnings.filterwarnings("ignore")
import streamlit as st
import pandas as pd
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.snowpark import types as T
from snowflake.core import Root
from snowflake.cortex import Complete
from service_generation import create_cortex_search_service
# One Snowpark session handle is shared by direct queries and the Root API client.
session = get_active_session()
root = Root(session)

# Document Generation
import doc_generation
doc_generation.generate_demo_documents(session)

# Cortex Agents
In this notebook you will set up multiple Cortex Search and Cortex Analyst services, which Cortex Agents will use to answer user queries on unstructured and structured data.
![text](https://github.com/michaelgorkow/snowflake_cortex_agents_demo/blob/main/resources/cortex_agents_notebook_small.png?raw=true)

# Set Up the Cortex Search Service [Unstructured Data]

We have some PDF documents in our stage **DOCUMENTS** that we want business users to be able to ask questions about.  
To achieve this, we need to extract the contents of the PDF files and make them searchable.

## Extracting Content from PDF Files

### [`PARSE_DOCUMENT`](https://docs.snowflake.com/en/sql-reference/functions/parse_document-snowflake-cortex)  
This function returns the extracted content from a document on a Snowflake stage as an **OBJECT** that contains JSON-encoded objects as strings.  

It supports two types of extractions:  
- **Optical Character Recognition (OCR)**  
- **Layout Extraction**  

### [`SPLIT_TEXT_RECURSIVE_CHARACTER`](https://docs.snowflake.com/en/sql-reference/functions/split_text_recursive_character-snowflake-cortex)  
The `SPLIT_TEXT_RECURSIVE_CHARACTER` function splits a string into shorter strings recursively. It is useful for preprocessing text to be used with text embedding or search indexing functions.

In [None]:
-- List documents in stage
-- Returns one row per file registered in the directory table of the DOCUMENTS stage.
SELECT * FROM DIRECTORY('@DOCUMENTS');

In [None]:
-- Layout extraction for PDF documents
-- Runs PARSE_DOCUMENT in LAYOUT mode on every staged file whose path starts with
-- 'marketing_campaigns/' and stores the `content` field of the result as text.
-- IF NOT EXISTS: when the table is already present the CREATE is skipped, so
-- re-running this cell neither re-parses documents nor picks up files that were
-- added to the stage afterwards.
CREATE TABLE IF NOT EXISTS _UNSTR_RAW_DOCUMENTS_MARKETING_CAMPAIGNS AS
SELECT 
    RELATIVE_PATH,
    TO_VARCHAR (
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT (
            '@DOCUMENTS',
            RELATIVE_PATH,
            {'mode': 'LAYOUT'} ):content
        ) AS EXTRACTED_LAYOUT 
FROM 
    DIRECTORY('@DOCUMENTS')
WHERE
    startswith(RELATIVE_PATH, 'marketing_campaigns/');

-- Preview the extracted text (one row per document).
SELECT * FROM _UNSTR_RAW_DOCUMENTS_MARKETING_CAMPAIGNS;

In [None]:
-- Create chunks from extracted content
-- Produces one row per chunk: the source path, a download URL, the chunk's
-- position within its document, and the chunk text.
CREATE OR REPLACE TABLE _UNSTR_CHUNKED_DOCUMENTS_MARKETING_CAMPAIGNS AS
SELECT
   RELATIVE_PATH,
   -- 604800 seconds = 7 days of URL validity.
   -- NOTE(review): the URL is materialized into the table, so it will presumably
   -- stop working once that period has passed unless the table is rebuilt.
   GET_PRESIGNED_URL(@DOCUMENTS, RELATIVE_PATH, 604800) AS URL,
   c.INDEX::INTEGER AS CHUNK_INDEX,
   c.value::TEXT AS CHUNK_TEXT
FROM
   _UNSTR_RAW_DOCUMENTS_MARKETING_CAMPAIGNS,
   -- FLATTEN turns the array of chunks returned by the splitter into rows.
   LATERAL FLATTEN( input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
      EXTRACTED_LAYOUT,         -- text to split
      'markdown',               -- input format
      4000,                     -- chunk size
      0,                        -- overlap between consecutive chunks
      ['\n\n', '\n', ' ', '']   -- separators to split on
   )) c;

-- Preview the resulting chunks.
SELECT * FROM _UNSTR_CHUNKED_DOCUMENTS_MARKETING_CAMPAIGNS;

In [None]:
-- Create a Cortex Search Service for Marketing Campaigns
-- Indexes CHUNK_TEXT from the chunked marketing-campaign documents.
-- RELATIVE_PATH and CHUNK_INDEX are declared as attributes; URL is part of the
-- source query as well, so it can be requested as a result column.
CREATE OR REPLACE CORTEX SEARCH SERVICE SEARCH_MARKETING_CAMPAIGNS
  ON CHUNK_TEXT
  ATTRIBUTES RELATIVE_PATH, CHUNK_INDEX
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      CHUNK_TEXT,
      RELATIVE_PATH,
      CHUNK_INDEX,
      URL
  FROM _UNSTR_CHUNKED_DOCUMENTS_MARKETING_CAMPAIGNS
);

In [None]:
# Create additional search services
# One helper call per remaining document category.
# NOTE(review): presumably the helper repeats the parse/chunk/index steps shown
# above for each category -- confirm in service_generation.
for document_category in (
    'product_specifications',
    'regional_market_reports',
    'financial_operations_reports',
    'customer_contracts',
):
    create_cortex_search_service(session, document_category)

### [Optional] Test Your Service in a Simple RAG Pipeline  

In this small example, we **combine Cortex Search with Cortex LLMs** to generate a response from context—also known as **Retrieval-Augmented Generation (RAG)**.  
This approach enhances responses by retrieving relevant data before generating an answer, improving accuracy and contextual relevance. 🚀  

In [None]:
# Minimal RAG round trip: retrieve the best-matching chunk, then ask an LLM to
# answer the question using only that chunk as context.
question = 'Which marketing campaigns targeted the Chocolate category and what were the sales results?'

# Fetch service
# The service is addressed by explicit database / schema / service name.
my_service = (root
  .databases["CORTEX_AGENTS_DEMO"]
  .schemas["FINANCE_FOOD_BEVERAGE"]
  .cortex_search_services["SEARCH_MARKETING_CAMPAIGNS"]
)

# Query service
# limit=1 requests a single result.
resp = my_service.search(
  query=question,
  columns=["CHUNK_INDEX", "CHUNK_TEXT", "RELATIVE_PATH", "URL"],
  limit=1,
  experimental={'returnConfidenceScores':True}
)
# From here on `resp` is the first result record, not the full response object.
# NOTE(review): indexing [0] would fail if the search returned no results.
resp = resp.results[0]

# Show the retrieved source (file path, URL and chunk text) before generating.
st.info(f'**File:** {resp["RELATIVE_PATH"]}\n\n **Source:**\n\n {resp["URL"]}\n\n {resp["CHUNK_TEXT"]}')

# Generate Response
# The prompt is the question followed by the retrieved chunk text.
model = 'mistral-large2'
prompt = f"{question} Answer based on the provided context: {resp['CHUNK_TEXT']}"
response = Complete(model, prompt).strip()

st.info(f'**LLM Response:**\n\n**{response}**')

# Set Up the Cortex Analyst Service [Structured Data]  

We generate a realistic-looking financial dataset for a food and beverage company that users will be able to **query in natural language**.  

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

# Set random seed for reproducibility
np.random.seed(42)
random.seed(42)

def generate_financial_dataset():
    """Generate a synthetic financial dataset for a food and beverage company.

    All random values come from the module-level ``random`` generator, so seed
    it before calling if reproducible output is required.

    The result contains five related tables:

    * ``products``     - one row per product with category, unit cost and list price.
    * ``customers``    - one row per customer with a randomly chosen type,
      region and credit limit.
    * ``time_periods`` - 48 consecutive calendar months starting January 2021,
      each dated on the first day of its month.
    * ``sales``        - between 800 and 1200 random transactions per period.
    * ``campaigns``    - seven fixed marketing campaigns, each tied to a
      category and a range of period ids.

    Returns:
        dict: pandas DataFrames keyed by ``'products'``, ``'customers'``,
        ``'time_periods'``, ``'sales'`` and ``'campaigns'``.
    """

    # 1. PRODUCTS TABLE
    categories = {
        'Coffee': ['NesKafe Classic', 'NesKafe Gold', 'NesKafe Decaf', 'NesKafe Instant',
                  'NesKafe Cappuccino', 'NesKafe Latte', 'NesKafe Mocha'],
        'Water': ['PureLife Natural', 'PureLife Sparkling', 'PureLife Flavored', 
                 'AquaFlow Premium', 'SpringSource Mountain', 'CrystalClear Pure'],
        'Chocolate': ['ChocoBars Dark', 'ChocoBars Milk', 'ChocoBars White', 'ChocoBars Almond',
                     'SweetTreats Original', 'SweetTreats Caramel', 'ChocoWafers Crispy',
                     'CreamyBites Hazelnut', 'DeluxeChoc Premium'],
        'Baby Food': ['BabyFirst Formula', 'BabyFirst Organic', 'BabyFirst Cereal', 
                     'TinyTots Puree', 'LittleOnes Snacks', 'InfantCare Plus'],
        'Dairy': ['CreamyDelight Vanilla', 'CreamyDelight Chocolate', 'CreamyDelight Strawberry',
                 'FrozenJoy Cookies', 'FrozenJoy Mint', 'PremiumScoop Deluxe'],
        'Cereals': ['MorningCrunch Honey', 'MorningCrunch Chocolate', 'HealthyStart Oats',
                   'FiberPlus Original', 'KidsChoice Fruity'],
        'Pet Care': ['PetLove Dry Food', 'PetLove Wet Food', 'PetLove Treats', 'PetCare Premium']
    }

    products_data = []
    for category, products in categories.items():
        for product in products:
            products_data.append({
                # Ids run sequentially across all categories, starting at 1.
                'PRODUCT_ID': len(products_data) + 1,
                'PRODUCT_NAME': product,
                'CATEGORY': category,
                'UNIT_COST': round(random.uniform(0.5, 15.0), 2),
                'UNIT_PRICE': round(random.uniform(1.0, 25.0), 2)
            })

    products_df = pd.DataFrame(products_data)

    # 2. CUSTOMERS TABLE (High cardinality - customer names)
    customer_types = ['Supermarket', 'Convenience Store', 'Hypermarket', 'Online Retailer', 'Distributor']
    regions = ['North America', 'Europe', 'Asia Pacific', 'Latin America', 'Middle East & Africa']

    # The group comments below only organise the name list; CUSTOMER_TYPE and
    # REGION are drawn at random and are independent of the name.
    customer_names = [
        # Supermarkets
        'FreshMart Downtown', 'FreshMart Central', 'FreshMart Plaza', 'GroceryWorld Main',
        'GroceryWorld Express', 'SuperShop Premium', 'SuperShop Local', 'MegaStore Alpha',
        'MegaStore Beta', 'MegaStore Gamma', 'QuickBuy Central', 'QuickBuy Corner',

        # Hypermarkets
        'HyperMall North', 'HyperMall South', 'HyperMall East', 'GiantStore Complex',
        'GiantStore Plaza', 'UltraMart Mega', 'UltraMart Super',

        # Online Retailers
        'E-Commerce Hub', 'Digital Grocery Co', 'Online Fresh Ltd', 'WebMart Express',
        'VirtualStore Pro', 'ClickAndBuy Solutions',

        # Distributors
        'Regional Dist. Corp', 'National Supply Chain', 'Metro Distribution', 
        'Premium Wholesale Ltd', 'Global Trade Partners', 'Continental Suppliers',

        # International
        'EuroMart Berlin', 'EuroMart Paris', 'AsiaFresh Tokyo', 'AsiaFresh Seoul',
        'LatinMarket Mexico', 'LatinMarket Brazil', 'AfricaTrade Lagos', 'AfricaTrade Cairo'
    ]

    customers_data = []
    for i, name in enumerate(customer_names, 1):
        customers_data.append({
            'CUSTOMER_ID': i,
            'CUSTOMER_NAME': name,
            'CUSTOMER_TYPE': random.choice(customer_types),
            'REGION': random.choice(regions),
            'CREDIT_LIMIT': random.choice([50000, 100000, 250000, 500000, 1000000])
        })

    customers_df = pd.DataFrame(customers_data)

    # 3. TIME PERIODS TABLE
    num_periods = 48  # four years of monthly periods: 2021-01 through 2024-12
    start_date = datetime(2021, 1, 1)
    time_periods = []

    for i in range(num_periods):
        # Step by whole calendar months. Adding 30-day increments instead would
        # put two periods in January 2021, skip February, and keep drifting.
        years_ahead, month_index = divmod(start_date.month - 1 + i, 12)
        current_date = datetime(start_date.year + years_ahead, month_index + 1, 1)
        time_periods.append({
            'PERIOD_ID': i + 1,
            'YEAR': current_date.year,
            'MONTH': current_date.month,
            'QUARTER': f"Q{(current_date.month-1)//3 + 1}",
            'MONTH_NAME': current_date.strftime('%B'),
            'DATE': current_date.strftime('%Y-%m-%d')
        })

    time_periods_df = pd.DataFrame(time_periods)

    # 4. SALES TRANSACTIONS TABLE
    sales_data = []
    transaction_id = 1

    # Direct id -> product lookup; avoids filtering the products DataFrame once
    # per generated transaction.
    products_by_id = {product['PRODUCT_ID']: product for product in products_data}
    num_customers = len(customers_data)
    num_products = len(products_data)

    for period in range(1, num_periods + 1):
        # Generate different number of transactions per month
        num_transactions = random.randint(800, 1200)

        for _ in range(num_transactions):
            customer_id = random.randint(1, num_customers)
            product_id = random.randint(1, num_products)
            quantity = random.randint(10, 1000)

            # Get product info for calculations
            product_info = products_by_id[product_id]
            unit_price = product_info['UNIT_PRICE']
            unit_cost = product_info['UNIT_COST']

            # Add some price variation (+/- 10% around the list price)
            actual_price = unit_price * random.uniform(0.9, 1.1)
            revenue = quantity * actual_price
            cost = quantity * unit_cost

            sales_data.append({
                'TRANSACTION_ID': transaction_id,
                'CUSTOMER_ID': customer_id,
                'PRODUCT_ID': product_id,
                'PERIOD_ID': period,
                'QUANTITY_SOLD': quantity,
                'UNIT_PRICE': round(actual_price, 2),
                'TOTAL_REVENUE': round(revenue, 2),
                'TOTAL_COST': round(cost, 2),
                'GROSS_PROFIT': round(revenue - cost, 2)
            })
            transaction_id += 1

    sales_df = pd.DataFrame(sales_data)

    # 5. MARKETING CAMPAIGNS TABLE
    # start_period / end_period refer to PERIOD_ID values of the time periods table.
    campaigns_data = [
        {'campaign_id': 1, 'campaign_name': 'Coffee Lovers Special', 'category': 'Coffee', 
         'start_period': 3, 'end_period': 5, 'budget': 500000, 'discount_percent': 15},
        {'campaign_id': 2, 'campaign_name': 'Summer Hydration', 'category': 'Water', 
         'start_period': 6, 'end_period': 8, 'budget': 750000, 'discount_percent': 10},
        {'campaign_id': 3, 'campaign_name': 'Back to School', 'category': 'Cereals', 
         'start_period': 8, 'end_period': 9, 'budget': 400000, 'discount_percent': 20},
        {'campaign_id': 4, 'campaign_name': 'Holiday Treats', 'category': 'Chocolate', 
         'start_period': 11, 'end_period': 12, 'budget': 1000000, 'discount_percent': 25},
        {'campaign_id': 5, 'campaign_name': 'New Year Health', 'category': 'Baby Food', 
         'start_period': 13, 'end_period': 14, 'budget': 300000, 'discount_percent': 12},
        {'campaign_id': 6, 'campaign_name': 'Spring Refresh', 'category': 'Dairy', 
         'start_period': 15, 'end_period': 17, 'budget': 600000, 'discount_percent': 18},
        {'campaign_id': 7, 'campaign_name': 'Pet Love Month', 'category': 'Pet Care', 
         'start_period': 18, 'end_period': 18, 'budget': 200000, 'discount_percent': 30}
    ]

    campaigns_df = pd.DataFrame(campaigns_data)
    # Upper-case the column names so they match the other tables.
    campaigns_df.columns = [column_name.upper() for column_name in campaigns_df.columns]

    return {
        'products': products_df,
        'customers': customers_df,
        'time_periods': time_periods_df,
        'sales': sales_df,
        'campaigns': campaigns_df
    }

# Generate the dataset
dataset = generate_financial_dataset()

# Save to Snowflake
# Every upload replaces an existing table of the same name and creates it if missing.
products_df = session.write_pandas(
    df=dataset['products'],
    table_name='PRODUCTS',
    overwrite=True,
    auto_create_table=True,
)
customers_df = session.write_pandas(
    df=dataset['customers'],
    table_name='CUSTOMERS',
    overwrite=True,
    auto_create_table=True,
)
time_periods_df = session.write_pandas(
    df=dataset['time_periods'],
    table_name='TIME_PERIODS',
    overwrite=True,
    auto_create_table=True,
)
# DATE is generated as a 'YYYY-MM-DD' string; cast it to a date type and
# rewrite the table so the column is stored as a date.
time_periods_df = time_periods_df.with_column("DATE", col("DATE").cast(T.DateType()))
time_periods_df.write.save_as_table('TIME_PERIODS', mode='overwrite')
campaigns_df = session.write_pandas(
    df=dataset['campaigns'],
    table_name='CAMPAIGNS',
    overwrite=True,
    auto_create_table=True,
)
sales_df = session.write_pandas(
    df=dataset['sales'],
    table_name='SALES',
    overwrite=True,
    auto_create_table=True,
)

# Display sample data
# Shows the first three rows of each locally generated table.
for table_name, df in dataset.items():
    st.subheader(f"\n{table_name.upper()}")
    st.dataframe(df.head(3))

# Dynamic Literal Retrieval with Cortex Analyst

Business users may not have detailed knowledge of how data is stored in Snowflake.  
Instead of ingesting all possible values of a column into **Cortex Analyst**, we will use **dynamic literal retrieval** via the [Cortex Search Integration](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-analyst/cortex-analyst-search-integration).

## How It Works  
When a user asks a question about their **sales** that requires the `PRODUCT_NAME`, `CUSTOMER_NAME`, or `CAMPAIGN_NAME` column, **Cortex Analyst** will:  
1. Retrieve the relevant literal dynamically from **Cortex Search**  
2. Use it for **SQL generation**  

This approach ensures efficient and accurate query generation without preloading all possible values into Cortex Analyst.  


In [None]:
-- Search service over the distinct product names in PRODUCTS.
-- IF NOT EXISTS: an existing service with this name is left untouched.
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_PRODUCT_NAME_SEARCH
  ON PRODUCT_NAME
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT PRODUCT_NAME
  FROM PRODUCTS
);

In [None]:
-- Search service over the distinct customer names in CUSTOMERS.
-- IF NOT EXISTS: an existing service with this name is left untouched.
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_CUSTOMER_NAME_SEARCH
  ON CUSTOMER_NAME
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT CUSTOMER_NAME
  FROM CUSTOMERS
);

In [None]:
-- Search service over the distinct campaign names in CAMPAIGNS.
-- IF NOT EXISTS: an existing service with this name is left untouched.
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_CAMPAIGN_SEARCH
  ON CAMPAIGN_NAME
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT CAMPAIGN_NAME
  FROM CAMPAIGNS
);

### [Optional] Test Literal Retrievals

In [None]:
# Literal-retrieval check for campaign names.
# NOTE(review): the question contains misspellings ("over", "sumer") -- presumably
# on purpose, to show the search still resolves the intended campaign; confirm.
question = 'What was the over impact of the sumer hydration campaign?'

# Fetch service
my_service = (root
  .databases["CORTEX_AGENTS_DEMO"]
  .schemas["FINANCE_FOOD_BEVERAGE"]
  .cortex_search_services["_ANALYST_CAMPAIGN_SEARCH"]
)

# Query service
# limit=1 requests a single campaign name.
resp = my_service.search(
  query=question,
  columns=["CAMPAIGN_NAME"],
  limit=1
)
# `resp` now holds the first result record rather than the full response.
# NOTE(review): indexing [0] would fail if the search returned no results.
resp = resp.results[0]

st.info(f'**Search Results: {resp["CAMPAIGN_NAME"]}**')

In [None]:
# Literal-retrieval check for customer names: the question spells the customer
# in lower case and the search is asked for the single best-matching name.
question = 'What was the revenue per week for my customer supershop local?'

# Fetch service
my_service = (root
  .databases["CORTEX_AGENTS_DEMO"]
  .schemas["FINANCE_FOOD_BEVERAGE"]
  .cortex_search_services["_ANALYST_CUSTOMER_NAME_SEARCH"]
)

# Query service
resp = my_service.search(
  query=question,
  columns=["CUSTOMER_NAME"],
  limit=1
)

# Show every returned name (at most one, given limit=1).
for r in resp.results:
    st.info(r['CUSTOMER_NAME'])

In [None]:
# Literal-retrieval check for product names: "pure life" is written as two
# lower-case words and the search is asked for the three best matches.
question = 'What was the revenue for pure life products?'

# Fetch service
my_service = (root
  .databases["CORTEX_AGENTS_DEMO"]
  .schemas["FINANCE_FOOD_BEVERAGE"]
  .cortex_search_services["_ANALYST_PRODUCT_NAME_SEARCH"]
)

# Query service
resp = my_service.search(
  query=question,
  columns=["PRODUCT_NAME"],
  limit=3
)

# Show every returned product name (up to three, given limit=3).
for r in resp.results:
    st.info(r['PRODUCT_NAME'])

# Explore the Semantic Model in the Snowflake UI  

With our data available in **Snowflake** and the **Search Services** set up, it's time to explore the **native Semantic Model Generator** in **Snowsight**.  
![text](https://github.com/michaelgorkow/snowflake_cortex_agents_demo/blob/main/resources/semantic_model_ui.png?raw=true)

## Why Do You Need a Semantic Model?  

**Cortex Analyst** allows users to query **Snowflake** data using **natural language**. However, business users often use terminology that does not align with the database schema.  

### The Problem  
- Users specify **domain-specific business terms** in their questions  
- Underlying data is stored using **technical abbreviations**  
- Example:  
  - **"CUST"** is used for **customers**  
  - **Schema lacks semantic context**, making queries harder to interpret  

### The Solution: Semantic Models  
Semantic models **map business terminology to database schemas** and provide **contextual meaning**.  

#### Example  
When a user asks:  
🗣️ *"Total revenue last month"*  

The **semantic model** can:  
✅ Define **"revenue"** as **net revenue**  
✅ Define **"last month"** as **the previous calendar month**  

This mapping helps **Cortex Analyst** understand **user intent** and generate **accurate answers**.  

🔗 More details can be found in the [Semantic Model Specification](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-analyst/semantic-model-spec).  
