# ❄️ Snowflake Chat with your Documents Notebook ❄️

Includes:
- Cortex PARSE_DOCUMENT and SPLIT_TEXT_RECURSIVE_CHARACTER
- Foundation LLMs
- Cortex Search Service
- Building the Pipeline
- Creating the App

In [None]:
# Import necessary functions
import streamlit as st
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
/**************************
 Set the context - update the database
 (the trailing # looks like a placeholder for your own database suffix)
**************************/
use database mdt2_cortex_search_docs#;                                      --CHANGE THIS
use schema data;

In [None]:
/*******************************************
Create a stage to hold the RAG architecture image

Change the database in the FROM path before running.
The INTO target (@images) is unqualified, so it resolves in the current database and schema.
*******************************************/
CREATE or replace STAGE images                                              
  DIRECTORY = (enable = true)
  ENCRYPTION = (type = 'snowflake_sse');
-- Copy the diagram from the Git repository stage into the new internal stage
COPY FILES
  INTO @images                                                              
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/images/RAG_flow.png;      --CHANGE THIS

In [None]:
# Open the architecture diagram on the IMAGES stage as a raw (non-decompressed) byte stream
image_stream = session.file.get_stream("@IMAGES/RAG_flow.png", decompress=False)
try:
    image = image_stream.read()
finally:
    # Release the stream handle once the bytes are in memory
    image_stream.close()

# Display the image
st.image(image, width=1000)

In [None]:
/******************************
Create the stage to host your documents.  We can see that the stage is empty.
******************************/
CREATE or replace STAGE docs              --CHANGE THIS
  DIRECTORY = (enable = true)
  ENCRYPTION = (type = 'snowflake_sse');
ls @docs;                                 --CHANGE THIS

In [None]:
/******************************
We are preparing to operationalize the pipeline.  In order to do so, we need to create a STREAM that 'listens' for new documents to land in the stage.  We can see that the stream is empty.
******************************/
create or replace stream cortex_search_stream on stage docs;
select * from cortex_search_stream;

In [None]:
/******************************
All of the files that we need are in the Git repository

Change the name of the database
******************************/
ls @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents;                --CHANGE THIS

In [None]:
/******************************
Copy 4 of the files from the GIT repository to the Stage

Change the name of the database
******************************/
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/Carver_Skis_Specification_Guide.pdf;          --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/Mondracer_Infant_Bike.pdf;                    --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/OutPiste_Skis_Specification_Guide.pdf;        --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/Premium_Bicycle_User_Guide.pdf;               --CHANGE THIS
alter stage docs refresh;
ls @docs;

In [None]:
--Let's verify that the Stream sees the new documents that have landed on the stage
select * from cortex_search_stream;

In [None]:
/*********************************************
 Create table to hold the extracted document data
*********************************************/
CREATE TABLE if not exists CHUNKS_PARSE_DOC (
     relative_path VARCHAR
   , size NUMBER
   , file_url VARCHAR
   , scoped_file_url VARCHAR
   , page_content VARIANT
   , category VARCHAR
);

In [None]:
/*********************************************
 In order to operationalize the flow, we will be creating a Dynamic Table downstream.  Dynamic Tables require change tracking to be enabled on source tables.  
*********************************************/
alter table chunks_parse_doc set change_tracking = True;

In [None]:
/***************************************
                PARSE_DOC + SPLIT_TEXT   

The SPLIT_TEXT_RECURSIVE_CHARACTER function splits a string up into smaller chunks of text recursively for 
preprocessing text as input to ML workloads. The function returns an array of text chunks, where the chunks are computed based on the input parameters provided.

The PARSE_DOCUMENT function returns the extracted content from a document on a Snowflake stage as an OBJECT that contains JSON-encoded objects as strings. This function supports 2 types of extractions, Optical Character Recognition (OCR) and layout. 

If LAYOUT mode is selected, the data is markdown with structural content including tables.
If OCR, the data is text content

This query reads the new files from the stream, parses each one in layout mode and splits the
extracted content as MARKDOWN.
***************************************/
select
    relative_path
    , size
    , file_url
    -- NOTE: despite the alias, this column holds a presigned URL (GET_PRESIGNED_URL)
    , GET_PRESIGNED_URL(@docs, relative_path) as scoped_file_url   
    , SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
        to_variant(snowflake.cortex.parse_document(
                   @docs,
                   relative_path,
                   {'mode': 'layout'}
        )):content, 'MARKDOWN', 1800, 250) as page_content
  from cortex_search_stream
 -- Only rows the stream reports as newly inserted files
 where METADATA$ACTION = 'INSERT';


In [None]:
/***************************************
We are going to use the power of Large Language Models to easily classify the documents we are ingesting in our RAG application. We are just going to use the file name but you could also use some of the content of the doc itself. Depending on your use case you may want to use different approaches. 

We will pass the file name to the LLM using the Cortex Complete function with a prompt to classify what the guide refers to.  The prompt will be as simple as this but you can try to customize it depending on your use case and documents. Classification is not mandatory for Cortex Search but we want to use it here to also demo hybrid search.
***************************************/
select
    relative_path
    , TRIM(snowflake.cortex.COMPLETE (
            'llama3-70b',
            'Given the name of the file between <file> and </file> determine if it is related to bikes or snow. Use only one word <file> ' ||           
            relative_path || '</file>'), '\n') AS category
    from cortex_search_stream
    where METADATA$ACTION = 'INSERT';

In [None]:
/***************************************
Let's operationalize the previous two statements by putting them in a task that listens for documents to land on the stage.  When a new document lands, we will extract and chunk the contents as well as assign a category.  The extracted text is stored in a variant column.

Change the warehouse name to your warehouse
Change the database to your database
***************************************/
create or replace task cortex_search_new_docs
warehouse = mdt2_compute_wh#                                                                          --CHANGE THIS
schedule = '1 minute'
comment = 'Process new files in the stage and insert data into the chunks_parse_doc table'
-- Guard: the body runs only when the stream reports that it has data
when SYSTEM$STREAM_HAS_DATA('cortex_search_stream') as
insert into CHUNKS_PARSE_DOC (
    select
        relative_path
        , size
        , file_url
        -- NOTE: despite the alias, this column holds a presigned URL (GET_PRESIGNED_URL)
        , GET_PRESIGNED_URL(@docs, relative_path) as scoped_file_url 
        -- Parse the document in layout mode, then split the extracted content as MARKDOWN
        , SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
            to_variant(snowflake.cortex.parse_document(
                       @docs,  
                       relative_path,
                       {'mode': 'layout'}
            )):content, 'MARKDOWN', 1800, 250) as page_content
        -- Ask the LLM to classify the file by its name; TRIM strips newline characters from the answer
        , TRIM(snowflake.cortex.COMPLETE (
            'llama3-70b',
            'Given the name of the file between <file> and </file> determine if it is related to bikes or snow. Use only one word <file> ' || 
             relative_path || '</file>'), '\n') AS category
    from cortex_search_stream
    where METADATA$ACTION = 'INSERT'
    );
    
-- Resume the task so that it starts running on its schedule
alter task cortex_search_new_docs resume;

In [None]:
/**********************
Verify that the documents have been successfully extracted
**********************/
select * from chunks_parse_doc;

In [None]:
/******************************
The next step is to parse and flatten the extracted text.
Each element of the page_content array becomes its own row (one row per chunk).
******************************/
select RELATIVE_PATH, scoped_file_url, size, file_url,
       c.value AS chunk,
       category
  from CHUNKS_PARSE_DOC D, LATERAL FLATTEN(INPUT => D.page_content) c;

In [None]:
/******************************
We can operationalize it by using a dynamic table when new documents are loaded into the CHUNKS_PARSE_DOC table.

Note that there are 18 rows in the table.

Change the name of the warehouse
******************************/
create or replace dynamic table cortex_search_chunks
target_lag = '1 minute'
warehouse = MDT2_COMPUTE_WH# as                                                                               --CHANGE THIS
  -- Same flatten query as the preview: one row per chunk of each document
  select RELATIVE_PATH, scoped_file_url, size, file_url,
         c.value AS chunk,
         category
    from CHUNKS_PARSE_DOC D, LATERAL FLATTEN(INPUT => D.page_content) c;


select * from CORTEX_SEARCH_CHUNKS;

In [None]:
/******************************************
CREATE THE SEARCH SERVICE

All of the operational complexity of building the search service is abstracted into a single SQL statement for service creation. This removes the burden of creating and managing multiple processes for ingestion, embedding and serving, ultimately freeing up time to focus on developing cutting-edge AI applications.

The service searches the chunk text and exposes CATEGORY as a filterable attribute.

Change the name of the warehouse
******************************************/

CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_svc ON chunk
ATTRIBUTES CATEGORY
WAREHOUSE = mdt2_compute_wh#                                                --CHANGE THIS
TARGET_LAG = '1 minute'
AS (
SELECT
    chunk::varchar as chunk,
    relative_path,
    file_url,
    category
FROM
    CORTEX_SEARCH_CHUNKS
);

In [None]:
/******************************
Let's test the pipeline.  We will add the last 4 documents and verify that they are processed in an automated fashion

Change the name of the database
******************************/
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/RacingFast_Skis_Specification_Guide.pdf;          --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/Ski_Boots_TDBootz_Special.pdf;                    --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/The_Ultimate_Downhill_Bike.pdf;                   --CHANGE THIS
COPY FILES
  INTO @docs
  FROM @mdt2_cortex_search_docs#.public.mdt2_snowflake_extensions/branches/main/documents/The_Xtreme_Road_Bike_105_SL.pdf;                  --CHANGE THIS
-- Refresh the stage's directory metadata, then list the stage contents
alter stage docs refresh;
ls @docs;

In [None]:
/******************************
Check the CORTEX_SEARCH_CHUNKS table after a couple of minutes to ensure that the new documents have been processed.
******************************/
select * from CORTEX_SEARCH_CHUNKS;

# Next Step:

Now that the Cortex Search service has been created, we need to create an app to access the API in order to allow users to chat with the documents.  We will do this using Streamlit.  

1. Copy the code below
2. Create a new Streamlit app
3. Edit the app and replace the Streamlit code with the code below
4. Change the DATABASE to your database
5. Add the snowflake.core package
6. Click Run

```
import streamlit as st # Import python packages
from snowflake.snowpark.context import get_active_session

from snowflake.core import Root

import pandas as pd
import json

pd.set_option("max_colwidth",None)

### Default Values
NUM_CHUNKS = 3 # Num-chunks provided as context. Play with this to check how it affects your accuracy

# service parameters
CORTEX_SEARCH_DATABASE = "MDT2_CORTEX_SEARCH_DOCS##"
CORTEX_SEARCH_SCHEMA = "DATA"
CORTEX_SEARCH_SERVICE = "CORTEX_SEARCH_SVC"
######
######

# columns to query in the service
COLUMNS = [
    "chunk",
    "relative_path",
    "category"
]

session = get_active_session()
root = Root(session)                         

svc = root.databases[CORTEX_SEARCH_DATABASE].schemas[CORTEX_SEARCH_SCHEMA].cortex_search_services[CORTEX_SEARCH_SERVICE]
   
### Functions
     
def config_options():
    """Render the sidebar controls: model picker, category filter and a session-state viewer.

    The selections are stored in ``st.session_state`` under the widget keys
    ``model_name`` and ``category_value``.
    """
    available_models = (
        'mixtral-8x7b',
        'snowflake-arctic',
        'mistral-large',
        'llama3-8b',
        'llama3-70b',
        'reka-flash',
        'mistral-7b',
        'llama2-70b-chat',
        'gemma-7b',
    )
    st.sidebar.selectbox('Select your model:', available_models, key="model_name")

    # One row per distinct category currently present in the chunk table
    category_rows = session.sql("select category from cortex_search_chunks group by category").collect()

    # 'ALL' is listed first, ahead of the categories found in the table
    category_choices = ['ALL'] + [row.CATEGORY for row in category_rows]
    st.sidebar.selectbox('Select what products you are looking for', category_choices, key="category_value")

    state_panel = st.sidebar.expander("Session State")
    state_panel.write(st.session_state)

def get_similar_chunks_search_service(query):
    """Query the Cortex Search service and return the response as a JSON string.

    Args:
        query: Free-text question used to search the indexed chunks.

    Returns:
        The search response serialized with ``response.json()``. The same
        text is also echoed in the sidebar for inspection.
    """
    search_kwargs = {"limit": NUM_CHUNKS}

    selected_category = st.session_state.category_value
    if selected_category != "ALL":
        # Restrict results to the category picked in the sidebar
        search_kwargs["filter"] = {"@eq": {"category": selected_category}}

    response = svc.search(query, COLUMNS, **search_kwargs)

    # Serialize once and reuse it for both the sidebar echo and the return value
    response_json = response.json()
    st.sidebar.json(response_json)

    return response_json

def create_prompt(myquestion):
    """Build the LLM prompt for *myquestion*, with or without retrieved context.

    Returns:
        A ``(prompt, relative_paths)`` tuple. With retrieval enabled
        (``st.session_state.rag == 1``), ``relative_paths`` is the set of
        ``relative_path`` values found in the search results; otherwise it is
        the string ``"None"``.
    """
    if st.session_state.rag != 1:
        # Retrieval switched off: send the bare question and no source documents
        prompt = f"""[0]
         'Question:  
           {myquestion} 
           Answer: '
           """
        return prompt, "None"

    # JSON string returned by the search service; embedded verbatim as the context
    search_payload = get_similar_chunks_search_service(myquestion)

    prompt = f"""
           You are an expert chat assistance that extracs information from the CONTEXT provided
           between <context> and </context> tags.
           When ansering the question contained between <question> and </question> tags
           be concise and do not hallucinate. 
           If you don´t have the information just say so.
           Only anwer the question if you can extract it from the CONTEXT provideed.
           
           Do not mention the CONTEXT used in your answer.
    
           <context>          
           {search_payload}
           </context>
           <question>  
           {myquestion}
           </question>
           Answer: 
           """

    # Collect the distinct source documents referenced by the returned chunks
    parsed_payload = json.loads(search_payload)
    relative_paths = {hit['relative_path'] for hit in parsed_payload['results']}

    return prompt, relative_paths

def complete(myquestion):
    """Build the prompt for *myquestion* and run it through Cortex COMPLETE.

    Returns:
        A ``(rows, relative_paths)`` tuple: the collected rows of the COMPLETE
        query (the answer is in the ``response`` column) and the document
        paths returned by ``create_prompt``.
    """
    prompt, relative_paths = create_prompt(myquestion)

    # Model name and prompt are passed as bind parameters rather than spliced into the SQL text
    completion_sql = """
            select snowflake.cortex.complete(?, ?) as response
          """
    bind_values = [st.session_state.model_name, prompt]
    result_rows = session.sql(completion_sql, params=bind_values).collect()

    return result_rows, relative_paths

def main():
    """Render the chat page: document list, sidebar options, question box and answer.

    When retrieval is enabled, links to the source documents are shown inside
    the "Related Documents" expander in the sidebar.
    """
    st.title(":speech_balloon: Chat Document Assistant with Snowflake Cortex")
    st.write("This is the list of documents you already have and that will be used to answer your questions:")
    list_docs = [doc["name"] for doc in session.sql("ls @docs").collect()]
    st.dataframe(list_docs)

    config_options()

    st.session_state.rag = st.sidebar.checkbox('Use your own documents as context?')

    question = st.text_input("Enter question", placeholder="Is there any special lubricant to be used with the premium bike?", label_visibility="collapsed")

    if not question:
        return

    response, relative_paths = complete(question)
    res_text = response[0].RESPONSE
    st.markdown(res_text)

    # create_prompt returns the string "None" when retrieval is switched off
    if relative_paths == "None":
        return

    with st.sidebar.expander("Related Documents"):
        # Sorted so the links appear in a stable order across reruns
        for path in sorted(relative_paths):
            # Bind the path instead of formatting it into the SQL text, and omit
            # the FROM clause so exactly one URL row is produced per document.
            url_rows = session.sql(
                "select GET_PRESIGNED_URL(@docs, ?, 360) as URL_LINK",
                params=[path],
            ).collect()
            url_link = url_rows[0]["URL_LINK"]

            # st.markdown (not st.sidebar.markdown) so the link lands inside the expander
            st.markdown(f"Doc: [{path}]({url_link})")
                
if __name__ == "__main__":
    main()
```

# One more thing:

The first app that we created searched documents one question at a time, with no context from the conversation history.  We can also pass the chat history to the LLM to create a more conversational approach.

1. Copy the code below
2. Create a new Streamlit app
3. Edit the app and replace the Streamlit code with the code below
4. Change the DATABASE to your database
5. Add the snowflake.core package
6. Click Run

```
import streamlit as st # Import python packages
from snowflake.snowpark.context import get_active_session

from snowflake.core import Root

import pandas as pd
import json

pd.set_option("max_colwidth",None)

### Default Values
NUM_CHUNKS = 3 # Num-chunks provided as context. Play with this to check how it affects your accuracy

# service parameters
CORTEX_SEARCH_DATABASE = "MDT2_CORTEX_SEARCH_DOCS##"
CORTEX_SEARCH_SCHEMA = "DATA"
CORTEX_SEARCH_SERVICE = "CORTEX_SEARCH_SVC"
######
######

# columns to query in the service
COLUMNS = [
    "chunk",
    "relative_path",
    "category"
]

session = get_active_session()
root = Root(session)                         

svc = root.databases[CORTEX_SEARCH_DATABASE].schemas[CORTEX_SEARCH_SCHEMA].cortex_search_services[CORTEX_SEARCH_SERVICE]
   
### Functions
     
def config_options():
    """Render the sidebar controls: model picker, category filter and a session-state viewer.

    The selections are stored in ``st.session_state`` under the widget keys
    ``model_name`` and ``category_value``.
    """
    available_models = (
        'mixtral-8x7b',
        'snowflake-arctic',
        'mistral-large',
        'llama3-8b',
        'llama3-70b',
        'reka-flash',
        'mistral-7b',
        'llama2-70b-chat',
        'gemma-7b',
    )
    st.sidebar.selectbox('Select your model:', available_models, key="model_name")

    # One row per distinct category currently present in the chunk table
    category_rows = session.sql("select category from cortex_search_chunks group by category").collect()

    # 'ALL' is listed first, ahead of the categories found in the table
    category_choices = ['ALL'] + [row.CATEGORY for row in category_rows]
    st.sidebar.selectbox('Select what products you are looking for', category_choices, key="category_value")

    state_panel = st.sidebar.expander("Session State")
    state_panel.write(st.session_state)

def get_similar_chunks_search_service(query):
    """Query the Cortex Search service and return the response as a JSON string.

    Args:
        query: Free-text question used to search the indexed chunks.

    Returns:
        The search response serialized with ``response.json()``. The same
        text is also echoed in the sidebar for inspection.
    """
    search_kwargs = {"limit": NUM_CHUNKS}

    selected_category = st.session_state.category_value
    if selected_category != "ALL":
        # Restrict results to the category picked in the sidebar
        search_kwargs["filter"] = {"@eq": {"category": selected_category}}

    response = svc.search(query, COLUMNS, **search_kwargs)

    # Serialize once and reuse it for both the sidebar echo and the return value
    response_json = response.json()
    st.sidebar.json(response_json)

    return response_json

def create_prompt(myquestion):
    """Build the LLM prompt for *myquestion*, with or without retrieved context.

    Returns:
        A ``(prompt, relative_paths)`` tuple. With retrieval enabled
        (``st.session_state.rag == 1``), ``relative_paths`` is the set of
        ``relative_path`` values found in the search results; otherwise it is
        the string ``"None"``.
    """
    if st.session_state.rag != 1:
        # Retrieval switched off: send the bare question and no source documents
        prompt = f"""[0]
         'Question:  
           {myquestion} 
           Answer: '
           """
        return prompt, "None"

    # JSON string returned by the search service; embedded verbatim as the context
    search_payload = get_similar_chunks_search_service(myquestion)

    prompt = f"""
           You are an expert chat assistance that extracs information from the CONTEXT provided
           between <context> and </context> tags.
           When ansering the question contained between <question> and </question> tags
           be concise and do not hallucinate. 
           If you don´t have the information just say so.
           Only anwer the question if you can extract it from the CONTEXT provideed.
           
           Do not mention the CONTEXT used in your answer.
    
           <context>          
           {search_payload}
           </context>
           <question>  
           {myquestion}
           </question>
           Answer: 
           """

    # Collect the distinct source documents referenced by the returned chunks
    parsed_payload = json.loads(search_payload)
    relative_paths = {hit['relative_path'] for hit in parsed_payload['results']}

    return prompt, relative_paths

def complete(myquestion):
    """Build the prompt for *myquestion* and run it through Cortex COMPLETE.

    Returns:
        A ``(rows, relative_paths)`` tuple: the collected rows of the COMPLETE
        query (the answer is in the ``response`` column) and the document
        paths returned by ``create_prompt``.
    """
    prompt, relative_paths = create_prompt(myquestion)

    # Model name and prompt are passed as bind parameters rather than spliced into the SQL text
    completion_sql = """
            select snowflake.cortex.complete(?, ?) as response
          """
    bind_values = [st.session_state.model_name, prompt]
    result_rows = session.sql(completion_sql, params=bind_values).collect()

    return result_rows, relative_paths

def main():
    """Render the chat page: document list, sidebar options, question box and answer.

    When retrieval is enabled, links to the source documents are shown inside
    the "Related Documents" expander in the sidebar.
    """
    st.title(":speech_balloon: Chat Document Assistant with Snowflake Cortex")
    st.write("This is the list of documents you already have and that will be used to answer your questions:")
    list_docs = [doc["name"] for doc in session.sql("ls @docs").collect()]
    st.dataframe(list_docs)

    config_options()

    st.session_state.rag = st.sidebar.checkbox('Use your own documents as context?')

    question = st.text_input("Enter question", placeholder="Is there any special lubricant to be used with the premium bike?", label_visibility="collapsed")

    if not question:
        return

    response, relative_paths = complete(question)
    res_text = response[0].RESPONSE
    st.markdown(res_text)

    # create_prompt returns the string "None" when retrieval is switched off
    if relative_paths == "None":
        return

    with st.sidebar.expander("Related Documents"):
        # Sorted so the links appear in a stable order across reruns
        for path in sorted(relative_paths):
            # Bind the path instead of formatting it into the SQL text, and omit
            # the FROM clause so exactly one URL row is produced per document.
            url_rows = session.sql(
                "select GET_PRESIGNED_URL(@docs, ?, 360) as URL_LINK",
                params=[path],
            ).collect()
            url_link = url_rows[0]["URL_LINK"]

            # st.markdown (not st.sidebar.markdown) so the link lands inside the expander
            st.markdown(f"Doc: [{path}]({url_link})")
                
if __name__ == "__main__":
    main()
```