Data Ingestion

Installing the required dependencies

In [0]:
%pip install langchain unstructured "unstructured[pdf]" faiss-cpu databricks-langchain
# NOTE(review): none of the packages above are version-pinned, so a re-run may resolve
# different releases. The import cell also uses `langchain_community`, which is not
# listed here -- presumably it is pulled in transitively or preinstalled; TODO confirm.

Collecting langchain
  Downloading langchain-0.3.19-py3-none-any.whl.metadata (7.9 kB)
Collecting unstructured
  Downloading unstructured-0.16.23-py3-none-any.whl.metadata (24 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting databricks-langchain
  Downloading databricks_langchain-0.3.0-py3-none-any.whl.metadata (2.6 kB)
Collecting langchain-core<1.0.0,>=0.3.35 (from langchain)
  Downloading langchain_core-0.3.40-py3-none-any.whl.metadata (5.9 kB)
Collecting langchain-text-splitters<1.0.0,>=0.3.6 (from langchain)
  Downloading langchain_text_splitters-0.3.6-py3-none-any.whl.metadata (1.9 kB)
Collecting langsmith<0.4,>=0.1.17 (from langchain)
  Downloading langsmith-0.3.11-py3-none-any.whl.metadata (14 kB)
Collecting pydantic<3.0.0,>=2.7.4 (from langchain)
  Downloading pydantic-2.10.6-py3-none-any.whl.metadata (30 kB)
Collecting SQLAlchemy<3,>=1.4 (from langchain)
  Downloading SQLAlchemy-2.0.38-cp311-cp311-manyli

In [0]:
# Restart the Python process after the %pip install cell so the imports below
# run against the freshly installed packages.
dbutils.library.restartPython()

Import relevant dependencies

In [0]:
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.docstore.in_memory import InMemoryDocstore 
from langchain_community.vectorstores import FAISS
from databricks_langchain import DatabricksEmbeddings

import os
import faiss

Setting up Initial Configuration

In [0]:
# Reuse an existing `config` dict if one is already defined in this session;
# otherwise start from an empty one.
if 'config' not in locals():
    config = {}

# Locations used by the ingestion pipeline: the folder the source documents are
# read from and the folder the FAISS index is persisted to.
config.update(
    DOCS_DIR="/Volumes/practice/default/datasets/pdf",
    VECTOR_STORE_PATH="/Volumes/practice/default/rag_t2_vector_store",
)

# Embedding client shared by the vector-store helpers defined further down.
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")

In [0]:
%sql
-- Select the catalog/schema that holds the ingestion registry table.
USE CATALOG practice;
USE SCHEMA default;

-- Registry of files that have already been ingested; get_new_docs() queries it by
-- (file_name, file_type) and inserts one row per previously unseen file.
CREATE TABLE IF NOT EXISTS rag_t2_files (
  id BIGINT GENERATED ALWAYS AS IDENTITY,
  file_name VARCHAR(128),
  file_type VARCHAR(32),
  -- Holds the human-readable string produced by format_file_size(), e.g. "96.72 KB".
  file_size VARCHAR(64),
  file_path VARCHAR(256),
  timestamp TIMESTAMP
) USING DELTA;

Function Definition

Supporting Functions

In [0]:
def format_file_size(size_bytes):
    """
    Renders a byte count as a human-readable string with two decimals.

    Args:
        size_bytes: Size in bytes (anything accepted by float()).

    Returns:
        str: The size followed by a unit, e.g. "96.72 KB". The largest unit is
        GB, so bigger values are still expressed in GB.
    """
    labels = ("B", "KB", "MB", "GB")
    remaining = float(size_bytes)

    # Step through every unit except the last; stop at the first one where the
    # value no longer reaches 1024.
    for label in labels[:-1]:
        if not remaining >= 1024:
            return f"{remaining:.2f} {label}"
        remaining /= 1024

    return f"{remaining:.2f} {labels[-1]}"

In [0]:
def get_document_details(document):
    """
    Extracts the file name, type and formatted size of a loaded document.

    Args:
        document: A LangChain document object whose metadata['source'] is the
            path of the file it was loaded from.

    Returns:
        list: [file_name, file_type, file_size], where file_type is the
        extension without the leading dot and file_size is the string
        produced by format_file_size().
    """
    source_path = document.metadata['source']

    file_name = os.path.basename(source_path)
    _, extension = os.path.splitext(file_name)
    readable_size = format_file_size(os.path.getsize(source_path))

    # extension[1:] drops the leading "."; it is "" for files without an extension.
    return [file_name, extension[1:], readable_size]


In [0]:
def get_new_docs(documents):
    """
    Filters out documents that are already registered in rag_t2_files.

    Each document is looked up by (file_name, file_type). A document with no
    matching row is inserted into rag_t2_files and kept; a document that
    already has a row is dropped so it is not chunked and embedded again.

    Args:
        documents: A list of new and repeat documents

    Returns:
        list: Documents not already in db.
    """

    new_documents = []

    for document in documents:
        file_name, file_type, file_size = get_document_details(document)

        # Values are bound through named parameter markers rather than being
        # interpolated into the SQL text, so names containing quotes cannot
        # break (or inject into) the statement.
        existing = spark.sql(
            "SELECT file_name FROM rag_t2_files "
            "WHERE file_name = :file_name AND file_type = :file_type",
            args={"file_name": file_name, "file_type": file_type},
        )

        if not existing.isEmpty():
            # Already registered: skip it.
            continue

        spark.sql(
            """
            INSERT INTO rag_t2_files (file_name, file_type, file_size, file_path, timestamp)
            VALUES (:file_name, :file_type, :file_size, :file_path, NOW())
            """,
            args={
                "file_name": file_name,
                "file_type": file_type,
                "file_size": file_size,
                "file_path": document.metadata['source'],
            },
        )

        # Only documents that were just registered count as new.
        new_documents.append(document)

    return new_documents

Wrapping Functions

In [0]:
def load_docs():
    """
    Loads the .txt, .html and .pdf files found in config['DOCS_DIR'].

    Args:

    Returns:
        list: The LangChain document objects returned by get_new_docs() for
        the loaded files.
    """
    patterns = ("*.txt", "*.html", "*.pdf")
    loaded = []

    # One DirectoryLoader pass per glob pattern, collected into a single list.
    for pattern in patterns:
        loaded.extend(
            DirectoryLoader(config['DOCS_DIR'], glob=pattern, show_progress=True).load()
        )

    # Hand the loaded documents to the registry check before reporting the count.
    documents = get_new_docs(loaded)

    print(f"\nINFO - Documents Loaded, ({len(documents)} files):")

    return documents

In [0]:
def split_docs(docs, chunk_size=1000, chunk_overlap=300, add_start_index=True):
    """
    Splits documents into overlapping chunks with RecursiveCharacterTextSplitter.

    Args:
        docs (list): List of LangChain document objects to be split.
        chunk_size (int): Passed to the splitter as chunk_size (default: 1000).
        chunk_overlap (int): Passed to the splitter as chunk_overlap (default: 300).
        add_start_index (bool): Passed to the splitter as add_start_index (default: True).

    Returns:
        list: A list of LangChain document chunks after splitting.

    Raises:
        ValueError: If docs is empty.
    """
    # Fail fast on empty input before building the splitter.
    if not docs:
        raise ValueError("The input documents list is empty.")

    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "add_start_index": add_start_index,
    }

    return RecursiveCharacterTextSplitter(**splitter_options).split_documents(docs)

In [0]:
def load_vector_store():
    """
    Returns the FAISS vector store saved under config['VECTOR_STORE_PATH'],
    or a new empty one when no index.faiss file exists there.

    Args:

    Returns:
        A vector_store, or None if an existing store could not be loaded.
    """
    store_dir = config['VECTOR_STORE_PATH']

    if not os.path.exists(f"{store_dir}/index.faiss"):
        # No saved index: size a flat L2 index from the length of a probe embedding.
        dimension = len(embeddings.embed_query("Hello World !"))
        fresh_store = FAISS(
            embedding_function=embeddings,
            index=faiss.IndexFlatL2(dimension),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
        )

        print("Created a new vector_store")

        return fresh_store

    try:
        # NOTE(review): allow_dangerous_deserialization=True permits loading the
        # pickled part of the saved store; only acceptable while the store
        # directory is written exclusively by trusted code.
        loaded_store = FAISS.load_local(
            folder_path=store_dir,
            embeddings=embeddings,
            allow_dangerous_deserialization=True,
        )

        print("Loaded vector_store")

        return loaded_store
    except Exception as e:
        # A failed load is reported and signalled with None instead of raising.
        print(f"Error loading vector_store: {e}")

        return None


In [0]:
def embed_store(docs_chunks, vector_store):
    """
    Adds the document chunks to the vector store and saves it to disk.

    Args:
        docs_chunks (list): List of LangChain document chunks to be embedded.
        vector_store (vector_store): The vector store the chunks are added to.

    Returns:
        None. The store is written to config['VECTOR_STORE_PATH'].
    """
    vector_store.add_documents(docs_chunks)

    # Persist only after the chunks were added.
    persist_dir = config['VECTOR_STORE_PATH']
    vector_store.save_local(persist_dir)

    print("\n INFO - vector_store saved")

Main

In [0]:
# Step 1: read the supported files from config['DOCS_DIR']; load_docs() also
# records previously unseen files in the rag_t2_files table.
print("\nStep: Loading the documents")
documents = load_docs() ## loads docs from directory


Step: Loading the documents


0it [00:00, ?it/s]0it [00:00, ?it/s]
0it [00:00, ?it/s]0it [00:00, ?it/s]
  0%|          | 0/1 [00:00<?, ?it/s]100%|██████████| 1/1 [00:00<00:00,  3.00it/s]100%|██████████| 1/1 [00:00<00:00,  2.99it/s]


+-----------------+-----------------+
|num_affected_rows|num_inserted_rows|
+-----------------+-----------------+
|                1|                1|
+-----------------+-----------------+

Inserting value result: None
No duplicates

INFO - Documents Loaded, (1 files):


In [0]:
# Step 2: chunk the loaded documents, overriding the default overlap of 300 with 100.
# NOTE: `docs_chunks` is only defined when `documents` is non-empty; the embedding
# cell further down is guarded by the same condition.
if documents:
    print("\nStep: Chunking the documents")
    docs_chunks = split_docs(documents, chunk_overlap=100) ## split docs in chunks
else:
    print("\nNo new files")


Step: Chunking the documents


In [0]:
# Step 3: open the saved FAISS store, or create an empty one if none exists yet.
# NOTE(review): load_vector_store() returns None when an existing store fails to load.
print("Step: Loading the vector store")
vector_store = load_vector_store()

Step: Loading the vector store
Created a new vector_store


In [0]:
# Step 4: embed the chunks and persist the store, only when there are documents.
# NOTE(review): if `vector_store` is None (failed load), embed_store() will raise
# when it calls vector_store.add_documents.
if documents:
    embed_store(docs_chunks, vector_store)


 INFO - vector_store saved


Scratch Cell

In [0]:
%sql
-- Inspect the ingestion registry to see which files have been recorded.
SELECT * FROM rag_t2_files;

id,file_name,file_type,file_size,file_path,timestamp
1,electric_vehicles.pdf,pdf,96.72 KB,/Volumes/practice/default/datasets/pdf/electric_vehicles.pdf,2025-02-28T18:37:06.452Z
