RAGs to AI Riches
========

Creating an end-to-end Retrieval Augmented Generation process (or RAG) directly in Snowflake.
1) Extract full text from PDF files using Snowpark.
2) Chunk those documents using Langchain in Snowpark.
3) Use a service hosted on SPCS to create embeddings of those chunks (separate workbook to set up the service).
4) Use Vector Similarity to show the most similar chunk when prompting an LLM.
5) Call a DeepSeek LLM hosted on SPCS, passing that chunk as context.

![Alt text](https://filedn.eu/ljfsfeYp02Sjg88j4jWtPqL/images/ai_llms.webp "slide1" )  


In [None]:
-- Switch the session to the DEEPSEEK_ROLE role.
USE ROLE DEEPSEEK_ROLE

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!

# Snowpark
from snowflake.snowpark.context import get_active_session
import snowflake.snowpark.functions as F
session = get_active_session()


In [None]:
create or replace function pdf_text_chunker(file_url string)
returns table (chunk varchar)
language python
runtime_version = '3.9'
handler = 'pdf_text_chunker'
packages = ('snowflake-snowpark-python','PyPDF2', 'langchain')
as
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:
    """Table-function handler that splits the text of one PDF into chunks.

    ``process`` receives a file URL and yields one single-element tuple per
    text chunk.
    """

    def read_pdf(self, file_url: str) -> str:
        """Return the concatenated text of every readable page of the PDF.

        Newlines and NUL characters in the page text are replaced by spaces.
        A page whose text cannot be extracted is logged and skipped, so one
        bad page no longer throws away the text of the pages before it.
        If no text could be extracted at all and at least one page failed,
        the marker string "Unable to Extract" is returned.
        """
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")

        # Read the whole file into memory so PyPDF2 gets a seekable buffer.
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        page_texts = []
        failed_pages = 0
        for page_number, page in enumerate(reader.pages, start=1):
            try:
                # Guard against an extractor that returns nothing for a page.
                page_text = page.extract_text() or ""
            except Exception:
                failed_pages += 1
                logger.warning(f"Unable to extract from file {file_url}, page {page_number}")
                continue
            page_texts.append(page_text.replace('\n', ' ').replace('\0', ' '))

        text = "".join(page_texts)
        if failed_pages and not text:
            # Keep a visible marker row for files that could not be read at all.
            return "Unable to Extract"
        return text

    def process(self, file_url: str):
        """Yield ``(chunk,)`` tuples for the PDF located at *file_url*."""
        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 4000, #Adjust this as you see fit
            chunk_overlap  = 400, #This lets text have some form of overlap. Useful for keeping chunks contextual
            length_function = len
        )

        # One output row per chunk; each row is a one-column tuple.
        for chunk in text_splitter.split_text(text):
            yield (chunk,)
$$;

In [None]:
-- add pdfs to the stage, then list the stage contents to confirm they are there

ls @docs;

A note on chunking
-----
Chunking is the process of splitting a large body of text into smaller 'chunks' whilst attempting to keep as much relevant information as possible. Make the chunks too small and you run the risk of removing key information that the model requires to answer the question. Make them too large and it may be harder to retrieve the correct body of text from the vector search, or you may spend tokens excessively.

There are many strategies for chunking and retrieval, e.g. pass only the most relevant chunk, pass the top n relevant chunks, or pass the most relevant chunk plus the chunk on either side of it. Play around and see what works for your use case!

In [None]:
-- create chunks

-- One row per text chunk, together with the details of the file it came from.
-- NOTE(review): the Python cells visible in this notebook write to RAG_CHUNKED_TABLE,
-- not to this table — confirm whether DOCS_CHUNKS_TABLE is still needed.
create or replace TABLE DOCS_CHUNKS_TABLE ( 
    RELATIVE_PATH VARCHAR(16777216), -- Relative path to the PDF file
    SIZE NUMBER(38,0), -- Size of the PDF
    FILE_URL VARCHAR(16777216), -- URL for the PDF
    SCOPED_FILE_URL VARCHAR(16777216), -- Scoped url (you can choose which one to keep depending on your use case)
    CHUNK VARCHAR(16777216));  -- Text of the chunk (plain VARCHAR; this table has no VECTOR column)


In [None]:
from snowflake.snowpark.functions import col, lit, sql_expr

# Stage that holds the uploaded PDF files.
docs_stage = "@docs"

# List the files on the stage and add a scoped file URL for each of them.
listing_query = f"SELECT * FROM directory('{docs_stage}')"
scoped_url_expr = sql_expr(f"build_scoped_file_url('{docs_stage}', RELATIVE_PATH)")
directory_df = session.sql(listing_query).with_column("scoped_file_url", scoped_url_expr)
directory_df

In [None]:
# File details that are repeated on every chunk row.
file_columns = [
    col("relative_path"),
    col("size"),
    col("file_url").alias("file_url"),
    col("scoped_file_url"),
]

# Run the pdf_text_chunker table function once per file, keyed on the scoped URL.
files_df = directory_df.select(*file_columns)
chunk_df = files_df.join_table_function("pdf_text_chunker", col("scoped_file_url"))

# Copy the chunk text into a CONTEXT column as well.
chunk_df = chunk_df.with_column("CONTEXT", F.col("chunk"))

# Persist the result, replacing any previous run.
chunk_writer = chunk_df.write.mode('overwrite')
chunk_writer.save_as_table('RAG_CHUNKED_TABLE')

In [None]:
# Point chunk_df at the saved RAG_CHUNKED_TABLE table.
chunk_df = session.table("RAG_CHUNKED_TABLE")

In [None]:
-- Preview up to 10 rows of the chunk table.
SELECT * FROM RAG_CHUNKED_TABLE LIMIT 10;

Now we check the embedding service running on SPCS

In [None]:
-- List the services in the GPU_NV_S_COMPUTE_POOL compute pool.
-- Run this to check whether status = RUNNING
SHOW SERVICES IN COMPUTE POOL GPU_NV_S_COMPUTE_POOL;

In [None]:
from snowflake.ml.registry import Registry

# Open the model registry in the EMBEDDING_MODEL_HOL_SCHEMA schema of the current database.
current_database = session.get_current_database()
reg = Registry(
    session=session,
    database_name=current_database,
    schema_name="EMBEDDING_MODEL_HOL_SCHEMA",
)

In [None]:
# List the models held in the registry.
reg.show_models()

In [None]:
# Fetch version V1 of the 'sentence_transformer_minilm' model from the registry.
mv = reg.get_model('sentence_transformer_minilm').version('V1')

In [None]:
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

# Embed the chunks: call the model version's "encode" function on chunk_df,
# executed by the 'minilm_gpu_service' service.
encode_options = {
    "function_name": "encode",
    "service_name": 'minilm_gpu_service',
}
embedded_chunk_df = mv.run(chunk_df, **encode_options)

In [None]:
# Evaluate the DataFrame so the notebook displays the model output.
embedded_chunk_df

In [None]:
# Cast the model output column to a 384-element float VECTOR, then rename it to CHUNK_VEC.
vector_type = T.VectorType(float, 384)
output_column = F.col('"output_feature_0"')
embedded_chunk_df = embedded_chunk_df.with_column('"output_feature_0"', output_column.cast(vector_type))
embedded_chunk_df = embedded_chunk_df.rename(F.col('"output_feature_0"'), "CHUNK_VEC")

In [None]:
# Evaluate the DataFrame again to inspect it after the cast and rename.
embedded_chunk_df

In [None]:
# Save the embedded chunks to RAG_CHUNKED_EMBEDDED_TABLE, overwriting any existing table.
embedded_chunk_df.write.mode('overwrite').save_as_table('RAG_CHUNKED_EMBEDDED_TABLE')

In [None]:
-- Preview up to 10 rows of the embedded chunk table.
SELECT * FROM RAG_CHUNKED_EMBEDDED_TABLE LIMIT 10;

In [None]:
from snowflake.snowpark import Row
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructType, StructField, StringType

user_input = "Tell me about the Xtreme road bike"

# Wrap the question in a one-row DataFrame whose only column, CONTEXT, is a string.
context_schema = StructType([StructField("CONTEXT", StringType())])
question_rows = [Row(CONTEXT=user_input)]
input_df = session.create_dataframe(question_rows, schema=context_schema)

# Embed the question with the same "encode" function and service used for the chunks.
prompt_embedded_chunk_df = mv.run(input_df, function_name="encode", service_name="minilm_gpu_service")

In [None]:
# Cast the question embedding to a 384-element float VECTOR, then rename it to CHUNK_VEC.
prompt_vector_type = T.VectorType(float, 384)
prompt_output_column = F.col('"output_feature_0"')
prompt_embedded_chunk_df = prompt_embedded_chunk_df.with_column('"output_feature_0"', prompt_output_column.cast(prompt_vector_type))
prompt_embedded_chunk_df = prompt_embedded_chunk_df.rename(F.col('"output_feature_0"'), "CHUNK_VEC")

In [None]:
# Save the embedded question to QUERY_TABLE, overwriting any existing table.
prompt_embedded_chunk_df.write.mode('overwrite').save_as_table('QUERY_TABLE')

In [None]:
-- Show the contents of QUERY_TABLE.
SELECT * FROM QUERY_TABLE;

In [None]:
-- https://docs.snowflake.com/en/user-guide/snowflake-cortex/vector-embeddings#retrieval-augmented-generation-rag

-- Pair every stored chunk with every row of query_table, score each pair by
-- cosine similarity, and keep only the highest-scoring chunk.
SELECT
    r.chunk,
    VECTOR_COSINE_SIMILARITY(r.chunk_vec, q.chunk_vec) AS similarity
FROM DEEPSEEK_DB.RAGTOAI.RAG_CHUNKED_EMBEDDED_TABLE AS r
CROSS JOIN query_table AS q
ORDER BY similarity DESC
LIMIT 1;

In [None]:
# Take the CHUNK value of the first row of the similarity result.
# NOTE(review): `vector_similarity` is not defined in any visible cell — presumably
# it is the name given to the SQL cell above; confirm the cell is named accordingly.
chunk = vector_similarity.to_df().collect()[0]["CHUNK"]

In the next cell we take the chunk and provide it to the DeepSeek model, which is wrapped in a service function. The service function does not support streaming, so it will think about the answer and return it all at once.

In [None]:
from snowflake.snowpark.functions import call_udf

# Prompt text: instructions, then the retrieved chunk as context, then the question.
context_q = f"""
Answer the question based on the context. Be concise.
Context: {chunk}
Question: {user_input}
Answer:
"""

# One-row DataFrame with the prompt in a "prompt" column.
df = session.create_dataframe([[context_q]], schema=["prompt"])

# Call the DeepSeek chat function on that prompt and expose its output as "response".
llm_response = call_udf("DEEPSEEK_DB.PUBLIC.DEEPSEEK_CHAT_UDF", df["prompt"])
result_df = df.select(llm_response.alias("response"))

# Pull the RESPONSE value out of the first returned row.
first_row = result_df.collect()[0]
result = first_row["RESPONSE"]

result

Next we take all the steps above and put them in a Streamlit app.

In [None]:
from snowflake.snowpark import Row
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructType, StructField, StringType
import streamlit as st # Import python packages
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import call_udf
session = get_active_session() # Get the current credentials

# NOTE(review): `mv` (the embedding model version) is not defined in this cell;
# it is expected to exist already from the earlier cell that loads it — confirm
# before running this app on its own.


def build_llm_prompt(context_chunk, question):
    """Return the prompt text sent to the LLM for *question*, grounded on *context_chunk*."""
    return f"""
    Answer the question based on the context. Be concise.
    Context: {context_chunk}
    Question: {question}
    Answer:
    """


st.title(":snowflake: Ask Your Data Anything :snowflake:")
st.write("""Built using end-to-end RAG in Snowflake with Cortex functions.""")

prompt = st.text_input("Enter prompt", placeholder="What is being discussed in this document?", label_visibility="collapsed")

if prompt:
    # Build one-row DF with required input column
    input_df = session.create_dataframe(
        [Row(CONTEXT=prompt)],
        schema=StructType([StructField("CONTEXT", StringType())])
    )

    # Embed the prompt with the model's "encode" function.
    prompt_embedded_chunk_df = mv.run(
        input_df,
        function_name="encode",
        service_name="minilm_gpu_service"
    )

    # Cast the model output to a 384-element float VECTOR and rename it to CHUNK_VEC.
    prompt_embedded_chunk_df = prompt_embedded_chunk_df.with_column('"output_feature_0"', F.col('"output_feature_0"').cast(T.VectorType(float, 384)))

    prompt_embedded_chunk_df = prompt_embedded_chunk_df.rename(F.col('"output_feature_0"'), "CHUNK_VEC")

    # The similarity query below reads the embedded prompt from QUERY_TABLE.
    prompt_embedded_chunk_df.write.mode('overwrite').save_as_table('QUERY_TABLE')

    closest_vector_q = '''SELECT
        r.chunk,
        VECTOR_COSINE_SIMILARITY(r.chunk_vec, q.chunk_vec) AS similarity
    FROM DEEPSEEK_DB.RAGTOAI.RAG_CHUNKED_EMBEDDED_TABLE r, query_table q
    ORDER BY similarity DESC
    LIMIT 1'''

    closest_vector = session.sql(closest_vector_q).to_pandas()

    if closest_vector.empty:
        # No chunk came back, so there is nothing to ground the answer on;
        # say so instead of failing with an IndexError on .iloc[0].
        st.warning("No matching document chunk was found for this prompt.")
    else:
        chunk = closest_vector['CHUNK'].iloc[0]

        context_q = build_llm_prompt(chunk, prompt)

        df = session.create_dataframe([[context_q]], schema=["prompt"])

        result_df = df.select(call_udf("DEEPSEEK_DB.PUBLIC.DEEPSEEK_CHAT_UDF", df["prompt"]).alias("response"))

        result = result_df.collect()[0]["RESPONSE"]

        st.write(result)
