RAG Made Easy w/ Snowflake Cortex
========

Creating an end-to-end Retrieval Augmented Generation process (or RAG) directly in Snowflake.
1) Extract full text from PDF files using Snowpark.
2) Chunk those documents using Langchain in Snowpark.
3) Use Cortex to create embeddings of those chunks.
4) Use Vector Similarity to show the most similar chunk when prompting an LLM.

## 1. Imports and Session Setup

In [1]:
import pandas as pd
import json
from PyPDF2 import PdfFileReader
from snowflake.snowpark.files import SnowflakeFile
from io import BytesIO
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [2]:
from snowflake.snowpark.session import Session
snowflake_connection_cfg = json.loads(open("/Users/mitaylor/Documents/creds/creds_LLM.json").read()) 
session = Session.builder.configs(snowflake_connection_cfg).create()

In [3]:
session.sql("USE DATABASE RAG_DEMO").collect()
#session.sql("CREATE OR REPLACE STAGE RAG_FUNC_STAGE").collect()
#session.sql("CREATE OR REPLACE STAGE RAG_PDF_STAGE").collect()
session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='MEDIUM' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'").collect()

[Row(status='Warehouse ASYNC_WH successfully created.')]

## 2. Check our pdfs

### (Note this was uploaded manually via the Snowsight GUI, but other options are available, e.g. loaded from an S3 bucket)

In [4]:
session.sql('''ls @RAG_PDF_STAGE''').collect()

[Row(name='rag_pdf_stage/richards-smith-2015-caffeine-consumption-and-self-assessed-stress-anxiety-and-depression-in-secondary-school-children.pdf', size=261344, md5='22742df860d590a125ccd44bdfe0f89f', last_modified='Wed, 7 Feb 2024 12:31:27 GMT')]

## 3. Read the PDF via a UDF and deposit text into the RAW_TEXT table

In [5]:
def readpdf(file_path):
    whole_text = ""
    with SnowflakeFile.open(file_path, 'rb') as file:
        f = BytesIO(file.readall())
        pdf_reader = PdfFileReader(f)
        whole_text = ""
        for page in pdf_reader.pages:
            whole_text += page.extract_text()
    return whole_text

In [6]:
session.udf.register(func = readpdf,
                     return_type = StringType(),
                     input_types = [StringType()],
                     is_permanent = True,
                     name = 'SNOWPARK_PDF',
                     replace = True,
                     packages=['snowflake-snowpark-python','pypdf2'],
                     stage_location = '@RAG_FUNC_STAGE')

<snowflake.snowpark.udf.UserDefinedFunction at 0x10da8fa90>

In [10]:
session.sql('''CREATE OR REPLACE TABLE RAW_TEXT AS
SELECT
    relative_path
    , file_url
    , snowpark_pdf(build_scoped_file_url(@RAG_PDF_STAGE, relative_path)) as raw_text
from directory(@RAG_PDF_STAGE)''').collect()

[Row(status='Table RAW_TEXT successfully created.')]

In [None]:
#This will fail due to tokens exceeding limit, which means we need to chunk!
session.sql('''SELECT
SNOWFLAKE.ML.COMPLETE('llama2-7b-chat',CONCAT('summarise the following text',raw_text)) 
FROM
RAW_TEXT
LIMIT 1''').collect()

## 4. Do Some Chunking to address the size/token limitation with a UDTF


In [13]:
#A class for chunking text and returning a table via UDTF
class text_chunker:
    def process(self,text):        
        text_raw=[]
        text_raw.append(text) 
        
        text_splitter = RecursiveCharacterTextSplitter(
            separators = ["\n"], # Define an appropriate separator. New line is good typically!
            chunk_size = 1000, #Adjust this as you see fit
            chunk_overlap  = 50, #This let's text have some form of overlap. Useful for keeping chunks contextual
            length_function = len,
            add_start_index = True #Optional but useful if you'd like to feed the chunk before/after
        )
    
        chunks = text_splitter.create_documents(text_raw)
        df = pd.DataFrame(chunks, columns=['chunks','meta'])
        yield from df.itertuples(index=False, name=None)


In [14]:
schema = StructType([StructField("chunk", StringType()),
                     StructField("meta", StringType()),])

session.udtf.register(handler = text_chunker,
                      output_schema= schema, 
                      input_types = [StringType()] , 
                      is_permanent = True , 
                      name = 'CHUNK_TEXT' , 
                      replace = True , 
                      packages=['pandas','langchain'], stage_location = '@RAG_FUNC_STAGE' )

<snowflake.snowpark.udtf.UserDefinedTableFunction at 0x17a35a1d0>

In [15]:
# Create the chunked version of the table
session.sql('''
CREATE OR REPLACE TABLE CHUNK_TEXT AS
SELECT relative_path,func.*
FROM raw_text AS raw, TABLE(chunk_text(raw_text)) as func
''').collect()

[Row(status='Table CHUNK_TEXT successfully created.')]

## 4. Convert those chunks to vectors to enable vector searching on the VECTOR_STORE Table

In [16]:
session.sql('''
CREATE OR REPLACE TABLE VECTOR_STORE AS
SELECT RELATIVE_PATH as PDF_NAME, CHUNK AS CHUNK, snowflake.ml.embed_text('e5-base-v2', chunk) as chunk_embedding
FROM CHUNK_TEXT
''').collect()

[Row(status='Table VECTOR_STORE successfully created.')]

In [18]:
session.sql('''
SELECT PDF_NAME, CHUNK from RAG_DEMO.PUBLIC.VECTOR_STORE 
ORDER BY VECTOR_L2_DISTANCE(snowflake.ml.embed_text('e5-base-v2', 'Is Caffeine Good for me?'), CHUNK_EMBEDDING
            ) limit 1''').collect()

[Row(EPISODE_NAME='richards-smith-2015-caffeine-consumption-and-self-assessed-stress-anxiety-and-depression-in-secondary-school-children.pdf', CHUNK="('page_content', '\\nweekly cycle of caffeine use in adolescents was reported by Pollak and Bright (2003), in which consumption peaked during the weekend (Saturday), and was lowest in the middle of the week (Wednesday). Coupled with the observations that adoles-cents sometimes use caffeinated products to delay sleep onset (e.g. Calamaro et al., 2009) and to counteract the effects of sleep-iness during the day (Malinauskas et al., 2007), it is possible that the timing of administration of the questionnaire may have been of importance.A further limitation of the current study is that it utilised a cross-sectional design. This means that all effects observed here are correlational, and that causation cannot be inferred. Therefore the possibility of reverse-causation, or indeed bi-directionality, cannot be disregarded. For instance, high caf-

## 5. Combine the Chunked Vector Store Data with a "Standard" LLAMA2-7b "query" via the complete function and embedding context via RAG

In [26]:
question = "'What association is there between caffeine and anxiety in school children?'"

In [28]:
answer_sql = session.sql(f'''SELECT snowflake.cortex.complete(
    'mistral-7b', 
    CONCAT( 
        'Answer the question based on the context. Be concise.','Context: ',
        (
            SELECT chunk FROM RAG_DEMO.PUBLIC.VECTOR_STORE 
            ORDER BY vector_l2_distance(
            snowflake.ml.embed_text('e5-base-v2', 
            {question}
            ), chunk_embedding
            ) LIMIT 1
        ),
        'Question: ', 
        {question},
        'Answer: '
    )
) as response ''').collect()

In [29]:
answer_sql

[Row(RESPONSE=' According to the study, there is a positive association between caffeine intake and anxiety in school children. Specifically, the study found that higher levels of total weekly caffeine intake were associated with higher levels of anxiety, both after adjusting for additional covariates and when examining the effects in males and females separately.')]