From 167f34dad254f37a4cdd0e9b848a965737e568d8 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 14:39:21 -0400 Subject: [PATCH 01/12] web sources that are pdfs are downloaded and chunked with PDF loader --- embed_documents.py | 66 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/embed_documents.py b/embed_documents.py index 8ce1282..cd11e02 100755 --- a/embed_documents.py +++ b/embed_documents.py @@ -1,17 +1,23 @@ #!/usr/bin/env python import logging +from pathlib import Path + +import requests from config import Config from loaders.git import GitLoader +from loaders.pdf import PDFLoader from loaders.web import WebLoader -# Initialize logging +# Load environment config config = Config.load() + +# Configure logging logging.basicConfig(level=config.log_level) logger = logging.getLogger(__name__) -# Run Git document embedding if sources provided +# Git-based embedding if config.repo_sources: logger.info("Starting Git-based document embedding...") try: @@ -28,21 +34,59 @@ except Exception: logger.exception("Failed during Git document processing") -# Run Web document embedding if URLs provided -if config.web_sources: - logger.info("Starting Web-based document embedding...") +# Separate Web URLs by type +pdf_urls = [url for url in config.web_sources if url.lower().endswith(".pdf")] +html_urls = [url for url in config.web_sources if not url.lower().endswith(".pdf")] + +# HTML URL embedding +if html_urls: + logger.info("Starting HTML-based web document embedding...") try: web_loader = WebLoader(config) - web_chunks = web_loader.load(config.web_sources) + web_chunks = web_loader.load(html_urls) if web_chunks: - logger.info( - "Adding %d document chunks from Web to vector DB", len(web_chunks) - ) + logger.info("Adding %d HTML chunks to vector DB", len(web_chunks)) config.db_provider.add_documents(web_chunks) else: - logger.info("No documents returned from provided URLs.") + logger.info("No chunks produced from HTML URLs.") except Exception: - logger.exception("Failed during Web document processing") + logger.exception("Failed during HTML web document processing") + +# PDF URL embedding +if pdf_urls: + logger.info("Processing PDF documents from web URLs...") + + pdf_dir = Path(config.temp_dir) / "web_pdfs" + pdf_dir.mkdir(parents=True, exist_ok=True) + + downloaded_files = [] + for url in pdf_urls: + try: + response = requests.get(url) + response.raise_for_status() + + filename = Path(url.split("/")[-1]) + file_path = pdf_dir / filename + with open(file_path, "wb") as f: + f.write(response.content) + + logger.info("Downloaded: %s", file_path) + downloaded_files.append(file_path) + except Exception as e: + logger.exception("Failed to download %s: %s", url, e) + + if downloaded_files: + try: + pdf_loader = PDFLoader(config) + pdf_chunks = pdf_loader.load(downloaded_files) + + if pdf_chunks: + logger.info("Adding %d PDF chunks to vector DB", len(pdf_chunks)) + config.db_provider.add_documents(pdf_chunks) + else: + logger.info("No chunks produced from downloaded PDFs.") + except Exception: + logger.exception("Failed during PDF web document processing") logger.info("Embedding job complete.") From a3c4f2c992217e4696467115ee4f37830552768d Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 15:26:12 -0400 Subject: [PATCH 02/12] git loader will try to pull latest if already exists and not pass dirs to textloader --- loaders/git.py | 53 +++++++++++++++++++++++++++++++----------------- requirements.txt | 4 ++-- 2 files 
changed, 36 insertions(+), 21 deletions(-) diff --git a/loaders/git.py b/loaders/git.py index 1f6b0ec..0372316 100644 --- a/loaders/git.py +++ b/loaders/git.py @@ -1,5 +1,6 @@ import logging import subprocess +import shutil from pathlib import Path from typing import List @@ -14,19 +15,17 @@ class GitLoader: """ - Clones repositories and loads documents based on glob patterns using PDF and text loaders. + Loads and processes documents from Git repositories based on configured glob patterns. - For each repository defined in `config.repo_sources`, this loader: - - Clones the repo to a local folder in TEMP_DIR/source_repo/ - - Resolves the configured globs relative to the repo root - - Loads and chunks `.pdf` files using PDFLoader - - Loads and chunks all other supported text files using TextLoader - - This loader returns all chunked `Document` objects so the caller can decide - how and when to add them to a vector store. + For each configured repository, this loader: + - Clones or pulls the latest repo into a temporary folder + - Applies the configured glob patterns to find matching files + - Loads PDF files using PDFLoader + - Loads supported text files using TextLoader + - Returns all chunked LangChain Document objects (does NOT push to DB) Attributes: - config (Config): The configuration object containing repo info, chunk settings, etc. + config (Config): Application config including temp paths, glob patterns, and chunking rules. Example: >>> config = Config.load() @@ -43,13 +42,15 @@ def __init__(self, config: Config): def load(self) -> List[Document]: """ - Clones all configured repos, applies glob patterns, loads and chunks matched documents. + Loads and chunks documents from all configured Git repos. - Returns: - List[Document]: All chunked documents loaded from the matched files in all repositories. + This includes: + - Cloning or updating each Git repo + - Matching glob patterns to find relevant files + - Loading and chunking documents using appropriate loaders - Raises: - RuntimeError: If cloning or file loading fails. + Returns: + List[Document]: Chunked LangChain documents from all matched files. 
""" all_chunks: List[Document] = [] @@ -59,9 +60,10 @@ def load(self) -> List[Document]: repo_name = repo_url.rstrip("/").split("/")[-1].replace(".git", "") repo_path = self.base_path / repo_name - self._clone_repo(repo_url, repo_path) + self._ensure_repo_up_to_date(repo_url, repo_path) matched_files = self._collect_files(repo_path, globs) + matched_files = [f for f in matched_files if f.is_file()] pdf_files = [f for f in matched_files if f.suffix.lower() == ".pdf"] text_files = [f for f in matched_files if f.suffix.lower() != ".pdf"] @@ -78,11 +80,24 @@ def load(self) -> List[Document]: return all_chunks - def _clone_repo(self, url: str, dest: Path) -> None: + def _ensure_repo_up_to_date(self, url: str, dest: Path) -> None: if dest.exists(): - logger.info("Repo already cloned at %s, skipping", dest) - return + logger.info("Repo already cloned at %s, attempting pull...", dest) + try: + subprocess.run( + ["git", "-C", str(dest), "pull"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return + except subprocess.CalledProcessError: + logger.warning("git pull failed for %s, removing and recloning...", url) + shutil.rmtree(dest) + + self._clone_repo(url, dest) + def _clone_repo(self, url: str, dest: Path) -> None: logger.info("Cloning repository %s to %s", url, dest) try: subprocess.run( diff --git a/requirements.txt b/requirements.txt index 82bef3a..bbe4479 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,11 +7,11 @@ langchain-postgres==0.0.14 langchain-qdrant==0.2.0 langchain-sqlserver==0.1.1 langchain==0.3.23 -psycopg2-binary==2.9.10 +psycopg[binary]==3.2.6 pyodbc==5.2.0 pypdf==5.4.0 python-dotenv==1.1.0 qdrant-client==1.13.3 redis==5.2.1 sentence-transformers==4.1.0 -unstructured==0.17.2 +unstructured[md]==0.17.2 From 46463d4395ea357096618f382097db1cd623e31a Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 16:29:20 -0400 Subject: [PATCH 03/12] create more robust ci pipeline for testing db providers before pushing image to quay --- .github/workflows/ci-pipeline.yaml | 105 ++++++++++++++++++++++++++++ .github/workflows/lint.yaml | 28 -------- .github/workflows/push-to-quay.yaml | 51 -------------- 3 files changed, 105 insertions(+), 79 deletions(-) create mode 100644 .github/workflows/ci-pipeline.yaml delete mode 100644 .github/workflows/lint.yaml delete mode 100644 .github/workflows/push-to-quay.yaml diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml new file mode 100644 index 0000000..6c608a9 --- /dev/null +++ b/.github/workflows/ci-pipeline.yaml @@ -0,0 +1,105 @@ +name: CI Pipeline + +on: + pull_request: + push: + branches: [main] + tags: + - "v*" + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: pip install black isort ruff + - run: black --check . + - run: isort --check-only . + - run: ruff check . + + build: + runs-on: ubuntu-latest + needs: lint + outputs: + image_tag: ${{ steps.meta.outputs.sha_tag }} + steps: + - uses: actions/checkout@v4 + - name: Generate tag + id: meta + run: echo "sha_tag=sha-${GITHUB_SHA::7}" >> $GITHUB_OUTPUT + - name: Build and tag Docker image + uses: docker/build-push-action@v5 + with: + context: . 
+ file: ./Containerfile + load: true + tags: test-image:${{ steps.meta.outputs.sha_tag }} + + test: + needs: [lint, build] + runs-on: ubuntu-latest + strategy: + matrix: + db: [pgvector, redis, elasticsearch, qdrant] + services: + pgvector: + image: ankane/pgvector + ports: + - 5432:5432 + env: + POSTGRES_USER: user + POSTGRES_PASSWORD: pass + POSTGRES_DB: mydb + redis: + image: redis/redis-stack-server:6.2.6-v19 + ports: + - 6379:6379 + elasticsearch: + image: elasticsearch:8.11.1 + ports: + - 9200:9200 + env: + discovery.type: single-node + xpack.security.enabled: true + ELASTIC_PASSWORD: changeme + ES_JAVA_OPTS: "-Xms512m -Xmx512m" + qdrant: + image: qdrant/qdrant + ports: + - 6333:6333 + + steps: + - uses: actions/checkout@v4 + - name: Wait for DB to start + run: sleep 30 + - name: Run embed job + run: docker run --rm --network host test-image:latest + + release: + if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + needs: [lint, build, test] + steps: + - uses: actions/checkout@v4 + - name: Log in to Quay.io + uses: docker/login-action@v3 + with: + registry: quay.io + username: ${{ secrets.QUAY_USERNAME }} + password: ${{ secrets.QUAY_PASSWORD }} + - name: Tag and push image + run: | + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} + + if [[ $GITHUB_REF == refs/tags/* ]]; then + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/} + docker push quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/} + elif [[ $GITHUB_REF == refs/heads/main ]]; then + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:latest + docker push quay.io/dminnear/vector-embedder:latest + fi + + docker push quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml deleted file mode 100644 index f5555d0..0000000 --- a/.github/workflows/lint.yaml +++ /dev/null @@ -1,28 +0,0 @@ -name: Lint and format checks - -on: - push: - branches: [main] - pull_request: - -jobs: - lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: "3.12" - - - name: Install tools - run: | - pip install black isort ruff - - - name: Run Black - run: black --check . - - - name: Run Isort - run: isort --check-only . - - - name: Run Ruff - run: ruff check . 
diff --git a/.github/workflows/push-to-quay.yaml b/.github/workflows/push-to-quay.yaml deleted file mode 100644 index 33b32e4..0000000 --- a/.github/workflows/push-to-quay.yaml +++ /dev/null @@ -1,51 +0,0 @@ -name: Build and Push Container - -on: - push: - branches: - - main - tags: - - 'v*' - -env: - IMAGE_NAME: quay.io/dminnear/vector-embedder - -jobs: - build-and-push: - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to Quay.io - uses: docker/login-action@v3 - with: - registry: quay.io - username: ${{ secrets.QUAY_USERNAME }} - password: ${{ secrets.QUAY_PASSWORD }} - - - name: Determine tags - id: meta - run: | - echo "sha_tag=sha-$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - if [[ $GITHUB_REF == refs/tags/* ]]; then - echo "is_tag=true" >> $GITHUB_OUTPUT - echo "git_tag=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT - else - echo "is_tag=false" >> $GITHUB_OUTPUT - fi - - - name: Build and push Docker image - uses: docker/build-push-action@v5 - with: - context: . - file: ./Containerfile - push: true - tags: | - ${{ env.IMAGE_NAME }}:${{ steps.meta.outputs.sha_tag }} - ${{ steps.meta.outputs.is_tag == 'true' && format('{0}:{1}', env.IMAGE_NAME, steps.meta.outputs.git_tag) || '' }} - ${{ steps.meta.outputs.is_tag == 'false' && format('{0}:latest', env.IMAGE_NAME) || '' }} From 971656d7b2f0b70035c0fbeb0b9f25c7f647f62f Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 16:33:52 -0400 Subject: [PATCH 04/12] fix isort issue in gitloader --- loaders/git.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loaders/git.py b/loaders/git.py index 0372316..f1c4181 100644 --- a/loaders/git.py +++ b/loaders/git.py @@ -1,6 +1,6 @@ import logging -import subprocess import shutil +import subprocess from pathlib import Path from typing import List From 7e224db76dd2075ac5df98e2e6a54bc9d737f650 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 16:46:32 -0400 Subject: [PATCH 05/12] use sha-tagged test image and make sure all jobs run to completion --- .github/workflows/ci-pipeline.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index 6c608a9..50f075e 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -42,6 +42,7 @@ jobs: needs: [lint, build] runs-on: ubuntu-latest strategy: + fail-fast: false matrix: db: [pgvector, redis, elasticsearch, qdrant] services: @@ -76,7 +77,7 @@ jobs: - name: Wait for DB to start run: sleep 30 - name: Run embed job - run: docker run --rm --network host test-image:latest + run: docker run --rm --network host test-image:${{ needs.build.outputs.image_tag }} release: if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') From cd3866f9faa1e7c99d759adbc8283c3b86e06b29 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 16:58:35 -0400 Subject: [PATCH 06/12] use build image as artifact between stages --- .github/workflows/ci-pipeline.yaml | 39 +++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index 50f075e..ea5fe2f 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -30,13 +30,18 @@ jobs: - name: Generate tag id: meta run: echo "sha_tag=sha-${GITHUB_SHA::7}" >> $GITHUB_OUTPUT - - name: 
Build and tag Docker image - uses: docker/build-push-action@v5 + + - name: Build Docker image + run: docker build -f Containerfile -t test-image:${{ steps.meta.outputs.sha_tag }} . + + - name: Save Docker image to tar + run: docker save test-image:${{ steps.meta.outputs.sha_tag }} -o image.tar + + - name: Upload image artifact + uses: actions/upload-artifact@v4 with: - context: . - file: ./Containerfile - load: true - tags: test-image:${{ steps.meta.outputs.sha_tag }} + name: test-image + path: image.tar test: needs: [lint, build] @@ -74,8 +79,19 @@ jobs: steps: - uses: actions/checkout@v4 + + - name: Download image artifact + uses: actions/download-artifact@v4 + with: + name: test-image + path: . + + - name: Load Docker image + run: docker load -i image.tar + - name: Wait for DB to start run: sleep 30 + - name: Run embed job run: docker run --rm --network host test-image:${{ needs.build.outputs.image_tag }} @@ -85,12 +101,23 @@ jobs: needs: [lint, build, test] steps: - uses: actions/checkout@v4 + + - name: Download image artifact + uses: actions/download-artifact@v4 + with: + name: test-image + path: . + + - name: Load Docker image + run: docker load -i image.tar + - name: Log in to Quay.io uses: docker/login-action@v3 with: registry: quay.io username: ${{ secrets.QUAY_USERNAME }} password: ${{ secrets.QUAY_PASSWORD }} + - name: Tag and push image run: | docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} From 742de80afc6e5ef7146978412fc45d0e37ae6aab Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 17:24:48 -0400 Subject: [PATCH 07/12] only spin up db container for db used in each job --- .github/workflows/ci-pipeline.yaml | 85 ++++++++++++++++++------------ 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index ea5fe2f..7b336cb 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -27,14 +27,20 @@ jobs: image_tag: ${{ steps.meta.outputs.sha_tag }} steps: - uses: actions/checkout@v4 + - name: Generate tag id: meta run: echo "sha_tag=sha-${GITHUB_SHA::7}" >> $GITHUB_OUTPUT - name: Build Docker image - run: docker build -f Containerfile -t test-image:${{ steps.meta.outputs.sha_tag }} . + uses: docker/build-push-action@v5 + with: + context: . 
+ file: ./Containerfile + load: true + tags: test-image:${{ steps.meta.outputs.sha_tag }} - - name: Save Docker image to tar + - name: Save image as artifact run: docker save test-image:${{ steps.meta.outputs.sha_tag }} -o image.tar - name: Upload image artifact @@ -50,32 +56,6 @@ jobs: fail-fast: false matrix: db: [pgvector, redis, elasticsearch, qdrant] - services: - pgvector: - image: ankane/pgvector - ports: - - 5432:5432 - env: - POSTGRES_USER: user - POSTGRES_PASSWORD: pass - POSTGRES_DB: mydb - redis: - image: redis/redis-stack-server:6.2.6-v19 - ports: - - 6379:6379 - elasticsearch: - image: elasticsearch:8.11.1 - ports: - - 9200:9200 - env: - discovery.type: single-node - xpack.security.enabled: true - ELASTIC_PASSWORD: changeme - ES_JAVA_OPTS: "-Xms512m -Xmx512m" - qdrant: - image: qdrant/qdrant - ports: - - 6333:6333 steps: - uses: actions/checkout@v4 @@ -89,6 +69,41 @@ jobs: - name: Load Docker image run: docker load -i image.tar + - name: Start PGVector + if: matrix.db == 'pgvector' + run: | + docker run -d --name pgvector-test \ + -e POSTGRES_USER=user \ + -e POSTGRES_PASSWORD=pass \ + -e POSTGRES_DB=mydb \ + -p 5432:5432 \ + ankane/pgvector + + - name: Start Redis + if: matrix.db == 'redis' + run: | + docker run -d --name redis-test \ + -p 6379:6379 \ + redis/redis-stack-server:6.2.6-v19 + + - name: Start Elasticsearch + if: matrix.db == 'elasticsearch' + run: | + docker run -d --name es-test \ + -e "discovery.type=single-node" \ + -e "xpack.security.enabled=true" \ + -e "ELASTIC_PASSWORD=changeme" \ + -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ + -p 9200:9200 \ + elasticsearch:8.11.1 + + - name: Start Qdrant + if: matrix.db == 'qdrant' + run: | + docker run -d --name qdrant-test \ + -p 6333:6333 \ + qdrant/qdrant + - name: Wait for DB to start run: sleep 30 @@ -102,6 +117,13 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Log in to Quay.io + uses: docker/login-action@v3 + with: + registry: quay.io + username: ${{ secrets.QUAY_USERNAME }} + password: ${{ secrets.QUAY_PASSWORD }} + - name: Download image artifact uses: actions/download-artifact@v4 with: @@ -111,13 +133,6 @@ jobs: - name: Load Docker image run: docker load -i image.tar - - name: Log in to Quay.io - uses: docker/login-action@v3 - with: - registry: quay.io - username: ${{ secrets.QUAY_USERNAME }} - password: ${{ secrets.QUAY_PASSWORD }} - - name: Tag and push image run: | docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} From 6c05e8840909a973ca1c369bdb3cb22647d9e421 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 17:53:08 -0400 Subject: [PATCH 08/12] make sure failures log AND raise exceptions --- embed_documents.py | 163 ++++++++++++++++++++++++++------------------- 1 file changed, 95 insertions(+), 68 deletions(-) diff --git a/embed_documents.py b/embed_documents.py index cd11e02..90e62ec 100755 --- a/embed_documents.py +++ b/embed_documents.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import logging +import sys from pathlib import Path import requests @@ -10,83 +11,109 @@ from loaders.pdf import PDFLoader from loaders.web import WebLoader -# Load environment config +# Load configuration config = Config.load() -# Configure logging +# Set up logging logging.basicConfig(level=config.log_level) logger = logging.getLogger(__name__) -# Git-based embedding -if config.repo_sources: - logger.info("Starting Git-based document embedding...") - try: - git_loader = GitLoader(config) - git_chunks = git_loader.load() - - if 
git_chunks: - logger.info( - "Adding %d document chunks from Git to vector DB", len(git_chunks) - ) - config.db_provider.add_documents(git_chunks) - else: - logger.info("No documents found in Git sources.") - except Exception: - logger.exception("Failed during Git document processing") - -# Separate Web URLs by type -pdf_urls = [url for url in config.web_sources if url.lower().endswith(".pdf")] -html_urls = [url for url in config.web_sources if not url.lower().endswith(".pdf")] - -# HTML URL embedding -if html_urls: - logger.info("Starting HTML-based web document embedding...") - try: - web_loader = WebLoader(config) - web_chunks = web_loader.load(html_urls) - - if web_chunks: - logger.info("Adding %d HTML chunks to vector DB", len(web_chunks)) - config.db_provider.add_documents(web_chunks) - else: - logger.info("No chunks produced from HTML URLs.") - except Exception: - logger.exception("Failed during HTML web document processing") - -# PDF URL embedding -if pdf_urls: - logger.info("Processing PDF documents from web URLs...") - - pdf_dir = Path(config.temp_dir) / "web_pdfs" - pdf_dir.mkdir(parents=True, exist_ok=True) - - downloaded_files = [] - for url in pdf_urls: - try: - response = requests.get(url) - response.raise_for_status() - filename = Path(url.split("/")[-1]) - file_path = pdf_dir / filename - with open(file_path, "wb") as f: - f.write(response.content) +def _fail_and_exit(message: str, exc: Exception) -> None: + """ + Log an error with full traceback and raise the exception. + + Args: + message (str): Contextual message to log with the error. + exc (Exception): The exception to raise. + + This utility is used to ensure proper logging and failure behavior + across all critical stages of the embedding job. + """ + logger.error("%s: %s", message, exc, exc_info=True) + raise exc + - logger.info("Downloaded: %s", file_path) - downloaded_files.append(file_path) +def main() -> None: + # Run Git-based document embedding + if config.repo_sources: + logger.info("Starting Git-based document embedding...") + try: + git_loader = GitLoader(config) + git_chunks = git_loader.load() + + if git_chunks: + logger.info( + "Adding %d Git document chunks to vector DB", len(git_chunks) + ) + config.db_provider.add_documents(git_chunks) + else: + logger.info("No documents found in Git sources.") except Exception as e: - logger.exception("Failed to download %s: %s", url, e) + _fail_and_exit("Failed during Git document processing", e) + + # Split web sources into HTML and PDF URLs + pdf_urls = [url for url in config.web_sources if url.lower().endswith(".pdf")] + html_urls = [url for url in config.web_sources if not url.lower().endswith(".pdf")] - if downloaded_files: + # Run HTML-based web embedding + if html_urls: + logger.info("Starting HTML-based web document embedding...") try: - pdf_loader = PDFLoader(config) - pdf_chunks = pdf_loader.load(downloaded_files) + web_loader = WebLoader(config) + web_chunks = web_loader.load(html_urls) - if pdf_chunks: - logger.info("Adding %d PDF chunks to vector DB", len(pdf_chunks)) - config.db_provider.add_documents(pdf_chunks) + if web_chunks: + logger.info("Adding %d HTML web chunks to vector DB", len(web_chunks)) + config.db_provider.add_documents(web_chunks) else: - logger.info("No chunks produced from downloaded PDFs.") - except Exception: - logger.exception("Failed during PDF web document processing") - -logger.info("Embedding job complete.") + logger.info("No chunks produced from HTML URLs.") + except Exception as e: + _fail_and_exit("Failed during HTML web 
document processing", e) + + # Run PDF-based web embedding + if pdf_urls: + logger.info("Downloading PDF documents from web URLs...") + pdf_dir = Path(config.temp_dir) / "web_pdfs" + pdf_dir.mkdir(parents=True, exist_ok=True) + + downloaded_files = [] + for url in pdf_urls: + try: + response = requests.get(url) + response.raise_for_status() + + filename = Path(url.split("/")[-1]) + file_path = pdf_dir / filename + with open(file_path, "wb") as f: + f.write(response.content) + + logger.info("Downloaded: %s", file_path) + downloaded_files.append(file_path) + except Exception as e: + _fail_and_exit(f"Failed to download {url}", e) + + if downloaded_files: + try: + pdf_loader = PDFLoader(config) + pdf_chunks = pdf_loader.load(downloaded_files) + + if pdf_chunks: + logger.info( + "Adding %d PDF web chunks to vector DB", len(pdf_chunks) + ) + config.db_provider.add_documents(pdf_chunks) + else: + logger.info("No chunks produced from downloaded PDFs.") + except Exception as e: + _fail_and_exit("Failed during PDF web document processing", e) + + logger.info("Embedding job complete.") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + logger.critical("Fatal error: %s", e, exc_info=True) + sys.exit(1) From 55c0651004f34cd0fb3c630ab12fad9251678324 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 18:10:42 -0400 Subject: [PATCH 09/12] make sure jobs are actually using the right DB_TYPE --- .github/workflows/ci-pipeline.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index 7b336cb..ed9dbb8 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -108,7 +108,10 @@ jobs: run: sleep 30 - name: Run embed job - run: docker run --rm --network host test-image:${{ needs.build.outputs.image_tag }} + run: | + docker run --rm --network host \ + -e DB_TYPE=${{ matrix.db }} \ + test-image:${{ needs.build.outputs.image_tag }} release: if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') From 8289d23b56210cd764ea88ea36fe2e406208a109 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 18:32:22 -0400 Subject: [PATCH 10/12] add psycopg2 to requirements and fix elastic db_type --- .github/workflows/ci-pipeline.yaml | 4 ++-- requirements.txt | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index ed9dbb8..83569fb 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -55,7 +55,7 @@ jobs: strategy: fail-fast: false matrix: - db: [pgvector, redis, elasticsearch, qdrant] + db: [pgvector, redis, elastic, qdrant] steps: - uses: actions/checkout@v4 @@ -87,7 +87,7 @@ jobs: redis/redis-stack-server:6.2.6-v19 - name: Start Elasticsearch - if: matrix.db == 'elasticsearch' + if: matrix.db == 'elastic' run: | docker run -d --name es-test \ -e "discovery.type=single-node" \ diff --git a/requirements.txt b/requirements.txt index bbe4479..6ab5d1b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ langchain-qdrant==0.2.0 langchain-sqlserver==0.1.1 langchain==0.3.23 psycopg[binary]==3.2.6 +psycopg2-binary==2.9.10 pyodbc==5.2.0 pypdf==5.4.0 python-dotenv==1.1.0 From 538dc4f49b10e5f68372b299ee1c5dc8331f9224 Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 19:15:37 -0400 Subject: [PATCH 11/12] log upon successful connection to db --- vector_db/elastic_provider.py | 6 ++++++ 
vector_db/pgvector_provider.py | 15 +++++++++++++++ vector_db/qdrant_provider.py | 12 +++++++++++- vector_db/redis_provider.py | 1 + vector_db/sqlserver_provider.py | 8 ++++++++ 5 files changed, 41 insertions(+), 1 deletion(-) diff --git a/vector_db/elastic_provider.py b/vector_db/elastic_provider.py index 687a888..415c98e 100644 --- a/vector_db/elastic_provider.py +++ b/vector_db/elastic_provider.py @@ -1,3 +1,4 @@ +import logging from typing import List from langchain_core.documents import Document @@ -5,6 +6,8 @@ from vector_db.db_provider import DBProvider +logger = logging.getLogger(__name__) + class ElasticProvider(DBProvider): """ @@ -28,6 +31,7 @@ class ElasticProvider(DBProvider): def __init__(self, url: str, password: str, index: str, user: str): super().__init__() + self.db = ElasticsearchStore( embedding=self.embeddings, es_url=url, @@ -36,6 +40,8 @@ def __init__(self, url: str, password: str, index: str, user: str): index_name=index, ) + logger.info("Connected to Elasticsearch at %s (index: %s)", url, index) + def add_documents(self, docs: List[Document]) -> None: """ Add documents to the Elasticsearch index. diff --git a/vector_db/pgvector_provider.py b/vector_db/pgvector_provider.py index 1f94b04..2941dca 100644 --- a/vector_db/pgvector_provider.py +++ b/vector_db/pgvector_provider.py @@ -1,10 +1,14 @@ +import logging from typing import List +from urllib.parse import urlparse from langchain_core.documents import Document from langchain_postgres import PGVector from vector_db.db_provider import DBProvider +logger = logging.getLogger(__name__) + class PGVectorProvider(DBProvider): """ @@ -27,12 +31,23 @@ class PGVectorProvider(DBProvider): def __init__(self, url: str, collection_name: str): super().__init__() + self.db = PGVector( connection=url, collection_name=collection_name, embeddings=self.embeddings, ) + parsed = urlparse(url) + postgres_location = ( + f"{parsed.hostname}:{parsed.port or 5432}/{parsed.path.lstrip('/')}" + ) + logger.info( + "Connected to PGVector at %s (collection: %s)", + postgres_location, + collection_name, + ) + def add_documents(self, docs: List[Document]) -> None: """ Add a list of documents to the pgvector-backed vector store. 
diff --git a/vector_db/qdrant_provider.py b/vector_db/qdrant_provider.py index 41f13ff..48cd679 100644 --- a/vector_db/qdrant_provider.py +++ b/vector_db/qdrant_provider.py @@ -1,3 +1,4 @@ +import logging from typing import List, Optional from langchain_core.documents import Document @@ -7,6 +8,8 @@ from vector_db.db_provider import DBProvider +logger = logging.getLogger(__name__) + class QdrantProvider(DBProvider): """ @@ -31,6 +34,7 @@ class QdrantProvider(DBProvider): def __init__(self, url: str, collection: str, api_key: Optional[str] = None): super().__init__() self.collection = collection + self.url = url self.client = QdrantClient( url=url, @@ -46,11 +50,17 @@ def __init__(self, url: str, collection: str, api_key: Optional[str] = None): embedding=self.embeddings, ) + logger.info( + "Connected to Qdrant instance at %s (collection: %s)", + self.url, + self.collection, + ) + def _collection_exists(self) -> bool: return self.client.collection_exists(self.collection) def _create_collection(self) -> None: - vector_size = self.embeddings.embed_query("test").__len__() + vector_size = len(self.embeddings.embed_query("test")) self.client.recreate_collection( collection_name=self.collection, vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE), diff --git a/vector_db/redis_provider.py b/vector_db/redis_provider.py index 7b509ee..9fc24b5 100644 --- a/vector_db/redis_provider.py +++ b/vector_db/redis_provider.py @@ -41,6 +41,7 @@ def __init__(self, url: str, index: str, schema: str): try: self.redis_client = redis.from_url(self.url) self.redis_client.ping() + logger.info("Connected to Redis instance at %s", self.url) except Exception: logger.exception("Failed to connect to Redis at %s", self.url) raise diff --git a/vector_db/sqlserver_provider.py b/vector_db/sqlserver_provider.py index e058aac..873e7a0 100644 --- a/vector_db/sqlserver_provider.py +++ b/vector_db/sqlserver_provider.py @@ -59,6 +59,14 @@ def __init__( self.connection_string = self._build_connection_string(self.database) self._ensure_database_exists() + + logger.info( + "Connected to SQL Server at %s:%s, database: %s", + self.host, + self.port, + self.database, + ) + self.db = SQLServer_VectorStore( connection_string=self.connection_string, embedding_function=self.embeddings, From 99f7b983fddd5feb1e9680ef05eb5e48bbdc257c Mon Sep 17 00:00:00 2001 From: Drew Minnear Date: Sat, 19 Apr 2025 19:50:13 -0400 Subject: [PATCH 12/12] initialize logger in config before anything else --- .github/workflows/ci-pipeline.yaml | 1 + config.py | 42 +++++++++++++++++------------- embed_documents.py | 4 --- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index 83569fb..06ff518 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -110,6 +110,7 @@ jobs: - name: Run embed job run: | docker run --rm --network host \ + -e LOG_LEVEL=debug \ -e DB_TYPE=${{ matrix.db }} \ test-image:${{ needs.build.outputs.image_tag }} diff --git a/config.py b/config.py index 8bedf64..c5f0f43 100644 --- a/config.py +++ b/config.py @@ -1,4 +1,5 @@ import json +import logging import os from dataclasses import dataclass from typing import Dict, List @@ -97,13 +98,28 @@ def load() -> "Config": load_dotenv() get = Config._get_required_env_var + # Initialize logger + log_level_name = get("LOG_LEVEL").lower() + log_levels = { + "debug": 10, + "info": 20, + "warning": 30, + "error": 40, + "critical": 50, + } + if log_level_name not in 
log_levels: + raise ValueError( + f"Invalid LOG_LEVEL: '{log_level_name}'. Must be one of: {', '.join(log_levels)}" + ) + log_level = log_levels[log_level_name] + logging.basicConfig(level=log_level) + logger = logging.getLogger(__name__) + logger.debug("Logging initialized at level: %s", log_level_name.upper()) + + # Initialize db db_type = get("DB_TYPE") db_provider = Config._init_db_provider(db_type) - chunk_size = int(get("CHUNK_SIZE")) - chunk_overlap = int(get("CHUNK_OVERLAP")) - temp_dir = get("TEMP_DIR") - # Web URLs web_sources_raw = get("WEB_SOURCES") try: @@ -118,20 +134,10 @@ def load() -> "Config": except json.JSONDecodeError as e: raise ValueError(f"Invalid REPO_SOURCES JSON: {e}") from e - # Logging - log_level_name = get("LOG_LEVEL").lower() - log_levels = { - "debug": 10, - "info": 20, - "warning": 30, - "error": 40, - "critical": 50, - } - if log_level_name not in log_levels: - raise ValueError( - f"Invalid LOG_LEVEL: '{log_level_name}'. Must be one of: {', '.join(log_levels)}" - ) - log_level = log_levels[log_level_name] + # Misc + chunk_size = int(get("CHUNK_SIZE")) + chunk_overlap = int(get("CHUNK_OVERLAP")) + temp_dir = get("TEMP_DIR") return Config( db_provider=db_provider, diff --git a/embed_documents.py b/embed_documents.py index 90e62ec..6e342d9 100755 --- a/embed_documents.py +++ b/embed_documents.py @@ -11,11 +11,7 @@ from loaders.pdf import PDFLoader from loaders.web import WebLoader -# Load configuration config = Config.load() - -# Set up logging -logging.basicConfig(level=config.log_level) logger = logging.getLogger(__name__)
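
A closing note on the final patch: `logging.basicConfig` only installs handlers while the root logger has none, and records emitted before it runs are filtered by the stdlib's last-resort handler at WARNING and above. That is why the INFO-level "Connected to ..." messages added in patch 11 were being dropped: `Config.load()` constructed the DB provider before `embed_documents.py` ever reached its `basicConfig` call. Below is a minimal standalone sketch of the LOG_LEVEL handling that `Config.load()` now performs first. It is an illustration under stated assumptions, not the patch's code: it reads `os.environ` directly rather than going through Config's `_get_required_env_var` helper, and it uses `logging.getLevelNamesMapping()` (Python 3.11+; the CI pins 3.12) in place of the patch's hand-written level dict.

    import logging
    import os

    # LOG_LEVEL is treated as required here, mirroring Config._get_required_env_var.
    level_name = os.environ["LOG_LEVEL"].upper()

    # Python 3.11+: maps level names to numeric levels. This is a superset of
    # the hand-written dict in the patch (it also accepts WARN, FATAL, NOTSET).
    levels = logging.getLevelNamesMapping()
    if level_name not in levels:
        raise ValueError(
            f"Invalid LOG_LEVEL: {level_name!r}. Must be one of: {', '.join(levels)}"
        )

    # Configure the root logger before anything else logs: basicConfig is a
    # no-op once handlers exist, and INFO records emitted before this point
    # would never reach a handler.
    logging.basicConfig(level=levels[level_name])
    logging.getLogger(__name__).debug("Logging initialized at level: %s", level_name)

Run it as, for example, `LOG_LEVEL=debug python sketch.py`; this ordering is also what the CI exercises by passing `-e LOG_LEVEL=debug` into the container in the final pipeline change.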