diff --git a/README.md b/README.md index f075942..ca09d3f 100644 --- a/README.md +++ b/README.md @@ -1,171 +1,124 @@ -# vector-embedder +# ๐Ÿ“š vector-embedder [![Docker Repository on Quay](https://quay.io/repository/dminnear/vector-embedder/status "Docker Repository on Quay")](https://quay.io/repository/dminnear/vector-embedder) -**vector-embedder** is a flexible, language-agnostic document ingestion pipeline that generates and stores vector embeddings from structured and unstructured content. +**vector-embedder** is a flexible, language-agnostic document ingestion and embedding pipeline. It transforms structured and unstructured content from multiple sources into vector embeddings and stores them in your vector database of choice. -It supports embedding content from Git repositories (via glob patterns), web URLs, and various file types into multiple vector database backends. It runs locally, in containers, or as a Kubernetes/OpenShift job. +It supports Git repositories, web URLs, and file types like Markdown, PDFs, and HTML. Designed for local runs, containers, or OpenShift/Kubernetes jobs. --- -## ๐Ÿ“ฆ Features +## โš™๏ธ Features -- โœ… **Multiple vector DB backends supported**: +- โœ… **Multi-DB support**: - Redis (RediSearch) - Elasticsearch - PGVector (PostgreSQL) - SQL Server (preview) - Qdrant - - Dry Run (prints to console, no DB required) + - Dry Run (no DB required; logs to console) - โœ… **Flexible input sources**: - Git repositories via glob patterns (`**/*.pdf`, `*.md`, etc.) - Web pages via configurable URL lists -- โœ… **Smart document chunking** with configurable `CHUNK_SIZE` and `CHUNK_OVERLAP` -- โœ… Embedding powered by [`sentence-transformers`](https://www.sbert.net/) -- โœ… Parsing powered by LangChain and [Unstructured](https://unstructured.io/) -- โœ… Fully configurable via `.env` or runtime env vars -- โœ… Containerized using UBI and OpenShift-compatible images +- โœ… **Smart chunking** with configurable `CHUNK_SIZE` and `CHUNK_OVERLAP` +- โœ… Embeddings via [`sentence-transformers`](https://www.sbert.net/) +- โœ… Parsing via [LangChain](https://github.com/langchain-ai/langchain) + [Unstructured](https://unstructured.io/) +- โœ… UBI-compatible container, OpenShift-ready +- โœ… Fully configurable via `.env` or `-e` environment flags --- -## ๐Ÿš€ Usage +## ๐Ÿš€ Quick Start -### Configuration +### 1. Configuration -All settings are read from a `.env` file at the project root. You can override values using `export` or `-e` flags in containers. - -Example `.env`: +Set your configuration in a `.env` file at the project root. 
```dotenv -# === File System Config === +# Temporary working directory TEMP_DIR=/tmp -# === Logging === +# Logging LOG_LEVEL=info -# === Git Repo Document Sources === -REPO_SOURCES=[{"repo": "https://github.com/RHEcosystemAppEng/llm-on-openshift.git", "globs": ["examples/notebooks/langchain/rhods-doc/*.pdf"]}] - -# === Web Document Sources === -WEB_SOURCES=["https://ai-on-openshift.io/getting-started/openshift/", "https://ai-on-openshift.io/getting-started/opendatahub/"] - -# === Embedding Config === -CHUNK_SIZE=1024 -CHUNK_OVERLAP=40 -DB_TYPE=DRY_RUN - -# === Redis === -REDIS_URL=redis://localhost:6379 -REDIS_INDEX=docs -REDIS_SCHEMA=redis_schema.yaml - -# === Elasticsearch === -ELASTIC_URL=http://localhost:9200 -ELASTIC_INDEX=docs -ELASTIC_USER=elastic -ELASTIC_PASSWORD=changeme +# Sources +REPO_SOURCES=[{"repo": "https://github.com/example/repo.git", "globs": ["docs/**/*.md"]}] +WEB_SOURCES=["https://example.com/docs/", "https://example.com/report.pdf"] -# === PGVector === -PGVECTOR_URL=postgresql://user:pass@localhost:5432/mydb -PGVECTOR_COLLECTION_NAME=documents +# Chunking +CHUNK_SIZE=2048 +CHUNK_OVERLAP=200 -# === SQL Server === -SQLSERVER_HOST=localhost -SQLSERVER_PORT=1433 -SQLSERVER_USER=sa -SQLSERVER_PASSWORD=StrongPassword! -SQLSERVER_DB=docs -SQLSERVER_TABLE=vector_table -SQLSERVER_DRIVER=ODBC Driver 18 for SQL Server +# Embeddings +EMBEDDING_MODEL=sentence-transformers/all-mpnet-base-v2 -# === Qdrant === -QDRANT_URL=http://localhost:6333 -QDRANT_COLLECTION=embedded_docs +# Vector DB +DB_TYPE=DRYRUN ``` -> ๐Ÿ’ก Default `DB_TYPE=DRY_RUN` skips DB upload and prints chunked docs to stdout โ€” great for testing! +๐Ÿงช `DB_TYPE=DRYRUN` logs chunks to stdout and skips database indexingโ€”great for development! ---- - -### ๐Ÿ” Dry Run Mode - -Dry run mode helps you test loaders and document chunking without needing any database. - -```dotenv -DB_TYPE=DRY_RUN -``` - -Dry run will: - -- Load from web and Git sources -- Chunk content -- Print chunk metadata and contents to stdout - -Run with: +### 2. Run Locally ```bash ./embed_documents.py ``` -or inside a container: +### 3. Or Run in a Container ```bash +podman build -t embed-job . + podman run --rm --env-file .env embed-job ``` ---- - -### ๐Ÿ› ๏ธ Build the Container +You can also pass inline vars: ```bash -podman build -t embed-job . +podman run --rm \ + -e DB_TYPE=REDIS \ + -e REDIS_URL=redis://localhost:6379 \ + embed-job ``` --- -### ๐Ÿงช Run in a Container +## ๐Ÿงช Dry Run Mode -With inline env vars: +Dry run skips vector DB upload and prints chunk metadata and content to the terminal. -```bash -podman run --rm \ - -e DB_TYPE=REDIS \ - -e REDIS_URL=redis://localhost:6379 \ - embed-job +```dotenv +DB_TYPE=DRYRUN ``` -Or using `.env`: +Run it: ```bash -podman run --rm \ - --env-file .env \ - embed-job +./embed_documents.py ``` -In OpenShift or Kubernetes, mount the `.env` via `ConfigMap` or use `env` blocks. - --- -## ๐Ÿ“‚ Project Structure +## ๐Ÿ—‚๏ธ Project Layout ``` . 
-โ”œโ”€โ”€ embed_documents.py # Main entrypoint -โ”œโ”€โ”€ config.py # Loads config from .env -โ”œโ”€โ”€ loaders/ # Git, web, PDF, and text file loaders -โ”œโ”€โ”€ vector_db/ # DB provider implementations +โ”œโ”€โ”€ embed_documents.py # Main entrypoint script +โ”œโ”€โ”€ config.py # Config loader from env +โ”œโ”€โ”€ loaders/ # Git, web, PDF, and text loaders +โ”œโ”€โ”€ vector_db/ # Pluggable DB providers โ”œโ”€โ”€ requirements.txt # Python dependencies -โ”œโ”€โ”€ redis_schema.yaml # Schema definition for Redis vector DB -โ””โ”€โ”€ .env # Default config (example provided) +โ”œโ”€โ”€ redis_schema.yaml # Redis index schema (if used) +โ””โ”€โ”€ .env # Default runtime config ``` --- -## ๐Ÿงช Local Testing Backends +## ๐Ÿงช Local DB Testing -Use Podman to spin up local test databases for fast experimentation. +Run a compatible DB locally to test full ingestion + indexing. -### ๐Ÿ˜ PGVector (PostgreSQL) +### PGVector (PostgreSQL) ```bash podman run --rm -d \ @@ -183,7 +136,7 @@ DB_TYPE=PGVECTOR ./embed_documents.py --- -### ๐Ÿ” Elasticsearch +### Elasticsearch ```bash podman run --rm -d \ @@ -202,7 +155,7 @@ DB_TYPE=ELASTIC ./embed_documents.py --- -### ๐Ÿง  Redis (RediSearch) +### Redis (RediSearch) ```bash podman run --rm -d \ @@ -217,7 +170,7 @@ DB_TYPE=REDIS ./embed_documents.py --- -### ๐Ÿ”ฎ Qdrant +### Qdrant ```bash podman run -d \ @@ -232,9 +185,11 @@ DB_TYPE=QDRANT ./embed_documents.py --- -## ๐Ÿ™ Acknowledgments +## ๐Ÿ™Œ Acknowledgments + +Built with: - [LangChain](https://github.com/langchain-ai/langchain) - [Unstructured](https://github.com/Unstructured-IO/unstructured) - [Sentence Transformers](https://www.sbert.net/) -- [OpenShift UBI Base Images](https://catalog.redhat.com/software/containers/search) +- [OpenShift UBI Base](https://catalog.redhat.com/software/containers/search) diff --git a/config.py b/config.py index bbb7ae8..3974d08 100644 --- a/config.py +++ b/config.py @@ -18,13 +18,24 @@ @dataclass class Config: """ - Application configuration loaded from environment variables. - - This centralizes all configuration values needed for the embedding job, - including database provider setup, chunking behavior, document sources, - and logging configuration. - - Use `Config.load()` to load and validate values from the current environment. + Global configuration object for embedding and vector DB ingestion jobs. + + This class loads configuration from environment variables and initializes + all the required components (e.g., DB providers, chunking strategy, input sources). + + Attributes: + db_provider (DBProvider): Initialized provider for a vector database. + chunk_size (int): Character length for each document chunk. + chunk_overlap (int): Number of overlapping characters between adjacent chunks. + web_sources (List[str]): List of web URLs to scrape and embed. + repo_sources (List[Dict]): Repositories and glob patterns for file discovery. + temp_dir (str): Path to a temporary working directory. + log_level (int): Log verbosity level. + + Example: + >>> config = Config.load() + >>> print(config.chunk_size) + >>> config.db_provider.add_documents(docs) """ db_provider: DBProvider @@ -37,6 +48,18 @@ class Config: @staticmethod def _get_required_env_var(key: str) -> str: + """ + Retrieve a required environment variable or raise an error. + + Args: + key (str): The environment variable name. + + Returns: + str: The value of the environment variable. + + Raises: + ValueError: If the variable is not defined. 
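+
+        Example (illustrative; any required variable works the same way):
+            >>> os.environ["DB_TYPE"] = "REDIS"
+            >>> Config._get_required_env_var("DB_TYPE")
+            'REDIS'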
+ """ value = os.getenv(key) if not value: raise ValueError(f"{key} environment variable is required.") @@ -44,6 +67,18 @@ def _get_required_env_var(key: str) -> str: @staticmethod def _parse_log_level(log_level_name: str) -> int: + """ + Convert a string log level into a `logging` module constant. + + Args: + log_level_name (str): One of DEBUG, INFO, WARNING, ERROR, CRITICAL. + + Returns: + int: Corresponding `logging` level. + + Raises: + ValueError: If an invalid level is provided. + """ log_levels = { "DEBUG": logging.DEBUG, "INFO": logging.INFO, @@ -60,7 +95,16 @@ def _parse_log_level(log_level_name: str) -> int: @staticmethod def _init_db_provider(db_type: str) -> DBProvider: """ - Initialize the correct DBProvider subclass based on DB_TYPE. + Factory method to initialize the correct DB provider from environment variables. + + Args: + db_type (str): Type of DB specified via `DB_TYPE` (e.g., REDIS, PGVECTOR, QDRANT, etc.) + + Returns: + DBProvider: Initialized instance of a provider subclass. + + Raises: + ValueError: If the DB type is unsupported or required vars are missing. """ get = Config._get_required_env_var db_type = db_type.upper() @@ -109,48 +153,47 @@ def _init_db_provider(db_type: str) -> DBProvider: @staticmethod def load() -> "Config": """ - Load configuration from environment variables. + Load application settings from `.env` variables into a typed config object. - All values are expected to be present in the environment and are validated. - This method is the single point of truth for all configurable values used - throughout the embedding pipeline. + This includes logging level setup, DB provider initialization, and input + source validation. Returns: - Config: A fully populated Config object with validated values. + Config: A fully-initialized configuration object. Raises: - ValueError: If any required variable is missing or invalid. + ValueError: If required environment variables are missing or malformed. """ load_dotenv() get = Config._get_required_env_var - # Initialize logger + # Logging setup log_level = get("LOG_LEVEL").upper() logging.basicConfig(level=Config._parse_log_level(log_level)) logger = logging.getLogger(__name__) logger.debug("Logging initialized at level: %s", log_level) - # Initialize db + # Database backend db_type = get("DB_TYPE") db_provider = Config._init_db_provider(db_type) - # Web URLs + # Web source URLs try: web_sources = json.loads(get("WEB_SOURCES")) except json.JSONDecodeError as e: raise ValueError(f"WEB_SOURCES must be a valid JSON list: {e}") - # Repo sources + # Git repositories and file matchers try: repo_sources = json.loads(get("REPO_SOURCES")) except json.JSONDecodeError as e: raise ValueError(f"Invalid REPO_SOURCES JSON: {e}") from e - # Embedding settings + # Embedding chunking strategy chunk_size = int(get("CHUNK_SIZE")) chunk_overlap = int(get("CHUNK_OVERLAP")) - # Misc + # Temporary file location temp_dir = get("TEMP_DIR") return Config( diff --git a/embed_documents.py b/embed_documents.py index 6e342d9..b365d73 100755 --- a/embed_documents.py +++ b/embed_documents.py @@ -1,5 +1,35 @@ #!/usr/bin/env python +""" +embed_documents.py + +Main entry point for embedding documents into a vector database. + +This script performs the following operations: +1. Loads configuration and initializes DB provider and loaders. +2. Fetches and embeds Git-sourced documents (Markdown, PDFs, etc.). +3. Fetches and embeds web documents (HTML and PDFs). +4. Chunks all documents and indexes them into the configured vector store. 
+ +This tool is designed for use in pipelines or manual indexing workflows. + +Usage: + $ python embed_documents.py + +Environment: + Requires a valid .env file or environment variables defined for: + - DB_TYPE, EMBEDDING_MODEL, TEMP_DIR + - CHUNK_SIZE, CHUNK_OVERLAP, LOG_LEVEL + - WEB_SOURCES, REPO_SOURCES + - Plus additional DB-specific variables based on DB_TYPE + +Example: + $ DB_TYPE=QDRANT EMBEDDING_MODEL=BAAI/bge-large-en-v1.5 \ + CHUNK_SIZE=20480 CHUNK_OVERLAP=2048 \ + TEMP_DIR=/tmp EMBEDDING_MODEL=... \ + python embed_documents.py +""" + import logging import sys from pathlib import Path @@ -31,7 +61,20 @@ def _fail_and_exit(message: str, exc: Exception) -> None: def main() -> None: - # Run Git-based document embedding + """ + Main embedding workflow for Git, HTML, and PDF sources. + + Steps: + 1. Load and chunk files from configured Git repos, if any. + 2. Load and chunk HTML documents from web sources. + 3. Download, load, and chunk remote PDF files. + 4. Store all chunks into the configured vector DB provider. + + All errors are logged with traceback and will stop execution via `_fail_and_exit`. + """ + # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + # Git-based document ingestion + # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ if config.repo_sources: logger.info("Starting Git-based document embedding...") try: @@ -48,11 +91,13 @@ def main() -> None: except Exception as e: _fail_and_exit("Failed during Git document processing", e) - # Split web sources into HTML and PDF URLs + # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + # Web-based document ingestion + # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ pdf_urls = [url for url in config.web_sources if url.lower().endswith(".pdf")] html_urls = [url for url in config.web_sources if not url.lower().endswith(".pdf")] - # Run HTML-based web embedding + # HTML documents if html_urls: logger.info("Starting HTML-based web document embedding...") try: @@ -67,7 +112,7 @@ def main() -> None: except Exception as e: _fail_and_exit("Failed during HTML web document processing", e) - # Run PDF-based web embedding + # PDF documents if pdf_urls: logger.info("Downloading PDF documents from web URLs...") pdf_dir = Path(config.temp_dir) / "web_pdfs" diff --git a/loaders/git.py b/loaders/git.py index f1c4181..75c7713 100644 --- a/loaders/git.py +++ b/loaders/git.py @@ -15,26 +15,38 @@ class GitLoader: """ - Loads and processes documents from Git repositories based on configured glob patterns. + Loads and chunks documents from Git repositories based on configured glob patterns. 
- For each configured repository, this loader: - - Clones or pulls the latest repo into a temporary folder - - Applies the configured glob patterns to find matching files - - Loads PDF files using PDFLoader - - Loads supported text files using TextLoader - - Returns all chunked LangChain Document objects (does NOT push to DB) + For each Git repository specified in `config.repo_sources`, this loader: + - Clones or pulls the latest state into a local temp directory + - Applies specified glob patterns to locate files of interest + - Loads PDF files using `PDFLoader` + - Loads general-purpose text files using `TextLoader` + - Adds chunk metadata including `chunk_id` and `chunk_total` + - Returns a list of LangChain `Document` objects (does not push to vector DB) Attributes: - config (Config): Application config including temp paths, glob patterns, and chunking rules. + config (Config): The application configuration, including: + - temp_dir: base temp folder for local repo checkouts + - repo_sources: list of dicts with {repo, globs} + - chunk_size / chunk_overlap: passed through to loaders Example: - >>> config = Config.load() - >>> loader = GitLoader(config) - >>> chunks = loader.load() - >>> config.db_provider.add_documents(chunks) + >>> from config import Config + >>> loader = GitLoader(Config.load()) + >>> docs = loader.load() + >>> print(docs[0].metadata) + {'source': 'docs/content/learn/getting-started.adoc', 'chunk_id': 0, 'chunk_total': 3} """ def __init__(self, config: Config): + """ + Initialize the GitLoader with a configuration. + + Args: + config (Config): Configuration object with glob patterns, temp directory, + and chunking parameters used by sub-loaders. + """ self.config = config self.base_path = Path(config.temp_dir) / "repo_sources" self.pdf_loader = PDFLoader(config) @@ -42,15 +54,16 @@ def __init__(self, config: Config): def load(self) -> List[Document]: """ - Loads and chunks documents from all configured Git repos. + Load and chunk documents from all configured Git repositories. - This includes: - - Cloning or updating each Git repo - - Matching glob patterns to find relevant files - - Loading and chunking documents using appropriate loaders + Process: + 1. Clone or update each repo into a local temp folder + 2. Match files based on repo-specific glob patterns + 3. Route files to PDF or text loader based on extension + 4. Chunk and annotate documents with source + chunk metadata Returns: - List[Document]: Chunked LangChain documents from all matched files. + List[Document]: A list of chunked LangChain documents ready for embedding or indexing. """ all_chunks: List[Document] = [] @@ -81,6 +94,16 @@ def load(self) -> List[Document]: return all_chunks def _ensure_repo_up_to_date(self, url: str, dest: Path) -> None: + """ + Ensure a local copy of the Git repo is present and up to date. + + If the repo exists, attempts a `git pull`. If it fails or the repo + is missing, performs a fresh `git clone`. + + Args: + url (str): Git repository URL. + dest (Path): Local path where the repo should reside. + """ if dest.exists(): logger.info("Repo already cloned at %s, attempting pull...", dest) try: @@ -98,6 +121,16 @@ def _ensure_repo_up_to_date(self, url: str, dest: Path) -> None: self._clone_repo(url, dest) def _clone_repo(self, url: str, dest: Path) -> None: + """ + Clone the given Git repository to the destination directory. + + Args: + url (str): Git repository URL. + dest (Path): Target path for clone. + + Raises: + RuntimeError: If the clone operation fails. 
+ """ logger.info("Cloning repository %s to %s", url, dest) try: subprocess.run( @@ -111,6 +144,16 @@ def _clone_repo(self, url: str, dest: Path) -> None: raise RuntimeError(f"Failed to clone repo: {url}") from e def _collect_files(self, base: Path, patterns: List[str]) -> List[Path]: + """ + Apply glob patterns to collect file paths in a repo directory. + + Args: + base (Path): Root path of the repo checkout. + patterns (List[str]): List of glob patterns (e.g. "docs/**/*.md"). + + Returns: + List[Path]: All matched paths under the repo root. + """ matched: List[Path] = [] for pattern in patterns: diff --git a/loaders/pdf.py b/loaders/pdf.py index 624b93f..cf96424 100644 --- a/loaders/pdf.py +++ b/loaders/pdf.py @@ -13,21 +13,30 @@ class PDFLoader: """ - Loads and splits a list of PDF documents using PyPDFLoader. + Loads and splits a list of PDF files into chunked LangChain `Document` objects. - Each PDF is processed individually, then split into smaller chunks - using RecursiveCharacterTextSplitter to optimize for RAG and vector DB ingestion. + This loader: + - Uses `PyPDFLoader` to convert each page of a PDF into text + - Applies `RecursiveCharacterTextSplitter` to split pages into smaller overlapping chunks + - Annotates each chunk with metadata including `source` and `chunk_id` Attributes: - config (Config): Global configuration for chunking and db connection. + config (Config): Configuration object that specifies chunk size and overlap. Example: >>> loader = PDFLoader(config) - >>> chunks = loader.load([Path("paper.pdf"), Path("spec.pdf")]) - >>> print(chunks[0].page_content) + >>> docs = loader.load([Path("whitepaper.pdf"), Path("spec.pdf")]) + >>> print(docs[0].metadata) + {'source': 'whitepaper.pdf', 'chunk_id': 0} """ def __init__(self, config: Config): + """ + Initialize the PDFLoader with a given configuration. + + Args: + config (Config): Contains chunking parameters (chunk_size, chunk_overlap). + """ self.config = config self.splitter = RecursiveCharacterTextSplitter( chunk_size=config.chunk_size, @@ -36,24 +45,34 @@ def __init__(self, config: Config): def load(self, paths: List[Path]) -> List[Document]: """ - Loads and splits a list of PDF files. + Loads and chunks the content of each PDF file. + + For each PDF: + - Extracts per-page content using `PyPDFLoader` + - Splits text into chunks optimized for retrieval + - Annotates each chunk with its source path and chunk index Args: - paths (List[Path]): List of PDF file paths. + paths (List[Path]): List of PDF file paths to process. Returns: - List[Document]: A list of chunked LangChain Document objects. + List[Document]: List of chunked documents with metadata attached. 
""" all_chunks: List[Document] = [] for path in paths: try: logger.info("Loading PDF: %s", path) - loader = PyPDFLoader(str(path)) - docs = loader.load() - chunks = self.splitter.split_documents(docs) + pages = PyPDFLoader(str(path)).load() + chunks = self.splitter.split_documents(pages) + + for cid, ch in enumerate(chunks): + ch.metadata.setdefault("source", str(path)) + ch.metadata["chunk_id"] = cid + all_chunks.extend(chunks) + except Exception as e: - logger.warning("Failed to load PDF file %s: %s", path, e) + logger.warning("Failed to load PDF %s: %s", path, e) return all_chunks diff --git a/loaders/text.py b/loaders/text.py index 5e5922d..86c8f2a 100644 --- a/loaders/text.py +++ b/loaders/text.py @@ -13,25 +13,37 @@ class TextLoader: """ - Loads and semantically splits a list of general-purpose text documents - (e.g., .txt, .md, .rst, .adoc) using Unstructured's local partitioning engine. + Loads and semantically splits general-purpose text documents for RAG use. - This loader does not require an API key and works fully offline. + This loader uses Unstructured's local partitioning engine to break documents + into semantically meaningful elements (e.g., titles, narrative text, lists), + and then groups those elements into chunked LangChain Document objects, + each annotated with `chunk_id` and `chunk_total` metadata for use in + neighborhood-aware retrieval. - Unstructured's `partition()` function breaks files into structured elements - like titles, narrative text, lists, etc., which improves RAG chunk quality - over naive fixed-size splitting. + This loader works fully offline and does not require an API key. Attributes: - config (Config): The job configuration, including chunk size and overlap. + config (Config): Configuration object specifying chunking parameters. Example: - >>> loader = TextLoader(config) - >>> docs = loader.load([Path("README.md"), Path("guide.rst")]) - >>> print(docs[0].page_content) + >>> from pathlib import Path + >>> from config import Config + >>> loader = TextLoader(Config(chunk_size=1024, chunk_overlap=100)) + >>> docs = loader.load([Path("README.md"), Path("guide.adoc")]) + >>> print(docs[0].metadata) + {'source': 'README.md', 'chunk_id': 0, 'chunk_total': 3} """ def __init__(self, config: Config): + """ + Initializes the TextLoader with chunking parameters. + + Args: + config (Config): Application-level configuration object, must include: + - config.chunk_size (int): Max number of characters per chunk. + - config.chunk_overlap (int): Optional overlap between chunks. + """ self.config = config self.splitter = RecursiveCharacterTextSplitter( chunk_size=config.chunk_size, @@ -39,7 +51,31 @@ def __init__(self, config: Config): ) def load(self, paths: List[Path]) -> List[Document]: - """Partition โ†’ group โ†’ (optional) secondary split โ†’ add chunk indices.""" + """ + Loads and splits a list of text files into semantic chunks. + + This function uses Unstructured's `partition()` function to extract + structured document elements, then assembles these into grouped chunks + (respecting max chunk size) while tagging each chunk with: + + - 'source': the original file path + - 'chunk_id': the position of this chunk in the document + - 'chunk_total': total number of chunks for that file + + Args: + paths (List[Path]): List of file paths to text-based documents + (.txt, .md, .adoc, .rst, etc.). + + Returns: + List[Document]: A list of LangChain `Document` objects, each + containing chunked text and structured metadata. 
+ + Notes: + - If a grouped chunk exceeds 2x chunk_size, it is re-split using + a recursive character splitter. + - Each chunk begins with a lightweight heading that includes the + filename to help orient the LLM when formatting prompts. + """ grouped: list[Document] = [] for path in paths: @@ -71,7 +107,7 @@ def _flush(): if not txt: continue if buf_len == 0: - buf.append(f"## {fname}\n") # one heading per chunk + buf.append(f"## {fname}\n") # inject heading if buf_len + len(txt) > self.config.chunk_size: _flush() buf.append(txt) @@ -81,7 +117,7 @@ def _flush(): except Exception as e: logger.warning("Failed to load %s: %s", path, e) - # โ€” optional secondary split for ultraโ€‘long groups โ€” + # Handle oversized chunks via recursive splitting final_docs = [] for doc in grouped: if len(doc.page_content) > self.config.chunk_size * 2: @@ -89,7 +125,7 @@ def _flush(): else: final_docs.append(doc) - # annotate chunk_total (needed only once per file) + # Add chunk_total metadata for all docs counts: dict[str, int] = {} for d in final_docs: counts[d.metadata["source"]] = counts.get(d.metadata["source"], 0) + 1 diff --git a/loaders/web.py b/loaders/web.py index 9d7c86b..f9d21fb 100644 --- a/loaders/web.py +++ b/loaders/web.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import Dict, List from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import WebBaseLoader @@ -12,21 +12,31 @@ class WebLoader: """ - Loads and semantically splits documents from a list of web URLs - using LangChain's WebBaseLoader and a configurable chunking strategy. + Loads and chunks content from a list of web URLs using LangChain's `WebBaseLoader`. - This loader does not embed documents directly โ€” it returns them to be - embedded by the caller. + This loader: + - Fetches HTML or text content from each URL + - Applies recursive text chunking based on the configured chunk size and overlap + - Annotates each chunk with metadata including `source` (URL) and `chunk_id` + for downstream neighbor expansion in retrieval tasks Attributes: - config (Config): The application config containing chunk size and overlap. + config (Config): Configuration object containing chunk size and overlap. Example: >>> loader = WebLoader(config) - >>> chunks = loader.load(["https://example.com/page1", "https://example.com/page2"]) + >>> chunks = loader.load(["https://example.com/intro", "https://example.com/docs"]) + >>> print(chunks[0].metadata) + {'source': 'https://example.com/intro', 'chunk_id': 0} """ def __init__(self, config: Config): + """ + Initialize the WebLoader with a given configuration. + + Args: + config (Config): Configuration object with chunking parameters. + """ self.config = config self.splitter = RecursiveCharacterTextSplitter( chunk_size=config.chunk_size, @@ -35,34 +45,44 @@ def __init__(self, config: Config): def load(self, urls: List[str]) -> List[Document]: """ - Load and chunk documents from the given list of URLs. + Loads and splits documents from the given list of URLs. + + Steps: + - Downloads the web pages + - Splits content into semantic or character-based chunks + - Adds `source` and `chunk_id` to each chunk for traceability Args: - urls (List[str]): Web URLs to load. + urls (List[str]): List of web URLs to fetch and process. Returns: - List[Document]: A list of chunked LangChain documents. + List[Document]: A list of chunked `Document` objects with metadata. 
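+
+        Example (illustrative URL; assumes the page is reachable and yields at least one chunk):
+            >>> chunks = loader.load(["https://example.com/docs"])
+            >>> chunks[0].metadata["source"]
+            'https://example.com/docs'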
""" if not urls: - logger.warning("No URLs provided to WebLoader.") + logger.warning("WebLoader called with empty URL list.") return [] - logger.info("Loading web documents from %d URL(s)...", len(urls)) - for url in urls: - logger.debug(" - %s", url) - + logger.info("Loading %d web document(s)โ€ฆ", len(urls)) try: - loader = WebBaseLoader(urls) - docs = loader.load() + docs = WebBaseLoader(urls).load() except Exception: - logger.exception("Failed to load URLs via WebBaseLoader") + logger.exception("Failed to fetch one or more URLs") raise + chunks = self.splitter.split_documents(docs) + + # Assign unique chunk_id per source URL + per_source_counter: Dict[str, int] = {} + for ch in chunks: + src = ch.metadata.get("source") or ch.metadata.get("url") or "unknown" + ch.metadata["source"] = src + ch.metadata["chunk_id"] = per_source_counter.setdefault(src, 0) + per_source_counter[src] += 1 + logger.info( - "Splitting %d document(s) with chunk size %s and overlap %s", - len(docs), - self.config.chunk_size, - self.config.chunk_overlap, + "Produced %d web chunks (avg %.0f chars)", + len(chunks), + sum(len(c.page_content) for c in chunks) / max(1, len(chunks)), ) - return self.splitter.split_documents(docs) + return chunks diff --git a/vector_db/db_provider.py b/vector_db/db_provider.py index 123abc0..c118d08 100644 --- a/vector_db/db_provider.py +++ b/vector_db/db_provider.py @@ -8,20 +8,47 @@ class DBProvider(ABC): """ - Abstract base class for vector DB providers. - Subclasses must implement `add_documents`. + Abstract base class for vector database providers. + + This class standardizes how vector databases are initialized and how documents + are added to them. All concrete implementations (e.g., Qdrant, FAISS) must + subclass `DBProvider` and implement the `add_documents()` method. + + Attributes: + embeddings (Embeddings): An instance of HuggingFace embeddings based on the + specified model. Args: - embedding_model (str): Embedding model to use + embedding_model (str): HuggingFace-compatible model name to be used for computing + dense vector embeddings for documents. + + Example: + >>> class MyProvider(DBProvider): + ... def add_documents(self, docs): + ... print(f"Would add {len(docs)} docs with model {self.embeddings.model_name}") + + >>> provider = MyProvider("BAAI/bge-small-en") + >>> provider.add_documents([Document(page_content="Hello")]) """ def __init__(self, embedding_model: str) -> None: + """ + Initialize a DB provider with a specific embedding model. + + Args: + embedding_model (str): The HuggingFace model name to be used for generating embeddings. + """ self.embeddings: Embeddings = HuggingFaceEmbeddings(model_name=embedding_model) @abstractmethod def add_documents(self, docs: List[Document]) -> None: """ - Add a list of documents (already embedded or to be embedded) to the vector store. - Must be implemented by subclasses. + Add documents to the vector database. + + This method must be implemented by subclasses to define how documents + (with or without precomputed embeddings) are stored in the backend vector DB. + + Args: + docs (List[Document]): A list of LangChain `Document` objects to be embedded and added. """ pass diff --git a/vector_db/dryrun_provider.py b/vector_db/dryrun_provider.py index a220af3..c96c456 100644 --- a/vector_db/dryrun_provider.py +++ b/vector_db/dryrun_provider.py @@ -7,30 +7,48 @@ class DryRunProvider(DBProvider): """ - A mock DBProvider used in dry run mode. + A mock vector DB provider for debugging document loading and chunking. 
- Instead of storing documents in a vector database, this provider prints the - chunked documents to stdout. It is useful for debugging document loading, - chunking, and metadata before committing to a real embedding operation. + `DryRunProvider` does not persist any documents or perform embedding operations. + Instead, it prints a preview of the documents and their metadata to stdout, + allowing users to validate chunking, structure, and metadata before pushing + to a production vector store. + + Useful for development, testing, or understanding how your documents are + being processed. + + Attributes: + embeddings (Embeddings): HuggingFace embedding model for compatibility. Args: - embedding_model (str): Embedding model to use + embedding_model (str): The model name to initialize HuggingFaceEmbeddings. + Used only for compatibility โ€” no embeddings are generated. Example: - >>> from vector_db.dry_run_provider import DryRunProvider - >>> provider = DryRunProvider("sentence-transformers/all-mpnet-base-v2") - >>> provider.add_documents(docs) # docs is a List[Document] + >>> from langchain_core.documents import Document + >>> provider = DryRunProvider("BAAI/bge-small-en") + >>> docs = [Document(page_content="Hello world", metadata={"source": "test.txt"})] + >>> provider.add_documents(docs) """ def __init__(self, embedding_model: str): + """ + Initialize the dry run provider with a placeholder embedding model. + + Args: + embedding_model (str): The name of the embedding model (used for interface consistency). + """ super().__init__(embedding_model) def add_documents(self, docs: List[Document]) -> None: """ - Print chunked documents and metadata to stdout for debugging. + Print chunked documents and metadata to stdout for inspection. + + This method displays the first 10 document chunks, including the start + of their page content and associated metadata. Args: - docs (List[Document]): The documents to preview in dry run mode. + docs (List[Document]): A list of LangChain documents to preview. """ print("\n=== Dry Run Output ===") for i, doc in enumerate(docs[:10]): diff --git a/vector_db/elastic_provider.py b/vector_db/elastic_provider.py index bb363e0..a356f8b 100644 --- a/vector_db/elastic_provider.py +++ b/vector_db/elastic_provider.py @@ -11,29 +11,53 @@ class ElasticProvider(DBProvider): """ - Elasticsearch-based vector DB provider using LangChain's ElasticsearchStore. + Vector database provider backed by Elasticsearch using LangChain's ElasticsearchStore. + + This provider allows storing and querying vectorized documents in an Elasticsearch + cluster. Documents are embedded using a HuggingFace model and stored with associated + metadata in the specified index. + + Attributes: + db (ElasticsearchStore): LangChain-compatible wrapper around Elasticsearch vector storage. + embeddings (Embeddings): HuggingFace embedding model for generating document vectors. Args: - embedding_model (str): Embedding model to use - url (str): Full URL to the Elasticsearch cluster (e.g. http://localhost:9200) - password (str): Authentication password for the cluster - index (str): Index name to use for vector storage - user (str): Username for Elasticsearch (default: "elastic") + embedding_model (str): HuggingFace model name for computing embeddings. + url (str): Full URL to the Elasticsearch cluster (e.g. "http://localhost:9200"). + password (str): Password for the Elasticsearch user. + index (str): The index name where documents will be stored. 
+ user (str): Elasticsearch username (default is typically "elastic"). Example: + >>> from vector_db.elastic_provider import ElasticProvider >>> provider = ElasticProvider( - ... embedding_model="sentence-transformers/all-mpnet-base-v2", + ... embedding_model="BAAI/bge-small-en", ... url="http://localhost:9200", ... password="changeme", - ... index="docs", + ... index="rag-docs", ... user="elastic" ... ) - >>> provider.add_documents(chunks) + >>> provider.add_documents(docs) """ def __init__( - self, embedding_model: str, url: str, password: str, index: str, user: str + self, + embedding_model: str, + url: str, + password: str, + index: str, + user: str, ): + """ + Initialize an Elasticsearch-based vector DB provider. + + Args: + embedding_model (str): The model name for computing embeddings. + url (str): Full URL of the Elasticsearch service. + password (str): Elasticsearch user's password. + index (str): Name of the Elasticsearch index to use. + user (str): Elasticsearch username (e.g., "elastic"). + """ super().__init__(embedding_model) self.db = ElasticsearchStore( @@ -48,9 +72,12 @@ def __init__( def add_documents(self, docs: List[Document]) -> None: """ - Add documents to the Elasticsearch index. + Add a batch of LangChain documents to the Elasticsearch index. + + Each document will be embedded using the configured model and stored + in the specified index with any associated metadata. Args: - docs (List[Document]): Chunked LangChain documents to index. + docs (List[Document]): List of documents to index. """ self.db.add_documents(docs) diff --git a/vector_db/pgvector_provider.py b/vector_db/pgvector_provider.py index 035156e..cc3bbe3 100644 --- a/vector_db/pgvector_provider.py +++ b/vector_db/pgvector_provider.py @@ -12,26 +12,40 @@ class PGVectorProvider(DBProvider): """ - PGVector-based vector DB provider. + Vector database provider backed by PostgreSQL with pgvector extension. - Uses the `langchain_postgres.PGVector` integration to store - document embeddings in a PostgreSQL-compatible backend with pgvector enabled. + This provider uses LangChain's `PGVector` integration to store and query + embedded documents in a PostgreSQL-compatible database. It requires a working + `pgvector` extension in the target database. + + Attributes: + db (PGVector): LangChain-compatible PGVector client for vector storage. + embeddings (Embeddings): HuggingFace model for generating document vectors. Args: - embedding_model (str): Embedding model to use - url (str): PostgreSQL connection string (e.g. postgresql://user:pass@host:5432/db) - collection_name (str): Name of the pgvector table or collection + embedding_model (str): The model name to use for computing embeddings. + url (str): PostgreSQL connection string (e.g. "postgresql://user:pass@host:5432/db"). + collection_name (str): Name of the table/collection used for storing vectors. Example: + >>> from vector_db.pgvector_provider import PGVectorProvider >>> provider = PGVectorProvider( - ... embedding_model="sentence-transformers/all-mpnet-base-v2", - ... url="postgresql://user:pass@localhost:5432/mydb", - ... collection_name="documents" + ... embedding_model="BAAI/bge-base-en-v1.5", + ... url="postgresql://user:pass@localhost:5432/vector_db", + ... collection_name="rag_chunks" ... ) - >>> provider.add_documents(chunks) + >>> provider.add_documents(docs) """ def __init__(self, embedding_model: str, url: str, collection_name: str): + """ + Initialize a PGVectorProvider for use with PostgreSQL. 
+ + Args: + embedding_model (str): HuggingFace model used for embedding chunks. + url (str): Connection string to PostgreSQL with pgvector enabled. + collection_name (str): Name of the vector table in the database. + """ super().__init__(embedding_model) self.db = PGVector( @@ -52,13 +66,14 @@ def __init__(self, embedding_model: str, url: str, collection_name: str): def add_documents(self, docs: List[Document]) -> None: """ - Add a list of documents to the pgvector-backed vector store. + Store a list of documents in the PGVector collection. - Args: - docs (List[Document]): LangChain document chunks to embed and store. + This will embed the documents using the configured model and persist them + to the PostgreSQL backend. Any null bytes (\\x00) are removed from text to + prevent PostgreSQL errors. - Notes: - - Null bytes (`\\x00`) are removed from content to avoid PostgreSQL errors. + Args: + docs (List[Document]): Chunked LangChain documents to store. """ for doc in docs: doc.page_content = doc.page_content.replace("\x00", "") diff --git a/vector_db/qdrant_provider.py b/vector_db/qdrant_provider.py index d8d3c47..ffb7c3e 100644 --- a/vector_db/qdrant_provider.py +++ b/vector_db/qdrant_provider.py @@ -13,24 +13,32 @@ class QdrantProvider(DBProvider): """ - Qdrant-based vector DB provider using LangChain's QdrantVectorStore. + Vector database provider backed by Qdrant, using LangChain's QdrantVectorStore. - Args: - embedding_model (str): Embedding model to use - url (str): Base URL of the Qdrant service (e.g., http://localhost:6333) - collection (str): Name of the vector collection to use or create - api_key (Optional[str]): API key if authentication is required (optional) + This provider connects to a running Qdrant instance and stores embedded document + vectors in a named collection. If the collection does not exist, it will be created + automatically with COSINE distance. + + Attributes: + client (QdrantClient): Low-level Qdrant client for managing collections. + db (QdrantVectorStore): LangChain-compatible wrapper for vector operations. + embeddings (Embeddings): HuggingFace model for embedding chunks. - This provider will create the collection if it does not already exist. + Args: + embedding_model (str): HuggingFace model used for embedding document text. + url (str): Base URL for the Qdrant service (e.g., "http://localhost:6333"). + collection (str): Name of the Qdrant collection to use. + api_key (Optional[str]): Optional API key if authentication is required. Example: + >>> from vector_db.qdrant_provider import QdrantProvider >>> provider = QdrantProvider( - ... embedding_model="sentence-transformers/all-mpnet-base-v2", + ... embedding_model="BAAI/bge-base-en-v1.5", ... url="http://localhost:6333", - ... collection="embedded_docs", + ... collection="docs", ... api_key=None ... ) - >>> provider.add_documents(docs) + >>> provider.add_documents(chunks) """ def __init__( @@ -40,6 +48,15 @@ def __init__( collection: str, api_key: Optional[str] = None, ): + """ + Initialize the Qdrant vector DB provider. + + Args: + embedding_model (str): Name of the embedding model to use. + url (str): URL of the Qdrant instance (e.g., http://localhost:6333). + collection (str): Name of the collection to use or create. + api_key (Optional[str]): Optional Qdrant API key for authenticated instances. 
+ """ super().__init__(embedding_model) self.collection = collection self.url = url @@ -65,9 +82,18 @@ def __init__( ) def _collection_exists(self) -> bool: + """ + Check if the Qdrant collection already exists. + + Returns: + bool: True if the collection exists, False otherwise. + """ return self.client.collection_exists(self.collection) def _create_collection(self) -> None: + """ + Create a new collection in Qdrant using the current embedding model's vector size. + """ vector_size = len(self.embeddings.embed_query("test")) self.client.recreate_collection( collection_name=self.collection, @@ -76,9 +102,9 @@ def _create_collection(self) -> None: def add_documents(self, docs: List[Document]) -> None: """ - Add documents to the Qdrant vector store. + Add a list of embedded documents to the Qdrant collection. Args: - docs (List[Document]): Chunked LangChain documents to index. + docs (List[Document]): Chunked LangChain documents to store in Qdrant. """ self.db.add_documents(documents=docs) diff --git a/vector_db/redis_provider.py b/vector_db/redis_provider.py index e1acc2a..6849ad9 100644 --- a/vector_db/redis_provider.py +++ b/vector_db/redis_provider.py @@ -12,28 +12,43 @@ class RedisProvider(DBProvider): """ - Redis-based vector DB provider using RediSearch and LangChain's Redis integration. + Redis-backed vector DB provider using RediSearch and LangChain's Redis integration. - Args: - embedding_model (str): Embedding model to use - url (str): Redis connection string (e.g. redis://localhost:6379) - index (str): RediSearch index name (must be provided via .env) - schema (str): Path to RediSearch schema YAML file (must be provided via .env) + This provider connects to a Redis instance, checks if the specified index exists, + and either loads from it or creates a new index on first insert. Vectors are stored + using the RediSearch module with configurable schema. + + Attributes: + redis_client (redis.Redis): Raw Redis client for low-level access. + db (Optional[RedisVectorStore]): LangChain vector store, lazily created on first add. - This provider will either load from an existing Redis index or defer creation - until documents are available. + Args: + embedding_model (str): Name of the embedding model to use for text chunks. + url (str): Redis connection string (e.g., "redis://localhost:6379"). + index (str): RediSearch index name to use for vector storage. + schema (str): Path to schema file where the RediSearch index definition is written. Example: + >>> from vector_db.redis_provider import RedisProvider >>> provider = RedisProvider( - ... embedding_model="sentence-transformers/all-mpnet-base-v2", + ... embedding_model="BAAI/bge-large-en-v1.5", ... url="redis://localhost:6379", - ... index="docs", + ... index="validated_docs", ... schema="redis_schema.yaml" ... ) - >>> provider.add_documents(chunks) + >>> provider.add_documents(docs) """ def __init__(self, embedding_model: str, url: str, index: str, schema: str): + """ + Initialize a Redis-backed vector store provider. + + Args: + embedding_model (str): HuggingFace model for embeddings. + url (str): Redis connection string. + index (str): Name of the RediSearch index to use. + schema (str): Path to write RediSearch schema YAML (used on creation). + """ super().__init__(embedding_model) self.url = url self.index = index @@ -63,6 +78,12 @@ def __init__(self, embedding_model: str, url: str, index: str, schema: str): ) def _index_exists(self) -> bool: + """ + Check whether the Redis index already exists. 
+ + Returns: + bool: True if the index exists, False otherwise. + """ try: self.redis_client.ft(self.index).info() return True @@ -71,10 +92,10 @@ def _index_exists(self) -> bool: def add_documents(self, docs: List[Document]) -> None: """ - Add document chunks to Redis vector store. + Add a list of documents to the Redis vector store. Args: - docs (List[Document]): Chunked LangChain documents to store. + docs (List[Document]): LangChain document chunks to embed and store. """ if self.db is None: logger.info("Creating new Redis index: %s", self.index) diff --git a/vector_db/sqlserver_provider.py b/vector_db/sqlserver_provider.py index 4b0bac2..7c9967d 100644 --- a/vector_db/sqlserver_provider.py +++ b/vector_db/sqlserver_provider.py @@ -12,30 +12,37 @@ class SQLServerProvider(DBProvider): """ - SQL Server-based vector DB provider using LangChain's SQLServer_VectorStore. + SQL Server-based vector DB provider using LangChain's SQLServer_VectorStore integration. + + This provider connects to a Microsoft SQL Server instance and stores document embeddings + in a specified table. If the target database does not exist, it will be created automatically. + + Attributes: + db (SQLServer_VectorStore): Underlying LangChain-compatible vector store. + connection_string (str): Full ODBC connection string to the SQL Server instance. Args: - embedding_model (str): Embedding model to use - host (str): Hostname of the SQL Server - port (str): Port number - user (str): SQL login username - password (str): SQL login password - database (str): Database name to connect to or create - table (str): Name of the table used to store vector embeddings - driver (str): ODBC driver name (e.g., 'ODBC Driver 18 for SQL Server') + embedding_model (str): HuggingFace-compatible embedding model to use. + host (str): SQL Server hostname or IP address. + port (str): Port number (typically 1433). + user (str): SQL Server login username. + password (str): SQL Server login password. + database (str): Target database name. Will be created if not present. + table (str): Table name to store vector embeddings. + driver (str): ODBC driver name (e.g., 'ODBC Driver 18 for SQL Server'). Example: >>> provider = SQLServerProvider( - ... embedding_model="sentence-transformers/all-mpnet-base-v2", + ... embedding_model="BAAI/bge-large-en-v1.5", ... host="localhost", ... port="1433", ... user="sa", ... password="StrongPassword!", - ... database="docs", - ... table="vector_table", + ... database="my_vectors", + ... table="embedded_docs", ... driver="ODBC Driver 18 for SQL Server" ... ) - >>> provider.add_documents(chunks) + >>> provider.add_documents(docs) """ def __init__( @@ -60,7 +67,6 @@ def __init__( self.driver = driver self.connection_string = self._build_connection_string(self.database) - self._ensure_database_exists() logger.info( @@ -74,10 +80,19 @@ def __init__( connection_string=self.connection_string, embedding_function=self.embeddings, table_name=self.table, - embedding_length=768, # Should match the sentence-transformer model + embedding_length=768, # Ensure this matches the model you're using ) def _build_connection_string(self, db_name: str) -> str: + """ + Construct a SQL Server ODBC connection string. + + Args: + db_name (str): Name of the database to connect to. + + Returns: + str: ODBC-compliant connection string. 
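+
+        Example (illustrative, using the class example values; tail truncated):
+            "Driver={ODBC Driver 18 for SQL Server};Server=localhost,1433;..."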
+ """ return ( f"Driver={{{self.driver}}};" f"Server={self.host},{self.port};" @@ -89,6 +104,12 @@ def _build_connection_string(self, db_name: str) -> str: ) def _ensure_database_exists(self) -> None: + """ + Connect to the SQL Server master database and create the target database if missing. + + Raises: + RuntimeError: If the database cannot be created or accessed. + """ master_conn_str = self._build_connection_string("master") try: with pyodbc.connect(master_conn_str, autocommit=True) as conn: @@ -105,13 +126,13 @@ def _ensure_database_exists(self) -> None: def add_documents(self, docs: List[Document]) -> None: """ - Add documents to the SQL Server table in batches. + Add documents to the SQL Server table in small batches. Args: - docs (List[Document]): List of LangChain documents to embed and insert. + docs (List[Document]): LangChain document chunks to embed and store. Raises: - Exception: If any batch insert fails. + Exception: If a batch insert operation fails. """ batch_size = 50 for i in range(0, len(docs), batch_size):