# Cortex Agents
In this notebook, you will set up multiple Cortex Search and Cortex Analyst services, which Cortex Agents will use to answer user queries on unstructured and structured data.
![text](https://github.com/michaelgorkow/snowflake_cortex_agents_demo/blob/main/resources/cortex_agents_notebook_small.png?raw=true)

In [None]:
import warnings
warnings.filterwarnings("ignore")

import random
from datetime import datetime, timedelta

import pandas as pd
import streamlit as st
from faker import Faker

from snowflake.core import Root
from snowflake.cortex import Complete
from snowflake.snowpark import types as T
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col

# Set seeds for reproducibility (random and Faker use separate generators)
random.seed(42)
Faker.seed(42)

session = get_active_session()

# Setup the Cortex Search Service [Unstructured Data]

We have some PDF documents in our stage **DOCUMENTS** that we want business users to be able to ask questions about.  
To achieve this, we need to extract the contents of the PDF files and make them searchable.

## Extracting Content from PDF Files

### [`PARSE_DOCUMENT`](https://docs.snowflake.com/en/sql-reference/functions/parse_document-snowflake-cortex)  
This function returns the extracted content from a document on a Snowflake stage as an **OBJECT** that contains JSON-encoded objects as strings.  

It supports two types of extractions:  
- **Optical Character Recognition (OCR)**  
- **Layout Extraction**  
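
To make the return shape concrete, here is a minimal sketch of reading such a result in Python. The sample payload is illustrative, not real output: the `content` key mirrors the `:content` path used in the SQL cell further down, while the `metadata` field and its `pageCount` key are assumptions about the response shape. `extract_content` is a hypothetical helper name.

```python
import json

# Illustrative payload only: `content` mirrors the `:content` path used in this
# notebook; `metadata.pageCount` is an assumed field for illustration.
sample_result = '{"content": "# User Guide\\n\\nStep 1: Open the settings.", "metadata": {"pageCount": 1}}'

def extract_content(raw_result: str) -> str:
    """Return the extracted text from a PARSE_DOCUMENT-style JSON string."""
    parsed = json.loads(raw_result)
    # Fall back to an empty string if the key is missing.
    return parsed.get("content", "")

text = extract_content(sample_result)
print(text.splitlines()[0])
```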

### [`SPLIT_TEXT_RECURSIVE_CHARACTER`](https://docs.snowflake.com/en/sql-reference/functions/split_text_recursive_character-snowflake-cortex)  
The `SPLIT_TEXT_RECURSIVE_CHARACTER` function splits a string into shorter strings recursively. It is useful for preprocessing text to be used with text embedding or search indexing functions.
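
The idea behind recursive splitting can be sketched in plain Python. This is a simplified illustration, not Snowflake's implementation: it tries the coarsest separator first and only falls back to finer ones for pieces that are still too long. It ignores the format and overlap arguments, and `split_recursive` is a hypothetical name.

```python
def split_recursive(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split text into chunks of at most chunk_size characters."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators or separators[0] == "":
        # Last resort: cut at fixed positions.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    chunks, current = [], ""
    for part in text.split(sep):
        candidate = part if not current else current + sep + part
        if len(candidate) <= chunk_size:
            current = candidate  # still fits: keep growing the current chunk
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(part) <= chunk_size:
            current = part
        else:
            # A single part is too long: retry it with the finer separators.
            chunks.extend(split_recursive(part, chunk_size, finer))
    if current:
        chunks.append(current)
    return chunks

sample = "Intro paragraph.\n\nSecond paragraph that is a bit longer.\n\nEnd."
chunks = split_recursive(sample, chunk_size=30)
for chunk in chunks:
    print(repr(chunk))
```

Paragraph boundaries are preferred, so only the long second paragraph is broken at a space. The notebook applies the same principle with a chunk size of 1800 characters.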

In [None]:
-- List documents in stage
SELECT * FROM DIRECTORY('@DOCUMENTS');

In [None]:
-- Layout extraction for PDF documents
CREATE TABLE IF NOT EXISTS RAW_TEXT AS
SELECT 
    RELATIVE_PATH,
    TO_VARCHAR(
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
            '@DOCUMENTS',
            RELATIVE_PATH,
            {'mode': 'LAYOUT'}
        ):content
    ) AS EXTRACTED_LAYOUT,
    -- A custom Python UDF that was created during the demo setup
    remove_duplicate_headers(EXTRACTED_LAYOUT) AS CLEANED_LAYOUT
FROM 
    DIRECTORY('@DOCUMENTS');

SELECT * FROM RAW_TEXT;

In [None]:
-- Create chunks from extracted content
CREATE TABLE IF NOT EXISTS CHUNKED_TEXT AS
SELECT
   RELATIVE_PATH,
   c.INDEX::INTEGER AS CHUNK_INDEX,
   c.value::TEXT AS CHUNK_TEXT
FROM
   RAW_TEXT,
   LATERAL FLATTEN( input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
      CLEANED_LAYOUT,
      'markdown',
      1800,
      0,
      ['\n\n', '\n', ' ', '']
   )) c;

SELECT * FROM CHUNKED_TEXT;

In [None]:
-- Create a Cortex Search Service for the product guides
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS SNOWPRINT_PRODUCT_GUIDES
  ON CHUNK_TEXT
  ATTRIBUTES RELATIVE_PATH, CHUNK_INDEX
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      CHUNK_TEXT,
      RELATIVE_PATH,
      CHUNK_INDEX
  FROM CHUNKED_TEXT
);

### (Optional) Test Your Service in a Simple RAG Pipeline  

In this small example, we **combine Cortex Search with Cortex LLMs** to generate a response from context—also known as **Retrieval-Augmented Generation (RAG)**.  
This approach enhances responses by retrieving relevant data before generating an answer, improving accuracy and contextual relevance. 🚀  
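
The cell below passes a single retrieved chunk to the model. As a hedged sketch of how the prompt-building step could be generalized to several chunks, the helper below assembles a grounded prompt from a list of search results. `build_rag_prompt`, the `max_chars` budget, and the sample chunks are hypothetical and not part of the notebook. The dictionary keys match the columns requested from the search service.

```python
def build_rag_prompt(question, chunks, max_chars=4000):
    """Combine a question with retrieved chunks into one grounded prompt."""
    context_parts, used = [], 0
    for i, chunk in enumerate(chunks, start=1):
        block = f"[{i}] {chunk['RELATIVE_PATH']}\n{chunk['CHUNK_TEXT']}"
        # Stop adding context once the character budget is exhausted.
        if used + len(block) > max_chars:
            break
        context_parts.append(block)
        used += len(block)
    context = "\n\n".join(context_parts)
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )

# Hypothetical search results for illustration.
sample_chunks = [
    {"RELATIVE_PATH": "guide_a.pdf", "CHUNK_TEXT": "Open Administration and choose Users."},
    {"RELATIVE_PATH": "guide_b.pdf", "CHUNK_TEXT": "Assign a role before saving the user."},
]
prompt = build_rag_prompt("How do I create a user?", sample_chunks)
print(prompt)
```

Numbering the chunks and including the file path makes it easier to trace an answer back to its source document.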

In [None]:
root = Root(get_active_session())

question = 'How do I set up and configure a new user in the Prinect system?'

# Fetch service
my_service = (root
  .databases["CORTEX_AGENTS_DEMO"]
  .schemas["SNOWPRINT"]
  .cortex_search_services["SNOWPRINT_PRODUCT_GUIDES"]
)

# Query service
resp = my_service.search(
  query=question,
  columns=["CHUNK_INDEX", "CHUNK_TEXT", "RELATIVE_PATH"],
  limit=1
)
resp = resp.results[0]

st.info(f'# File: {resp["RELATIVE_PATH"]}\n\n {resp["CHUNK_TEXT"]}')

# Generate Response
model = 'mistral-large2'
prompt = f"{question} Answer based on the provided context: {resp['CHUNK_TEXT']}"
response = Complete(model, prompt).strip()

st.info(f'# LLM Response: \n\n {response}')

# Generate Data for Cortex Analyst [Structured Data]  

## Dataset
We are generating a **fictional dataset** from _Snowprint AG_. The dataset is split into three core tables that capture different aspects of the company’s printing operations:

1. **Customers**  
   Stores client information such as name, contact details, location, and phone numbers.  
   Example use: Identifying customer segments by country or city, generating targeted marketing campaigns, or pulling up client contact details on demand.

2. **Jobs**  
   Tracks printing jobs with columns for job name, priority, due date, and completion status.  
   Example use: Monitoring overall job status, identifying high-priority tasks, or finding overdue projects.

3. **Steps**  
   Details the step-by-step process for each job (e.g., _Qualify_, _Prepare_, _Print_) along with timestamps.  
   Example use: Analyzing production workflows, calculating turnaround times for each step, or optimizing bottlenecks.

## Business Use Case Examples
Users will be able to get insights into the following topics:
- **Customer Relationship Management**: Quickly view customers by region or generate lists of pending orders.
- **Production Tracking**: Identify high-priority jobs, check which steps are completed, or spot delays in real time.
- **Performance Analytics**: Calculate average lead times, track overall productivity, and improve scheduling and resource allocation.
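
As a rough illustration of the performance analytics use case, the sketch below computes the average duration per processing step from rows shaped like the `JOB_PROCESS_STEPS` table generated in the next cell. The sample rows and the `average_step_minutes` helper are hypothetical. In the demo itself, Cortex Analyst would answer such questions by generating SQL.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical rows using the column names of the JOB_PROCESS_STEPS table.
sample_steps = [
    {"JOB_ID": "JOB_000001", "SEQUENCE_NAME": "Qualify",
     "STARTED_AT": "2025-01-01 08:00:00", "COMPLETED_AT": "2025-01-01 09:00:00"},
    {"JOB_ID": "JOB_000001", "SEQUENCE_NAME": "Prepare",
     "STARTED_AT": "2025-01-01 09:15:00", "COMPLETED_AT": None},
    {"JOB_ID": "JOB_000002", "SEQUENCE_NAME": "Qualify",
     "STARTED_AT": "2025-01-02 10:00:00", "COMPLETED_AT": "2025-01-02 10:30:00"},
]

def average_step_minutes(rows):
    """Average duration in minutes per step name, ignoring unfinished steps."""
    fmt = "%Y-%m-%d %H:%M:%S"
    durations = defaultdict(list)
    for row in rows:
        if row["COMPLETED_AT"] is None:
            continue  # in-progress or canceled steps have no end time
        started = datetime.strptime(row["STARTED_AT"], fmt)
        completed = datetime.strptime(row["COMPLETED_AT"], fmt)
        minutes = (completed - started).total_seconds() / 60
        durations[row["SEQUENCE_NAME"]].append(minutes)
    return {name: sum(vals) / len(vals) for name, vals in durations.items()}

averages = average_step_minutes(sample_steps)
print(averages)
```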


In [None]:
# List of European countries and their corresponding phone codes
countries = ["Germany", "France", "Italy", "Spain", "United Kingdom", "Netherlands", "Belgium", "Switzerland", "Austria", "Sweden"]
country_codes = {
    "Germany": "+49",
    "France": "+33",
    "Italy": "+39",
    "Spain": "+34",
    "United Kingdom": "+44",
    "Netherlands": "+31",
    "Belgium": "+32",
    "Switzerland": "+41",
    "Austria": "+43",
    "Sweden": "+46"
}

# Mapping of countries to locale-specific Faker instances for realistic addresses
faker_locales = {
    "Germany": Faker("de_DE"),
    "France": Faker("fr_FR"),
    "Italy": Faker("it_IT"),
    "Spain": Faker("es_ES"),
    "United Kingdom": Faker("en_GB"),
    "Netherlands": Faker("nl_NL"),
    "Belgium": Faker("fr_BE"),
    "Switzerland": Faker("de_CH"),
    "Austria": Faker("de_AT"),
    "Sweden": Faker("sv_SE")
}

# Create a default Faker instance for company and person names
fake = Faker()

def generate_company_name():
    return fake.company()

def generate_contact_person():
    return fake.name()

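def generate_email(contact_person, company_name):
    # Use the contact person's first and last names and a domain based on the company name.
    first, _, last = contact_person.partition(" ")
    # Keep only alphanumeric characters so titles or multi-part names (e.g. "Dr. Jane Doe") cannot leave spaces or dots in the address.
    local_parts = ["".join(c for c in part if c.isalnum()).lower() for part in (first, last)]
    local = ".".join(part for part in local_parts if part)
    # Remove spaces and punctuation from the company name and convert to lowercase.
    domain = "".join([c for c in company_name if c.isalnum()]).lower() + ".com"
    return f"{local}@{domain}"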

def generate_phone_number(country):
    code = country_codes[country]
    # Generate a realistic phone number: country code, then three groups of digits.
    area = random.randint(100, 999)
    part1 = random.randint(100, 999)
    part2 = random.randint(1000, 9999)
    return f"{code} {area} {part1} {part2}"

# Generate sample data for 100 customers (IDs C_000001 through C_000100)
customer_data = []
customer_ids = [f'C_{i:06d}' for i in range(1, 101)]
for cid in customer_ids:
    customer_id = cid
    company_name = generate_company_name()
    contact_person = generate_contact_person()
    country = random.choice(countries)
    email = generate_email(contact_person, company_name)
    phone = generate_phone_number(country)
    
    # Use the locale-specific Faker instance to generate city and street address
    locale_fake = faker_locales[country]
    city = locale_fake.city()
    street_address = locale_fake.street_address().replace('\n','')
    
    customer_data.append({
        "CUSTOMER_ID": customer_id,
        "CUSTOMER_NAME": company_name,
        "CONTACT_PERSON": contact_person,
        "EMAIL": email,
        "COUNTRY": country,
        "CITY": city,
        "STREET_ADDRESS": street_address,
        "PHONE_NUMBER": phone
    })

customers = pd.DataFrame(customer_data)

jobs_list = []
job_processing_steps_list = []
job_id_counter = 1

# Fixed processing sequence
processing_sequence = ["Qualify", "Prepare", "Imposition", "Proof", "Print"]

# For each customer, generate between 100 and 200 jobs.
for cust_id in customer_ids:
    num_jobs = random.randint(100, 200)
    for i in range(num_jobs):
        # Generate job creation date within the last 30 days
        created_at_dt = datetime.now() - timedelta(days=random.randint(1, 30),
                                                     hours=random.randint(0, 23),
                                                     minutes=random.randint(0, 59))
        # Due date: after the job is created (between 1 and 30 days later)
        due_date_dt = created_at_dt + timedelta(days=random.randint(1, 30))
        
        # Decide final job status with weights:
        # Let’s assume: Pending (20%), In Progress (20%), Completed (50%), Canceled (10%)
        final_status = random.choices(["Pending", "In Progress", "Completed", "Canceled"],
                                      weights=[20, 20, 50, 10])[0]
        
        job_id = f'JOB_{job_id_counter:06d}'
        # Build the name before incrementing so it matches the number in JOB_ID
        job_name = f"Print Job {job_id_counter}"
        job_id_counter += 1
        
        jobs_list.append({
            "JOB_ID": job_id,
            "JOB_NAME": job_name,
            "CUSTOMER_ID": cust_id,
            "STATUS": final_status,
            "DUE_DATE": due_date_dt.strftime('%Y-%m-%d'),
            "CREATED_AT": created_at_dt.strftime('%Y-%m-%d %H:%M:%S'),
            "JOB_PRIORITY": random.choice(["High", "Medium", "Low"]),
        })
        
        # Create processing steps only if the job is no longer Pending.
        if final_status != "Pending":
            # For a "Completed" job, we always use all 5 steps.
            # For jobs "In Progress" or "Canceled", randomly choose 1 to 4 steps.
            if final_status == "Completed":
                steps_count = 5
            else:
                steps_count = random.randint(1, 4)
                
            # Base time for processing steps starts after the job's creation.
            step_time = created_at_dt + timedelta(hours=random.randint(1, 12))
            
            # Create steps in the fixed order
            for step_index in range(steps_count):
                step_name = processing_sequence[step_index]
                # Reset STEP_ID for each job so that it starts with STEP_1
                step_id = f'STEP_{step_index+1}'
                
                is_last = (step_index == steps_count - 1)
                if is_last:
                    if final_status == "Completed":
                        step_status = "Completed"
                    elif final_status == "In Progress":
                        step_status = "In Progress"
                    elif final_status == "Canceled":
                        step_status = "Canceled"
                else:
                    step_status = "Completed"
                
                # The step's start time is the current step_time.
                started_at = step_time
                
                # If the step is completed, add a duration (30 to 120 minutes)
                if step_status == "Completed":
                    duration = timedelta(minutes=random.randint(30, 120))
                    completed_at = started_at + duration
                    # Next step starts a little after the previous step’s completion
                    step_time = completed_at + timedelta(minutes=random.randint(10, 30))
                else:
                    completed_at = None  # in-progress or canceled step is not finished
                
                job_processing_steps_list.append({
                    "STEP_ID": step_id,
                    "JOB_ID": job_id,
                    "SEQUENCE_NAME": step_name,
                    "STARTED_AT": started_at.strftime('%Y-%m-%d %H:%M:%S'),
                    "COMPLETED_AT": completed_at.strftime('%Y-%m-%d %H:%M:%S') if completed_at else None,
                    "STATUS": step_status,
                })

# Create DataFrames
jobs = pd.DataFrame(jobs_list)
job_processing_steps = pd.DataFrame(job_processing_steps_list)

customers = session.create_dataframe(customers, schema=customers.columns)
customers.write.save_as_table(table_name='CUSTOMERS', mode='overwrite')
session.table('CUSTOMERS').show()

jobs = session.create_dataframe(jobs, schema=jobs.columns)
jobs = jobs.with_column("DUE_DATE", col("DUE_DATE").cast(T.DateType()))
jobs = jobs.with_column("CREATED_AT", col("CREATED_AT").cast(T.DateType()))
jobs.write.save_as_table(table_name='JOBS', mode='overwrite')
session.table('JOBS').show()

job_processing_steps = session.create_dataframe(job_processing_steps, schema=job_processing_steps.columns)
job_processing_steps = job_processing_steps.with_column("STARTED_AT", col("STARTED_AT").cast(T.TimestampType()))
job_processing_steps = job_processing_steps.with_column("COMPLETED_AT", col("COMPLETED_AT").cast(T.TimestampType()))
job_processing_steps.write.save_as_table(table_name='JOB_PROCESS_STEPS', mode='overwrite')
session.table('JOB_PROCESS_STEPS').show()

# Dynamic Literal Retrieval with Cortex Analyst

Business users may not have detailed knowledge of how data is stored in Snowflake.  
Instead of ingesting all possible values of a column into **Cortex Analyst**, we will use **dynamic literal retrieval** via the [Cortex Search Integration](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-analyst/cortex-analyst-search-integration).

## How It Works  
When a user asks a question that requires any of the columns indexed by these Cortex Search Services, **Cortex Analyst** will:  
1. Retrieve the relevant literal dynamically from the respective **Cortex Search Service**  
2. Use the retrieved literal for **SQL generation**  

This approach ensures efficient and accurate query generation for columns with high cardinality without preloading all possible values into Cortex Analyst.  
Another major benefit is that each Cortex Search Service refreshes automatically, so new values become available to Cortex Analyst within the configured `TARGET_LAG` (one hour in this notebook) without any operational work.
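
The two steps above can be illustrated with a toy stand-in. This sketch is not how Cortex Search works internally: it uses `difflib` string similarity from the standard library in place of the search service, purely to show how a loosely typed user term is resolved to a stored literal before it is placed in a SQL filter. The helper names and the sample customer names are hypothetical.

```python
import difflib

def resolve_literal(user_term, known_literals, cutoff=0.6):
    """Map a loosely typed term to the closest stored literal, or None."""
    lowered = {lit.lower(): lit for lit in known_literals}
    matches = difflib.get_close_matches(
        user_term.lower(), list(lowered), n=1, cutoff=cutoff
    )
    return lowered[matches[0]] if matches else None

def build_filter_sql(column, literal):
    """Build a simple filter query, escaping single quotes in the literal."""
    escaped = literal.replace("'", "''")
    return f"SELECT * FROM CUSTOMERS WHERE {column} = '{escaped}'"

# Hypothetical distinct values of CUSTOMER_NAME.
literals = ["Schmidt AG", "Dupont SARL", "Rossi SpA"]
literal = resolve_literal("schmit ag", literals)
print(build_filter_sql("CUSTOMER_NAME", literal))
```

The point of the pattern is that the generated SQL filters on the exact stored value rather than on whatever spelling the user typed.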


In [None]:
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_SEARCH_CUSTOMER_NAME
  ON CUSTOMER_NAME
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT CUSTOMER_NAME
  FROM CUSTOMERS
);

In [None]:
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_SEARCH_CONTACT_PERSON
  ON CONTACT_PERSON
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT CONTACT_PERSON
  FROM CUSTOMERS
);

In [None]:
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_SEARCH_COUNTRY
  ON COUNTRY
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT COUNTRY
  FROM CUSTOMERS
);

In [None]:
CREATE CORTEX SEARCH SERVICE IF NOT EXISTS _ANALYST_SEARCH_CITY
  ON CITY
  WAREHOUSE = COMPUTE_WH
  TARGET_LAG = '1 hour'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
  SELECT
      DISTINCT CITY
  FROM CUSTOMERS
);