In [19]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd
from PyPDF2 import PdfFileReader
from io import BytesIO
from snowflake.snowpark import types as T
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType
#from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
from snowflake.snowpark.types import StringType, StructField, StructType
import os
from dotenv import load_dotenv

# Load .env file
load_dotenv()

True

In [20]:
# Read connection parameters from environment variables
connection_parameters = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA")
}

# Create a Snowflake Session
session = Session.builder.configs(connection_parameters).create()

In [21]:
session.sql('create or replace stage RAG DIRECTORY = ( ENABLE = TRUE)').collect()

[Row(status='Stage area RAG successfully created.')]

In [4]:
session.file.put('documents/*.pdf', "@RAG", auto_compress=False, overwrite=True)

[PutResult(source='Snowflake DBX compare matrix 2023_1220.pdf', target='Snowflake DBX compare matrix 2023_1220.pdf', source_size=545622, target_size=545632, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [15]:
def readpdf(file_path):
    whole_text = ""
    with SnowflakeFile.open(file_path, 'rb') as file:
        f = BytesIO(file.readall())
        pdf_reader = PdfFileReader(f)
        whole_text = ""
        for page in pdf_reader.pages:
            whole_text += page.extract_text()
    return whole_text

In [16]:
#Register the UDF. 
session.udf.register(
    func = readpdf
  , return_type = StringType()
  , input_types = [StringType()]
  , is_permanent = True
  , name = 'SNOWPARK_PDF'
  , replace = True
  , packages=['snowflake-snowpark-python','pypdf2']
  , stage_location = 'RAG'
)

The version of package 'pypdf2' in the local environment is 3.0.1, which does not fit the criteria for the requirement 'pypdf2'. Your UDF might not work when the package version is different between the server and your local environment.


<snowflake.snowpark.udf.UserDefinedFunction at 0x149b4ce50>

In [7]:
session.sql('''
CREATE OR REPLACE TABLE RAW_TEXT AS
SELECT
    relative_path
    , file_url
    , snowpark_pdf(build_scoped_file_url(@RAG, relative_path)) as raw_text
from directory(@RAG)
            ''').collect()

[Row(status='Table RAW_TEXT successfully created.')]

In [8]:
session.table('RAW_TEXT').show()

---------------------------------------------
|"RELATIVE_PATH"  |"FILE_URL"  |"RAW_TEXT"  |
---------------------------------------------
|                 |            |            |
---------------------------------------------



In [22]:
class text_chunker:

    def process(self,text):        
        text_raw=[]
        text_raw.append(text) 
        
        text_splitter = RecursiveCharacterTextSplitter(
            separators = ["\n"], # Define an appropriate separator. New line is good typically!
            chunk_size = 500, #Adjust this as you see fit
            chunk_overlap  = 100, #This let's text have some form of overlap. Useful for keeping chunks contextual
            length_function = len,
            add_start_index = True #Optional but useful if you'd like to feed the chunk before/after
        )
    
        chunks = text_splitter.create_documents(text_raw)
        df = pd.DataFrame(chunks, columns=['chunks','meta'])
        
        yield from df.itertuples(index=False, name=None)

In [23]:
#Register the UDTF

schema = StructType([
     StructField("chunk", StringType()),
    StructField("meta", StringType()),
 ])

session.udtf.register( 
    handler = text_chunker,
    output_schema= schema, 
    input_types = [StringType()] , 
    is_permanent = True , 
    name = 'CHUNK_TEXT' , 
    replace = True , 
    packages=['pandas','langchain'], stage_location = 'RAG')

The version of package 'langchain' in the local environment is 0.1.1, which does not fit the criteria for the requirement 'langchain'. Your UDF might not work when the package version is different between the server and your local environment.


<snowflake.snowpark.udtf.UserDefinedTableFunction at 0x148cc0590>

In [24]:
session.sql('''
CREATE OR REPLACE TABLE CHUNK_TEXT AS
SELECT
        relative_path,
        func.*
    FROM raw_text AS raw,
         TABLE(chunk_text(raw_text)) as func;
            ''').collect()

[Row(status='Table CHUNK_TEXT successfully created.')]

In [12]:
session.sql('''
CREATE OR REPLACE TABLE VECTOR_STORE AS
SELECT
RELATIVE_PATH as EPISODE_NAME,
CHUNK AS CHUNK,
snowflake.cortex.embed_text('e5-base-v2', chunk) as chunk_embedding
FROM CHUNK_TEXT
            ''').collect()

[Row(status='Table VECTOR_STORE successfully created.')]