In [None]:
#Import python packages & establish session
import pandas as pd
from PyPDF2 import PdfFileReader
from snowflake.snowpark.files import SnowflakeFile
from io import BytesIO

from snowflake.snowpark.types import StringType, StructField, StructType, IntegerType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.context import get_active_session

# Snowpark session reused by the UDF/UDTF registrations and the Root(...) client further down.
session = get_active_session()


In [None]:
-- List the files currently present on the @pdfs stage.
ls @pdfs

Let's define a Python function ```readpdf``` that reads and extracts text from a PDF file. This function is then registered as a UDF in Snowflake so that multiple PDFs can be processed in parallel across the nodes of a warehouse.

In [None]:
def readpdf(file_path):
    """Extract the text of every page of a staged PDF and return it as one string.

    Args:
        file_path: Location handed to ``SnowflakeFile.open``. The SQL cell that
            calls this function passes a scoped file URL built with
            ``build_scoped_file_url``.

    Returns:
        The text of all pages concatenated in page order, with no separator
        added between pages. A page for which the reader returns no text
        contributes an empty string.
    """
    # Imported locally so the function brings its own reader into scope.
    # PdfReader replaces the deprecated PdfFileReader and is the same reader
    # pdf_sentence_chunker already uses (PyPDF2.PdfReader).
    from PyPDF2 import PdfReader

    with SnowflakeFile.open(file_path, 'rb') as file:
        # Load the whole file into memory so the reader can seek freely.
        pdf_reader = PdfReader(BytesIO(file.readall()))
        # join() avoids repeated string re-allocation; `or ""` guards against
        # a page whose extraction yields a falsy value.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)

# Register readpdf as a permanent scalar UDF (string in, string out),
# using `pdfs` as the stage location and replacing any existing definition.
session.udf.register(
    func=readpdf,
    name='readpdf',
    input_types=[StringType()],
    return_type=StringType(),
    packages=['snowflake-snowpark-python', 'pypdf2'],
    stage_location='pdfs',
    is_permanent=True,
    replace=True,
)

In [None]:

-- Run the readpdf UDF against every file listed by directory(@pdfs),
-- passing each file as a scoped file URL.
SELECT 
    relative_path, 
    file_url, 
    readpdf(build_scoped_file_url(@pdfs, relative_path)) as raw_text
from directory(@pdfs);

Let's split the text extracted from our PDFs into chunks (contextually relevant pieces). Think of it like converting large unstructured text into a knowledge base of answers. Converting, cleaning, and checking large documents can be difficult to get right. We will try two approaches: fixed-size chunking and sentence-based chunking:

- Fixed-size chunking is simpler to implement and computationally efficient, but it does not respect natural language boundaries, which can lead to a loss of semantic context between chunks.
- Sentence-based chunking helps maintain context and meaning. It results in chunks of varying sizes, which might not be ideal for models that require uniform input dimensions, and it requires more sophisticated processing.






In [None]:
import io
import re
import json
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2
import langchain

class fixed_text_chunker:
    """UDTF handler that splits the text of one staged PDF into fixed-size chunks."""

    def read_pdf(self, file_url: str) -> str:
        """Return the text of every page of the PDF at ``file_url`` as one string.

        Pages are concatenated in order with no separator added; a page for
        which the reader returns no text contributes an empty string.
        """
        with SnowflakeFile.open(file_url, 'rb') as file:
            # Copy the file into memory so the reader can seek freely.
            buffer = io.BytesIO(file.readall())

        # PdfReader replaces the deprecated PdfFileReader and matches the
        # reader used by pdf_sentence_chunker.
        reader = PyPDF2.PdfReader(buffer)
        return "".join(page.extract_text() or "" for page in reader.pages)

    def process(self, file_url: str, chunk_size: int, chunk_overlap: int):
        """Yield one ``(chunk_text, metadata)`` tuple per chunk of the PDF.

        Args:
            file_url: Location of the PDF, passed through to ``read_pdf``.
            chunk_size: Maximum chunk length, measured with ``len``.
            chunk_overlap: Number of characters shared by consecutive chunks,
                which helps keep chunks contextual.

        Yields:
            ``(page_content, metadata)`` for every document produced by the
            splitter. Because ``add_start_index`` is enabled, the metadata
            carries the chunk's start index.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            separators=["\n"],  # split on new lines
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            add_start_index=True,  # lets a consumer locate the chunk before/after
        )

        # NOTE(review): `metadata` is yielded as the splitter returns it, while the
        # registered output schema declares the "meta" column as StringType — confirm
        # the conversion applied on the Snowflake side is the one you want.
        for document in text_splitter.create_documents([self.read_pdf(file_url)]):
            yield document.page_content, document.metadata

# Register the UDTF - set the stage location

# Two string output columns: the chunk text and the chunk's metadata.
schema = StructType(
    [
        StructField("chunk", StringType()),
        StructField("meta", StringType()),
    ]
)

# Inputs are (file_url, chunk_size, chunk_overlap), matching fixed_text_chunker.process.
session.udtf.register(
    handler=fixed_text_chunker,
    name='fixed_text_chunker',
    input_types=[StringType(), IntegerType(), IntegerType()],
    output_schema=schema,
    packages=['pandas', 'langchain', 'snowflake-snowpark-python', 'PyPDF2'],
    stage_location='pdfs',
    is_permanent=True,
    replace=True,
)

In [None]:
-- Preview the chunks of a single PDF with chunk_size = 10000 and chunk_overlap = 1000.
select * from table(fixed_text_chunker(build_scoped_file_url( @pdfs , 'Example data for Snowflake.pdf'), 10000, 1000));

In [None]:
--Create the chunked version of the table
-- One row per (file, chunk): every file in directory(@pdfs) is expanded by the
-- fixed_text_chunker UDTF (chunk_size = 10000, chunk_overlap = 1000).
-- NOTE(review): starting_index stores the UDTF's whole `meta` value, not a bare
-- integer — confirm that is what downstream consumers expect.
CREATE OR REPLACE TABLE FIXED_CHUNK_TABLE AS
SELECT 
    relative_path,
    size,
    build_scoped_file_url(@pdfs, relative_path) AS file_url,
    func.chunk as chunk,
    func.meta as starting_index,
    'English' AS language
FROM
directory(@pdfs),
TABLE(fixed_text_chunker(build_scoped_file_url(@pdfs, relative_path),10000, 1000)) AS func;


In [None]:
-- Length of every chunk, numbered in ascending order of length.
-- NOTE(review): the chart cell that follows reads `fixed_chunk_size`; presumably that is
-- the name given to this cell — confirm.
SELECT len(chunk) as ChunkSize, ROW_NUMBER() OVER (ORDER BY ChunkSize) AS Chunk FROM FIXED_CHUNK_TABLE

In [None]:
# Import python packages
import streamlit as st
import pandas as pd


# NOTE(review): `fixed_chunk_size` is not defined in any visible code cell; presumably it
# is the result of the preceding SQL cell, referenced by cell name — confirm.
my_df = fixed_chunk_size.to_pandas()
# Chart the data
st.subheader("Length of Chunks")
# One bar per chunk: x is the CHUNK column, y is the CHUNKSIZE column.
st.bar_chart(my_df, x='CHUNK', y='CHUNKSIZE')

The ```pdf_sentence_chunker``` class is designed to read a PDF file from Snowflake storage, extract text from each page, split the text into sentences, and then combine these sentences into larger chunks while keeping track of the page numbers. The final output is a series of tuples containing the combined sentence and a JSON string of the page numbers, which are yielded one by one. This can be useful for processing large texts where context needs to be preserved across sentence boundaries.

In [None]:
import io
import re
import json
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2
from unicodedata import normalize

class pdf_sentence_chunker:
    """UDTF handler that splits one staged PDF into sentence-based chunks.

    Each output row is ``(chunk_text, page_numbers_json)``, where the second
    value is a JSON list of the 1-based page numbers the chunk draws from.
    """

    def read_pdf(self, file_url: str) -> tuple:
        """Read the PDF at ``file_url`` and extract its text page by page.

        Returns:
            ``(text, page_texts)``. ``page_texts`` is a list of
            ``(page_number, page_text)`` tuples with 1-based page numbers, and
            ``text`` is the concatenation of those page texts. Newlines and
            null characters are replaced with spaces. Pages that yield no
            text, or whose extraction raises, are skipped. ``text`` is
            ``"Unable to Extract"`` only when extraction failed and no page
            produced any text.
        """
        # Copy the staged file into memory so the reader can seek freely.
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        page_texts = []
        extraction_failed = False

        for page_num, page in enumerate(reader.pages, start=1):
            try:
                extracted_text = page.extract_text()
            except Exception:
                # Best effort: one unreadable page must not discard the text
                # already collected from the other pages.
                extraction_failed = True
                continue
            if extracted_text:
                page_text = extracted_text.replace('\n', ' ').replace('\0', ' ')
                page_texts.append((page_num, page_text))

        text = "".join(page_text for _, page_text in page_texts)
        if extraction_failed and not page_texts:
            text = "Unable to Extract"
        return text, page_texts

    def combine_sentences(self, sentences, max_length: int, buffer_size=2):
        """Greedily pack consecutive sentences into chunks of at most ``max_length``.

        Args:
            sentences: List of dicts with at least the keys ``'sentence'``
                (text) and ``'page'`` (page number).
            max_length: Maximum chunk length. Each sentence counts as its
                length plus one for the joining space. A single sentence
                longer than the limit is emitted as a chunk of its own.
            buffer_size: Accepted for backward compatibility; it does not
                influence the result.

        Returns:
            A list of ``(chunk_text, page_numbers_json)`` tuples, where
            ``page_numbers_json`` is a JSON list of the sorted page numbers
            contributing to the chunk.
        """
        combined_sentences = []
        total = len(sentences)
        i = 0

        while i < total:
            parts = []
            current_length = 0
            page_numbers = set()
            end = i

            while end < total:
                sentence = sentences[end]['sentence']
                # The first sentence of a chunk is always taken, even when it
                # is longer than max_length, so the outer loop always advances.
                if parts and current_length + len(sentence) + 1 > max_length:
                    break
                parts.append(sentence)
                current_length += len(sentence) + 1
                page_numbers.add(sentences[end]['page'])
                end += 1

            combined_sentences.append(
                (' '.join(parts).strip(), json.dumps(sorted(page_numbers)))
            )
            i = end

        return combined_sentences

    def process(self, file_url: str, max_length: int):
        """Yield ``(chunk_text, page_numbers_json)`` rows for the PDF at ``file_url``.

        Each page is split into sentences after ``.``, ``?`` or ``!`` followed
        by whitespace. Empty or whitespace-only fragments are dropped so they
        cannot produce empty chunk rows. The remaining sentences are packed by
        ``combine_sentences``.
        """
        # Only the per-page texts are needed; the combined text is not used here.
        _, page_texts = self.read_pdf(file_url)

        sentences = []
        for page_num, page_text in page_texts:
            page_sentences = [
                sent
                for sent in re.split(r'(?<=[.?!])\s+', page_text)
                if sent.strip()
            ]
            sentences.extend(
                {'index': i, 'sentence': sent, 'page': page_num}
                for i, sent in enumerate(page_sentences)
            )

        # The rows are already plain tuples, so no DataFrame round-trip is needed.
        yield from self.combine_sentences(sentences, max_length)

#Register the UDTF - set the stage location

# Two string output columns: the chunk text and the JSON list of page numbers.
schema = StructType(
    [
        StructField("sentences", StringType()),
        StructField("page_number", StringType()),
    ]
)

# Inputs are (file_url, max_length), matching pdf_sentence_chunker.process.
session.udtf.register(
    handler=pdf_sentence_chunker,
    name='pdf_sentence_chunker',
    input_types=[StringType(), IntegerType()],
    output_schema=schema,
    packages=['snowflake-snowpark-python', 'PyPDF2', 'PyCryptodome'],
    stage_location='pdfs',
    is_permanent=True,
    replace=True,
)


In [None]:
-- Preview the sentence-based chunks of a single PDF with max_length = 10000.
select * from table(pdf_sentence_chunker(build_scoped_file_url( @pdfs , 'Example data for Snowflake.pdf'), 10000));

In [None]:
--Create the chunked version of the table
-- One row per (file, chunk): every file in directory(@pdfs) is expanded by the
-- pdf_sentence_chunker UDTF (max_length = 10000). page_number holds the UDTF's
-- JSON list of page numbers for the chunk.
CREATE OR REPLACE TABLE SENTENCES_TABLE AS
SELECT 
    relative_path,
    size,
    build_scoped_file_url(@pdfs, relative_path) AS file_url,
    func.sentences as chunk,
    func.page_number as page_number,
    'English' AS language
FROM
directory(@pdfs),
TABLE(pdf_sentence_chunker(build_scoped_file_url(@pdfs, relative_path), 10000)) AS func;


In [None]:
-- Length of every chunk, numbered in ascending order of length.
-- NOTE(review): a later chart cell reads `sentences_chunk_size`; presumably that is
-- the name given to this cell — confirm.
SELECT len(chunk) as ChunkSize, ROW_NUMBER() OVER (ORDER BY ChunkSize) AS Chunk FROM SENTENCES_TABLE

In [None]:
-- Inspect a handful of rows from the sentence-chunk table.
SELECT * FROM SENTENCES_TABLE LIMIT 5;

In [None]:
# Import python packages
import streamlit as st
import pandas as pd


# NOTE(review): `sentences_chunk_size` is not defined in any visible code cell; presumably
# it is the result of the chunk-size SQL cell over SENTENCES_TABLE, referenced by cell
# name — confirm.
my_df = sentences_chunk_size.to_pandas()
# Chart the data
st.subheader("Length of Chunks")
# One bar per chunk: x is the CHUNK column, y is the CHUNKSIZE column.
st.bar_chart(my_df, x='CHUNK', y='CHUNKSIZE')

In [None]:
-- Cortex Search service over FIXED_CHUNK_TABLE: the search runs on the `chunk`
-- column, `language` is declared as an attribute, and the service is maintained
-- with warehouse compute_wh at a target lag of one hour.
CREATE OR REPLACE CORTEX SEARCH SERVICE 
fixed_chunk_search
    ON chunk
    ATTRIBUTES language
    WAREHOUSE = compute_wh
    TARGET_LAG = '1 hour'
    AS (
    SELECT
        chunk,
        file_url,
        relative_path,
        language
    FROM FIXED_CHUNK_TABLE
    );

In [None]:
-- Cortex Search service over SENTENCES_TABLE: the search runs on the `chunk`
-- column, `language` is declared as an attribute (the Python search cells filter
-- on it), and page_number is included so it can be returned with each hit.
CREATE OR REPLACE CORTEX SEARCH SERVICE 
sentence_chunk_search
    ON chunk
    ATTRIBUTES language
    WAREHOUSE = compute_wh
    TARGET_LAG = '1 hour'
    AS (
    SELECT
        chunk,
        page_number,
        file_url,
        relative_path,
        language
    FROM SENTENCES_TABLE
    );

In [None]:
from snowflake.core import Root

# Object-API entry point bound to the notebook's Snowpark session.
root = Root(session)

# Handle to the sentence-based search service.
# NOTE(review): the database and schema names are hard-coded; this lookup presumably
# requires that sentence_chunk_search was created in ALLEGRO_HACKATHON.PUBLIC — confirm
# before running the notebook in another database or schema.
pdf_service = (root
  .databases["ALLEGRO_HACKATHON"]
  .schemas["PUBLIC"]
  .cortex_search_services["sentence_chunk_search"]
)

# Ask for up to 3 hits, restricted to rows whose language attribute equals "English",
# and return the chunk text together with its page numbers.
resp = pdf_service.search(
  query="In which table I can find the tenor quantity of credit contract?",
  columns=["chunk", "page_number"],
  filter={"@eq": {"language": "English"} },
  limit=3
)
print(resp.to_json())

In [None]:


# Second query against the same `pdf_service` handle (defined in the previous cell):
# up to 3 English-language hits, returning chunk text and page numbers.
resp = pdf_service.search(
  query="What columns does the BALANCE_CHANGED table contain?",
  columns=["chunk", "page_number"],
  filter={"@eq": {"language": "English"} },
  limit=3
)
print(resp.to_json())