# ❄️ Anthropic on Snowflake Cortex

Building an intelligent question-answering system using Anthropic's Claude and Snowflake's AI capabilities.

This notebook demonstrates how to build an end-to-end application that:
1. Processes PDF documents using Cortex `PARSE_DOCUMENT`
2. Creates a Cortex Search Service for keyword and vector search
3. Implements a chat interface using Snowflake's Cortex and Anthropic's Claude in Streamlit

## Setting Up Your Environment 

First, we'll import the required packages and set up our Snowflake session. The notebook uses several key packages:
- `streamlit`: For creating the interactive chat interface
- `snowflake-ml-python`: For Snowflake Cortex embeddings and LLM capabilities


In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import json

from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete, EmbedText768
from snowflake.snowpark.types import VectorType, FloatType
from snowflake.core.table import Table, TableColumn
from snowflake.core import CreateMode, Root
from snowflake.snowpark.functions import cast, col


session = get_active_session()
current_warehouse = session.get_current_warehouse()
database_name = session.get_current_database()
schema_name = session.get_current_schema()
role_name = session.get_current_role()
service_name = 'document_search_service'
root = Root(session)
database = root.databases[database_name]
schema = database.schemas[schema_name]


## Setting Up Stage Variables 

We'll define our stage name and retrieve the list of files to process. This stage should contain the PDF documents we want to analyze.
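
As a rough sketch of how the `LIST` output might be filtered: each row carries a `name` column that, for an internal stage, typically consists of the lowercased stage name followed by the file's relative path. The helper `pdf_paths` below is hypothetical (it is not part of the notebook) and works on plain dictionaries, so it can be tried without a Snowflake session.

```python
def pdf_paths(rows):
    """Return stage-relative paths of the PDF files in LIST-style rows.

    Assumes each row has a 'name' of the form '<stage>/<relative/path>',
    which is how internal stages are usually listed.
    """
    paths = []
    for row in rows:
        # Drop only the leading stage component; keep any sub-directories.
        _, _, relative = row["name"].partition("/")
        if relative.lower().endswith(".pdf"):
            paths.append(relative)
    return paths


# Hypothetical rows shaped like the result of session.sql("LIST @stage").collect()
sample_rows = [
    {"name": "documents/guide.pdf"},
    {"name": "documents/2024/report.PDF"},
    {"name": "documents/notes.txt"},
]
print(pdf_paths(sample_rows))  # ['guide.pdf', '2024/report.PDF']
```

Filtering on the extension keeps non-PDF files in the stage from reaching the parsing step.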

In [None]:

stage_name = "@ANTHROPIC_RAG.ANTHROPIC_RAG.DOCUMENTS"
files = session.sql(f"LIST {stage_name}").collect()


## Document Processing Functions 

We'll create a function that extracts the text of a PDF file with `SNOWFLAKE.CORTEX.PARSE_DOCUMENT` in OCR mode and returns it as a one-row DataFrame.


In [None]:
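def process(file_name: str):
    """Extract the text of one staged PDF and return it as a one-row DataFrame."""
    # PARSE_DOCUMENT returns a JSON object; :content holds the extracted text
    query = """
        SELECT TO_VARCHAR(
            SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
                ?,
                ?,
                {'mode': 'OCR'}):content
        ) AS OCR;
    """

    # Bind the stage and the stage-relative file path as query parameters
    resp = session.sql(query, params=[stage_name, file_name]).collect()
    text = resp[0]['OCR']

    df = pd.DataFrame({
        'TEXT': [text],
        'FILE_NAME': [file_name]
    })

    return df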

## Processing Documents

Now we'll:
1. Process all documents in our stage
2. Store the results in our table
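
A minimal sketch of step 1 with per-file error handling, so that one unreadable PDF does not abort the whole batch. The names `process_all` and `parse_fn` are hypothetical: `parse_fn` stands for any callable that takes a file name and returns the extracted text, and the example uses a stand-in parser so it runs without a Snowflake session.

```python
def process_all(file_names, parse_fn):
    """Run parse_fn on every file; return (records, names of failed files)."""
    records, failed = [], []
    for name in file_names:
        try:
            records.append({"FILE_NAME": name, "TEXT": parse_fn(name)})
        except Exception:  # broad on purpose: any parse failure skips the file
            failed.append(name)
    return records, failed


def fake_parse(name):
    """Stand-in for a real parser; fails on one file to show the fallback."""
    if name == "broken.pdf":
        raise ValueError("could not parse")
    return f"text of {name}"


records, failed = process_all(["a.pdf", "broken.pdf", "b.pdf"], fake_parse)
print([r["FILE_NAME"] for r in records])  # ['a.pdf', 'b.pdf']
print(failed)                             # ['broken.pdf']
```

The list of records can be passed to `pd.DataFrame(records)` to get the same `TEXT` and `FILE_NAME` columns used in this notebook.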

In [None]:
# Extract stage-relative file paths (LIST names look like "<stage>/<path>")
file_names = [file['name'].split('/', 1)[1] for file in files]

# Parse each file and combine the results into one DataFrame
final_dataframe = pd.concat([
    process(file_name)
    for file_name in file_names
], ignore_index=True)

snowpark_df = session.create_dataframe(final_dataframe).select(
    col("file_name"),
    col("text")
)

# Write the transformed data directly to the target table
snowpark_df.write.mode("overwrite").save_as_table("docs_text_table")

## Create Cortex Search Service 

### Key Components Explained

#### Parameters

- `ON`: Column containing the text to be indexed and searched (required)
- `ATTRIBUTES`: Columns that queries can filter on, e.g. `file_name` (optional)
- `WAREHOUSE`: Warehouse used to refresh the service and build the index (required)
- `TARGET_LAG`: Maximum allowed lag between the source data and the index (required)
- `EMBEDDING_MODEL`: Model used to generate text embeddings (optional; a default model is used if omitted)
- Source query: The SELECT statement defining the data to index

#### Configuration Options

1. Target Lag Settings:  
     
   - Shorter lag times mean more frequent updates  
   - Common values: '1 hour', '1 day', '7 days'  
   - Balance freshness needs with compute costs

   

2. Embedding Model Options:  
     
   - 'snowflake-arctic-embed-l-v2.0': Snowflake's large Arctic Embed v2.0 model  
   - Multilingual, not limited to English content  
   - 1024-dimensional embeddings

   

3. Warehouse Considerations:  
     
   - Choose size based on data volume  
   - Consider compute costs vs update frequency  
   - Monitor warehouse utilization
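
To make the options above concrete, here is a small sketch that assembles the `CREATE CORTEX SEARCH SERVICE` statement from Python values and rejects an obviously malformed `TARGET_LAG`. The function name `build_search_service_ddl`, the warehouse name `MY_WH`, and the set of accepted lag units are assumptions made for this illustration; the Snowflake documentation is the authority on valid values.

```python
import re

# Units assumed valid for TARGET_LAG in this sketch; check the Snowflake docs.
_LAG_PATTERN = re.compile(r"^\d+ (second|minute|hour|day)s?$")


def build_search_service_ddl(service, warehouse, source_table,
                             target_lag="1 day",
                             embedding_model="snowflake-arctic-embed-l-v2.0"):
    """Return a CREATE CORTEX SEARCH SERVICE statement as a string."""
    if not _LAG_PATTERN.match(target_lag):
        raise ValueError(f"unexpected TARGET_LAG value: {target_lag!r}")
    # Identifiers are inserted as-is, so they must come from trusted input.
    return (
        f"CREATE OR REPLACE CORTEX SEARCH SERVICE {service}\n"
        f"  ON text\n"
        f"  ATTRIBUTES file_name\n"
        f"  WAREHOUSE = {warehouse}\n"
        f"  TARGET_LAG = '{target_lag}'\n"
        f"  EMBEDDING_MODEL = '{embedding_model}'\n"
        f"  AS (SELECT text, file_name FROM {source_table});"
    )


# MY_WH is a placeholder warehouse name.
print(build_search_service_ddl("document_search_service", "MY_WH", "docs_text_table"))
```

Building the statement in Python is only one option; the SQL cell in this notebook achieves the same result with `{{variable}}` references.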

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE {{service_name}}
  ON text
  ATTRIBUTES file_name
  WAREHOUSE = {{current_warehouse}}
  TARGET_LAG = '1 day'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
  AS (
    SELECT
        text,
        file_name
    FROM docs_text_table
);

## Building the Chat Interface

Finally, we'll create a chat interface that combines:
- The Cortex Search Service for finding relevant context
- Chat history management for conversation continuity
- Anthropic's Claude model for generating responses
- Streamlit for the user interface
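
The retrieval step in the first bullet turns search results into prompt context. The notebook's `cortex_search` function joins the result texts and reports the first result's file name. The sketch below shows one possible variation that labels each passage with its source file and collects every distinct file name. `build_context` and the sample results are hypothetical and only mimic the shape of the `results` list (dictionaries with `text` and `file_name` keys).

```python
def build_context(results):
    """Join result passages into one context string and list distinct sources."""
    passages, sources = [], []
    for result in results:
        # Prefix each passage with its source so the model can tell them apart
        passages.append(f"[source: {result['file_name']}]\n{result['text']}")
        if result["file_name"] not in sources:
            sources.append(result["file_name"])
    return "\n\n".join(passages), sources


# Hypothetical results shaped like the "results" list of a search response
sample_results = [
    {"text": "Install the connector.", "file_name": "guide.pdf"},
    {"text": "Set the warehouse.", "file_name": "guide.pdf"},
    {"text": "Roles are required.", "file_name": "faq.pdf"},
]
context, sources = build_context(sample_results)
print(sources)  # ['guide.pdf', 'faq.pdf']
```

Returning all sources lets the interface cite every document that contributed to an answer, at the cost of a slightly longer prompt.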

Key parameters:
- `num_results`: Number of context results provided (default: 8)
- `model_name`: Language model used (default: "claude-3-5-sonnet")
- `history_length`: Chat history length (default: 5)
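
To illustrate how `history_length` bounds the conversation context, the sketch below mirrors the windowing logic of the notebook's `get_chat_history` on a plain list. The function name `recent_history` and the sample messages are made up for this example. Because the newest message (the current question) is excluded, a `history_length` of 5 yields at most four earlier messages.

```python
def recent_history(messages, history_length):
    """Return the messages inside the history window, excluding the newest one."""
    start = max(0, len(messages) - history_length)
    return messages[start:len(messages) - 1]


# Seven alternating user/assistant messages: m0 ... m6
messages = [
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"m{i}"}
    for i in range(7)
]
window = recent_history(messages, 5)
print([m["content"] for m in window])  # ['m2', 'm3', 'm4', 'm5']
```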

In [None]:
num_results = 8  # Number of results
model_name = "claude-3-5-sonnet"  # The model we are using
history_length = 5 # Number of chat messages in history

def init_messages():
    """
    Initialize the session state for chat messages. If the session state indicates that the
    conversation should be cleared or if the "messages" key is not in the session state,
    initialize it as an empty list.
    """
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []
        st.session_state.suggestions = []
        st.session_state.active_suggestion = None
        

def init_config_options():
    """
    Initialize the chat interface configuration and display existing chat history.
    Provides a button to clear conversation history and maintains chat state.
    """

    st.session_state.num_chat_messages = history_length
    st.button("Clear conversation", key="clear_conversation")
    
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

def get_chat_history():
    """
    Retrieve the most recent chat messages from the session state, limited to
    `num_chat_messages` and excluding the latest message (the current question).

    Returns:
        list: The preceding chat messages, oldest first.
    """
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]

def make_chat_history_summary(chat_history, question):
    """
    Generate a summary of the chat history combined with the current question to extend the query
    context. Use the language model to generate this summary.

    Args:
        chat_history (list): The chat messages to include in the summary.
        question (str): The current user question to extend with the chat history.

    Returns:
        str: The generated summary of the chat history and question.
    """
    
    prompt = f"""
        Given the following conversation history and new question, generate a detailed query that incorporates relevant context from the chat history. The query should be written in natural, conversational language and include any important details, preferences, or constraints mentioned previously.

        <chat_history>
        {chat_history}
        </chat_history>
        
        <question>
        {question}
        </question>
        
        Please generate a single, comprehensive query that combines the above information. The query should be self-contained and allow for a complete response without requiring additional context.
    """

    summary = complete(model_name, prompt)

    return summary

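def cortex_search(my_question):
    """
    Query the Cortex Search Service and build the prompt context.

    Args:
        my_question (str): The search query.

    Returns:
        tuple: The combined result text and the file name of the top result
        (None if the search returned nothing).
    """
    search_service = (root
      .databases[database_name]
      .schemas[schema_name]
      .cortex_search_services[service_name]
    )

    resp = search_service.search(
      query=my_question,
      columns=["text", "file_name"],
      limit=num_results
    )

    results = json.loads(resp.to_json())["results"]

    # Build the context from the search results, one passage per paragraph
    prompt_context = "\n\n".join(result["text"] for result in results)
    prompt_context = prompt_context.replace("'", "")

    # Guard against an empty result set before reading the top result
    file_name = results[0]["file_name"] if results else None

    return prompt_context, file_name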

def create_prompt(user_question):
    """
    Create a prompt for the language model by combining the user question with context retrieved
    from the cortex search service and chat history (if enabled). Format the prompt according to
    the expected input format of the model.

    Args:
        user_question (str): The user's question to generate a prompt for.

    Returns:
        tuple: The generated prompt for the language model and the file name
        of the top search result.
    """

    chat_history = get_chat_history()
    if chat_history:
        question_summary = make_chat_history_summary(chat_history, user_question)
        prompt_context, file_name = cortex_search(question_summary)
    else:
        # No history yet: search with, and ask, the question as typed
        question_summary = user_question
        prompt_context, file_name = cortex_search(user_question)

    prompt = f"""You are a documentation specialist focused on providing precise answers based on provided documentation. 

        Input Context:
        Context: {prompt_context}
        Question: {question_summary}
        Chat History: {chat_history}
        
        Instructions:
        1. Analyze the provided context carefully
        2. Frame responses to build upon any relevant chat history
        3. Structure answers as follows:
           - Direct answer to the question
           - Required prerequisites or dependencies
           - Step-by-step implementation (if applicable)
           - Important limitations or warnings
        
        If information is not found in context:
        1. Explicitly state what information is missing
        2. Avoid assumptions or external references
        3. Specify what additional context would help answer the question
        
        Remember: Only reference information from the provided context.
        
        Response:"""
    return prompt, file_name

def display_response(prompt, file_name):
    """Generate a response for the prompt and display it with its source document."""
    with st.status("In progress...") as status:
        # Get the response from the AI model (complete returns the text only)
        response = complete(model_name, prompt)

        # Display the response from the model
        st.markdown(response)
        status.update(label="Done!", state="complete", expanded=True)

        # Display the source document name
        with st.container():
            st.markdown(f"This information came from {file_name}")

    return response

# Main code
def main():
    st.title(":speech_balloon: Chatbot with Snowflake Cortex and Anthropic Claude")

    init_config_options()
    init_messages()

    icons = {"assistant": "❄️", "user": "👤"}
    
    if question := st.chat_input("Ask a question..."):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": question})
        # Display user message in chat message container
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\\$"))

        # Display assistant response in chat message container
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()
            # question = question.replace("'", "")
            with st.spinner("Thinking..."):
                # Generate the response
                prompt, file_name = create_prompt(question)
                generated_response = complete(model_name, prompt)
                
                # Store the generated response directly in session state
                st.session_state.gen_response = generated_response
                
                # Display the generated response
                message_placeholder.markdown(generated_response)

        st.session_state.messages.append(
            {"role": "assistant", "content": generated_response}
        )
        

if __name__ == "__main__":
    session = get_active_session()
    main()
