diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml
new file mode 100644
index 0000000..06ff518
--- /dev/null
+++ b/.github/workflows/ci-pipeline.yaml
@@ -0,0 +1,152 @@
+name: CI Pipeline
+
+on:
+  pull_request:
+  push:
+    branches: [main]
+    tags:
+      - "v*"
+
+jobs:
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
+        with:
+          python-version: "3.12"
+      - run: pip install black isort ruff
+      - run: black --check .
+      - run: isort --check-only .
+      - run: ruff check .
+
+  build:
+    runs-on: ubuntu-latest
+    needs: lint
+    outputs:
+      image_tag: ${{ steps.meta.outputs.sha_tag }}
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Generate tag
+        id: meta
+        run: echo "sha_tag=sha-${GITHUB_SHA::7}" >> $GITHUB_OUTPUT
+
+      - name: Build Docker image
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          file: ./Containerfile
+          load: true
+          tags: test-image:${{ steps.meta.outputs.sha_tag }}
+
+      - name: Save image as artifact
+        run: docker save test-image:${{ steps.meta.outputs.sha_tag }} -o image.tar
+
+      - name: Upload image artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: test-image
+          path: image.tar
+
+  test:
+    needs: [lint, build]
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        db: [pgvector, redis, elastic, qdrant]
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Download image artifact
+        uses: actions/download-artifact@v4
+        with:
+          name: test-image
+          path: .
+
+      - name: Load Docker image
+        run: docker load -i image.tar
+
+      - name: Start PGVector
+        if: matrix.db == 'pgvector'
+        run: |
+          docker run -d --name pgvector-test \
+            -e POSTGRES_USER=user \
+            -e POSTGRES_PASSWORD=pass \
+            -e POSTGRES_DB=mydb \
+            -p 5432:5432 \
+            ankane/pgvector
+
+      - name: Start Redis
+        if: matrix.db == 'redis'
+        run: |
+          docker run -d --name redis-test \
+            -p 6379:6379 \
+            redis/redis-stack-server:6.2.6-v19
+
+      - name: Start Elasticsearch
+        if: matrix.db == 'elastic'
+        run: |
+          docker run -d --name es-test \
+            -e "discovery.type=single-node" \
+            -e "xpack.security.enabled=true" \
+            -e "ELASTIC_PASSWORD=changeme" \
+            -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
+            -p 9200:9200 \
+            elasticsearch:8.11.1
+
+      - name: Start Qdrant
+        if: matrix.db == 'qdrant'
+        run: |
+          docker run -d --name qdrant-test \
+            -p 6333:6333 \
+            qdrant/qdrant
+
+      - name: Wait for DB to start
+        run: sleep 30
+
+      - name: Run embed job
+        run: |
+          docker run --rm --network host \
+            -e LOG_LEVEL=debug \
+            -e DB_TYPE=${{ matrix.db }} \
+            test-image:${{ needs.build.outputs.image_tag }}
+
+  release:
+    if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')
+    runs-on: ubuntu-latest
+    needs: [lint, build, test]
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Log in to Quay.io
+        uses: docker/login-action@v3
+        with:
+          registry: quay.io
+          username: ${{ secrets.QUAY_USERNAME }}
+          password: ${{ secrets.QUAY_PASSWORD }}
+
+      - name: Download image artifact
+        uses: actions/download-artifact@v4
+        with:
+          name: test-image
+          path: .
+
+      - name: Load Docker image
+        run: docker load -i image.tar
+
+      - name: Tag and push image
+        run: |
+          docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }}
+
+          if [[ $GITHUB_REF == refs/tags/* ]]; then
+            docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/}
+            docker push quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/}
+          elif [[ $GITHUB_REF == refs/heads/main ]]; then
+            docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:latest
+            docker push quay.io/dminnear/vector-embedder:latest
+          fi
+
+          docker push quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }}
diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
deleted file mode 100644
index f5555d0..0000000
--- a/.github/workflows/lint.yaml
+++ /dev/null
@@ -1,28 +0,0 @@
-name: Lint and format checks
-
-on:
-  push:
-    branches: [main]
-  pull_request:
-
-jobs:
-  lint:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-      - uses: actions/setup-python@v5
-        with:
-          python-version: "3.12"
-
-      - name: Install tools
-        run: |
-          pip install black isort ruff
-
-      - name: Run Black
-        run: black --check .
-
-      - name: Run Isort
-        run: isort --check-only .
-
-      - name: Run Ruff
-        run: ruff check .
diff --git a/.github/workflows/push-to-quay.yaml b/.github/workflows/push-to-quay.yaml
deleted file mode 100644
index 33b32e4..0000000
--- a/.github/workflows/push-to-quay.yaml
+++ /dev/null
@@ -1,51 +0,0 @@
-name: Build and Push Container
-
-on:
-  push:
-    branches:
-      - main
-    tags:
-      - 'v*'
-
-env:
-  IMAGE_NAME: quay.io/dminnear/vector-embedder
-
-jobs:
-  build-and-push:
-    runs-on: ubuntu-latest
-
-    steps:
-      - name: Checkout code
-        uses: actions/checkout@v4
-
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v3
-
-      - name: Log in to Quay.io
-        uses: docker/login-action@v3
-        with:
-          registry: quay.io
-          username: ${{ secrets.QUAY_USERNAME }}
-          password: ${{ secrets.QUAY_PASSWORD }}
-
-      - name: Determine tags
-        id: meta
-        run: |
-          echo "sha_tag=sha-$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT
-          if [[ $GITHUB_REF == refs/tags/* ]]; then
-            echo "is_tag=true" >> $GITHUB_OUTPUT
-            echo "git_tag=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
-          else
-            echo "is_tag=false" >> $GITHUB_OUTPUT
-          fi
-
-      - name: Build and push Docker image
-        uses: docker/build-push-action@v5
-        with:
-          context: .
-          file: ./Containerfile
-          push: true
-          tags: |
-            ${{ env.IMAGE_NAME }}:${{ steps.meta.outputs.sha_tag }}
-            ${{ steps.meta.outputs.is_tag == 'true' && format('{0}:{1}', env.IMAGE_NAME, steps.meta.outputs.git_tag) || '' }}
-            ${{ steps.meta.outputs.is_tag == 'false' && format('{0}:latest', env.IMAGE_NAME) || '' }}
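The consolidated `release` job reproduces the tagging rules of the deleted `push-to-quay.yaml`: every build pushes an immutable `sha-<short-sha>` tag, tag builds additionally push the git tag, and main builds additionally push `latest`. A minimal Python sketch of that rule (a hypothetical helper for reasoning about the bash step above, not part of the repo):

```python
# Mirrors the "Tag and push image" step: `ref` is GITHUB_REF and
# `sha_tag` is the build job's sha_tag output. Illustrative only.
def tags_to_push(ref: str, sha_tag: str) -> list[str]:
    tags = [sha_tag]  # the immutable sha tag is pushed unconditionally
    if ref.startswith("refs/tags/"):
        tags.append(ref.removeprefix("refs/tags/"))  # e.g. "v1.2.3"
    elif ref == "refs/heads/main":
        tags.append("latest")  # main builds move the floating tag
    return tags


assert tags_to_push("refs/tags/v1.2.3", "sha-0123abc") == ["sha-0123abc", "v1.2.3"]
assert tags_to_push("refs/heads/main", "sha-0123abc") == ["sha-0123abc", "latest"]
```

Pushing the sha tag unconditionally keeps every released image pullable by commit even after `latest` moves on.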
diff --git a/config.py b/config.py
index 8bedf64..c5f0f43 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import os
 from dataclasses import dataclass
 from typing import Dict, List
@@ -97,13 +98,28 @@ def load() -> "Config":
         load_dotenv()
         get = Config._get_required_env_var
 
+        # Initialize logger
+        log_level_name = get("LOG_LEVEL").lower()
+        log_levels = {
+            "debug": 10,
+            "info": 20,
+            "warning": 30,
+            "error": 40,
+            "critical": 50,
+        }
+        if log_level_name not in log_levels:
+            raise ValueError(
+                f"Invalid LOG_LEVEL: '{log_level_name}'. Must be one of: {', '.join(log_levels)}"
+            )
+        log_level = log_levels[log_level_name]
+        logging.basicConfig(level=log_level)
+        logger = logging.getLogger(__name__)
+        logger.debug("Logging initialized at level: %s", log_level_name.upper())
+
+        # Initialize db
         db_type = get("DB_TYPE")
         db_provider = Config._init_db_provider(db_type)
 
-        chunk_size = int(get("CHUNK_SIZE"))
-        chunk_overlap = int(get("CHUNK_OVERLAP"))
-        temp_dir = get("TEMP_DIR")
-
         # Web URLs
         web_sources_raw = get("WEB_SOURCES")
         try:
@@ -118,20 +134,10 @@ def load() -> "Config":
         except json.JSONDecodeError as e:
             raise ValueError(f"Invalid REPO_SOURCES JSON: {e}") from e
 
-        # Logging
-        log_level_name = get("LOG_LEVEL").lower()
-        log_levels = {
-            "debug": 10,
-            "info": 20,
-            "warning": 30,
-            "error": 40,
-            "critical": 50,
-        }
-        if log_level_name not in log_levels:
-            raise ValueError(
-                f"Invalid LOG_LEVEL: '{log_level_name}'. Must be one of: {', '.join(log_levels)}"
-            )
-        log_level = log_levels[log_level_name]
+        # Misc
+        chunk_size = int(get("CHUNK_SIZE"))
+        chunk_overlap = int(get("CHUNK_OVERLAP"))
+        temp_dir = get("TEMP_DIR")
 
         return Config(
             db_provider=db_provider,
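The relocated logging block maps `LOG_LEVEL` names onto the stdlib's numeric levels (`logging.DEBUG` is 10 up through `logging.CRITICAL` at 50) and now runs first in `load()`, before the DB provider and source parsing. A sketch of the equivalent lookup written against the named constants (illustrative only; `config.py` keeps the explicit dict):

```python
import logging

# The literal values in config.py's dict match these stdlib constants:
# DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50.
LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}


def resolve_log_level(name: str) -> int:
    key = name.lower()
    if key not in LOG_LEVELS:
        raise ValueError(
            f"Invalid LOG_LEVEL: '{key}'. Must be one of: {', '.join(LOG_LEVELS)}"
        )
    return LOG_LEVELS[key]


assert resolve_log_level("DEBUG") == 10
```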
+ """ + logger.error("%s: %s", message, exc, exc_info=True) + raise exc + + +def main() -> None: + # Run Git-based document embedding + if config.repo_sources: + logger.info("Starting Git-based document embedding...") + try: + git_loader = GitLoader(config) + git_chunks = git_loader.load() + + if git_chunks: + logger.info( + "Adding %d Git document chunks to vector DB", len(git_chunks) + ) + config.db_provider.add_documents(git_chunks) + else: + logger.info("No documents found in Git sources.") + except Exception as e: + _fail_and_exit("Failed during Git document processing", e) + + # Split web sources into HTML and PDF URLs + pdf_urls = [url for url in config.web_sources if url.lower().endswith(".pdf")] + html_urls = [url for url in config.web_sources if not url.lower().endswith(".pdf")] + + # Run HTML-based web embedding + if html_urls: + logger.info("Starting HTML-based web document embedding...") + try: + web_loader = WebLoader(config) + web_chunks = web_loader.load(html_urls) + + if web_chunks: + logger.info("Adding %d HTML web chunks to vector DB", len(web_chunks)) + config.db_provider.add_documents(web_chunks) + else: + logger.info("No chunks produced from HTML URLs.") + except Exception as e: + _fail_and_exit("Failed during HTML web document processing", e) + + # Run PDF-based web embedding + if pdf_urls: + logger.info("Downloading PDF documents from web URLs...") + pdf_dir = Path(config.temp_dir) / "web_pdfs" + pdf_dir.mkdir(parents=True, exist_ok=True) + + downloaded_files = [] + for url in pdf_urls: + try: + response = requests.get(url) + response.raise_for_status() + + filename = Path(url.split("/")[-1]) + file_path = pdf_dir / filename + with open(file_path, "wb") as f: + f.write(response.content) + + logger.info("Downloaded: %s", file_path) + downloaded_files.append(file_path) + except Exception as e: + _fail_and_exit(f"Failed to download {url}", e) + + if downloaded_files: + try: + pdf_loader = PDFLoader(config) + pdf_chunks = pdf_loader.load(downloaded_files) + + if pdf_chunks: + logger.info( + "Adding %d PDF web chunks to vector DB", len(pdf_chunks) + ) + config.db_provider.add_documents(pdf_chunks) + else: + logger.info("No chunks produced from downloaded PDFs.") + except Exception as e: + _fail_and_exit("Failed during PDF web document processing", e) + + logger.info("Embedding job complete.") + + +if __name__ == "__main__": try: - web_loader = WebLoader(config) - web_chunks = web_loader.load(config.web_sources) - - if web_chunks: - logger.info( - "Adding %d document chunks from Web to vector DB", len(web_chunks) - ) - config.db_provider.add_documents(web_chunks) - else: - logger.info("No documents returned from provided URLs.") - except Exception: - logger.exception("Failed during Web document processing") - -logger.info("Embedding job complete.") + main() + except Exception as e: + logger.critical("Fatal error: %s", e, exc_info=True) + sys.exit(1) diff --git a/loaders/git.py b/loaders/git.py index 1f6b0ec..f1c4181 100644 --- a/loaders/git.py +++ b/loaders/git.py @@ -1,4 +1,5 @@ import logging +import shutil import subprocess from pathlib import Path from typing import List @@ -14,19 +15,17 @@ class GitLoader: """ - Clones repositories and loads documents based on glob patterns using PDF and text loaders. + Loads and processes documents from Git repositories based on configured glob patterns. 
diff --git a/loaders/git.py b/loaders/git.py
index 1f6b0ec..f1c4181 100644
--- a/loaders/git.py
+++ b/loaders/git.py
@@ -1,4 +1,5 @@
 import logging
+import shutil
 import subprocess
 from pathlib import Path
 from typing import List
@@ -14,19 +15,17 @@
 class GitLoader:
     """
-    Clones repositories and loads documents based on glob patterns using PDF and text loaders.
+    Loads and processes documents from Git repositories based on configured glob patterns.
 
-    For each repository defined in `config.repo_sources`, this loader:
-      - Clones the repo to a local folder in TEMP_DIR/source_repo/
-      - Resolves the configured globs relative to the repo root
-      - Loads and chunks `.pdf` files using PDFLoader
-      - Loads and chunks all other supported text files using TextLoader
-
-    This loader returns all chunked `Document` objects so the caller can decide
-    how and when to add them to a vector store.
+    For each configured repository, this loader:
+      - Clones or pulls the latest repo into a temporary folder
+      - Applies the configured glob patterns to find matching files
+      - Loads PDF files using PDFLoader
+      - Loads supported text files using TextLoader
+      - Returns all chunked LangChain Document objects (does NOT push to DB)
 
     Attributes:
-        config (Config): The configuration object containing repo info, chunk settings, etc.
+        config (Config): Application config including temp paths, glob patterns, and chunking rules.
 
     Example:
         >>> config = Config.load()
@@ -43,13 +42,15 @@ def __init__(self, config: Config):
 
     def load(self) -> List[Document]:
         """
-        Clones all configured repos, applies glob patterns, loads and chunks matched documents.
+        Loads and chunks documents from all configured Git repos.
 
-        Returns:
-            List[Document]: All chunked documents loaded from the matched files in all repositories.
+        This includes:
+        - Cloning or updating each Git repo
+        - Matching glob patterns to find relevant files
+        - Loading and chunking documents using appropriate loaders
 
-        Raises:
-            RuntimeError: If cloning or file loading fails.
+        Returns:
+            List[Document]: Chunked LangChain documents from all matched files.
         """
         all_chunks: List[Document] = []
 
@@ -59,9 +60,10 @@ def load(self) -> List[Document]:
             repo_name = repo_url.rstrip("/").split("/")[-1].replace(".git", "")
             repo_path = self.base_path / repo_name
 
-            self._clone_repo(repo_url, repo_path)
+            self._ensure_repo_up_to_date(repo_url, repo_path)
 
             matched_files = self._collect_files(repo_path, globs)
+            matched_files = [f for f in matched_files if f.is_file()]
 
             pdf_files = [f for f in matched_files if f.suffix.lower() == ".pdf"]
             text_files = [f for f in matched_files if f.suffix.lower() != ".pdf"]
@@ -78,11 +80,24 @@ def load(self) -> List[Document]:
 
         return all_chunks
 
-    def _clone_repo(self, url: str, dest: Path) -> None:
+    def _ensure_repo_up_to_date(self, url: str, dest: Path) -> None:
         if dest.exists():
-            logger.info("Repo already cloned at %s, skipping", dest)
-            return
+            logger.info("Repo already cloned at %s, attempting pull...", dest)
+            try:
+                subprocess.run(
+                    ["git", "-C", str(dest), "pull"],
+                    check=True,
+                    stdout=subprocess.DEVNULL,
+                    stderr=subprocess.DEVNULL,
+                )
+                return
+            except subprocess.CalledProcessError:
+                logger.warning("git pull failed for %s, removing and recloning...", url)
+                shutil.rmtree(dest)
 
+        self._clone_repo(url, dest)
+
+    def _clone_repo(self, url: str, dest: Path) -> None:
         logger.info("Cloning repository %s to %s", url, dest)
         try:
             subprocess.run(
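`_ensure_repo_up_to_date` turns the old clone-once-and-skip behavior into pull-if-present, reclone-on-failure. A standalone sketch of the pattern, assuming `git` is on PATH (URL and path are hypothetical):

```python
import shutil
import subprocess
from pathlib import Path


def ensure_repo(url: str, dest: Path) -> None:
    """Pull if dest already holds a checkout; otherwise (re)clone from scratch."""
    if dest.exists():
        try:
            # Fast path: update the existing checkout in place.
            subprocess.run(["git", "-C", str(dest), "pull"], check=True)
            return
        except subprocess.CalledProcessError:
            # Diverged or corrupt checkout: wipe it and fall through to clone.
            shutil.rmtree(dest)
    subprocess.run(["git", "clone", url, str(dest)], check=True)


# ensure_repo("https://github.com/example/repo.git", Path("/tmp/source_repo/repo"))
```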
diff --git a/requirements.txt b/requirements.txt
index 82bef3a..6ab5d1b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@ langchain-postgres==0.0.14
 langchain-qdrant==0.2.0
 langchain-sqlserver==0.1.1
 langchain==0.3.23
+psycopg[binary]==3.2.6
 psycopg2-binary==2.9.10
 pyodbc==5.2.0
 pypdf==5.4.0
@@ -14,4 +15,4 @@ python-dotenv==1.1.0
 qdrant-client==1.13.3
 redis==5.2.1
 sentence-transformers==4.1.0
-unstructured==0.17.2
+unstructured[md]==0.17.2
diff --git a/vector_db/elastic_provider.py b/vector_db/elastic_provider.py
index 687a888..415c98e 100644
--- a/vector_db/elastic_provider.py
+++ b/vector_db/elastic_provider.py
@@ -1,3 +1,4 @@
+import logging
 from typing import List
 
 from langchain_core.documents import Document
@@ -5,6 +6,8 @@
 
 from vector_db.db_provider import DBProvider
 
+logger = logging.getLogger(__name__)
+
 
 class ElasticProvider(DBProvider):
     """
@@ -28,6 +31,7 @@ class ElasticProvider(DBProvider):
 
     def __init__(self, url: str, password: str, index: str, user: str):
         super().__init__()
+
         self.db = ElasticsearchStore(
             embedding=self.embeddings,
             es_url=url,
@@ -36,6 +40,8 @@ def __init__(self, url: str, password: str, index: str, user: str):
             index_name=index,
         )
 
+        logger.info("Connected to Elasticsearch at %s (index: %s)", url, index)
+
     def add_documents(self, docs: List[Document]) -> None:
         """
         Add documents to the Elasticsearch index.
diff --git a/vector_db/pgvector_provider.py b/vector_db/pgvector_provider.py
index 1f94b04..2941dca 100644
--- a/vector_db/pgvector_provider.py
+++ b/vector_db/pgvector_provider.py
@@ -1,10 +1,14 @@
+import logging
 from typing import List
+from urllib.parse import urlparse
 
 from langchain_core.documents import Document
 from langchain_postgres import PGVector
 
 from vector_db.db_provider import DBProvider
 
+logger = logging.getLogger(__name__)
+
 
 class PGVectorProvider(DBProvider):
     """
@@ -27,12 +31,23 @@ class PGVectorProvider(DBProvider):
 
     def __init__(self, url: str, collection_name: str):
         super().__init__()
+
         self.db = PGVector(
             connection=url,
             collection_name=collection_name,
             embeddings=self.embeddings,
         )
 
+        parsed = urlparse(url)
+        postgres_location = (
+            f"{parsed.hostname}:{parsed.port or 5432}/{parsed.path.lstrip('/')}"
+        )
+        logger.info(
+            "Connected to PGVector at %s (collection: %s)",
+            postgres_location,
+            collection_name,
+        )
+
     def add_documents(self, docs: List[Document]) -> None:
         """
         Add a list of documents to the pgvector-backed vector store.
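The new PGVector log line recovers a `host:port/database` string from the connection URL via `urlparse`, which parses the netloc even for SQLAlchemy-style schemes like `postgresql+psycopg://`. A quick check with a made-up connection string:

```python
from urllib.parse import urlparse

# Hypothetical URL of the same shape langchain_postgres accepts.
url = "postgresql+psycopg://user:pass@db.example.com:5432/mydb"
parsed = urlparse(url)

# Same expression as the provider's log line; port falls back to 5432.
location = f"{parsed.hostname}:{parsed.port or 5432}/{parsed.path.lstrip('/')}"
assert location == "db.example.com:5432/mydb"
```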
diff --git a/vector_db/qdrant_provider.py b/vector_db/qdrant_provider.py
index 41f13ff..48cd679 100644
--- a/vector_db/qdrant_provider.py
+++ b/vector_db/qdrant_provider.py
@@ -1,3 +1,4 @@
+import logging
 from typing import List, Optional
 
 from langchain_core.documents import Document
@@ -7,6 +8,8 @@
 
 from vector_db.db_provider import DBProvider
 
+logger = logging.getLogger(__name__)
+
 
 class QdrantProvider(DBProvider):
     """
@@ -31,6 +34,7 @@ class QdrantProvider(DBProvider):
     def __init__(self, url: str, collection: str, api_key: Optional[str] = None):
         super().__init__()
         self.collection = collection
+        self.url = url
 
         self.client = QdrantClient(
             url=url,
@@ -46,11 +50,17 @@ def __init__(self, url: str, collection: str, api_key: Optional[str] = None):
             embedding=self.embeddings,
         )
 
+        logger.info(
+            "Connected to Qdrant instance at %s (collection: %s)",
+            self.url,
+            self.collection,
+        )
+
     def _collection_exists(self) -> bool:
         return self.client.collection_exists(self.collection)
 
     def _create_collection(self) -> None:
-        vector_size = self.embeddings.embed_query("test").__len__()
+        vector_size = len(self.embeddings.embed_query("test"))
         self.client.recreate_collection(
             collection_name=self.collection,
             vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
diff --git a/vector_db/redis_provider.py b/vector_db/redis_provider.py
index 7b509ee..9fc24b5 100644
--- a/vector_db/redis_provider.py
+++ b/vector_db/redis_provider.py
@@ -41,6 +41,7 @@ def __init__(self, url: str, index: str, schema: str):
         try:
             self.redis_client = redis.from_url(self.url)
             self.redis_client.ping()
+            logger.info("Connected to Redis instance at %s", self.url)
         except Exception:
             logger.exception("Failed to connect to Redis at %s", self.url)
             raise
diff --git a/vector_db/sqlserver_provider.py b/vector_db/sqlserver_provider.py
index e058aac..873e7a0 100644
--- a/vector_db/sqlserver_provider.py
+++ b/vector_db/sqlserver_provider.py
@@ -59,6 +59,14 @@ def __init__(
         self.connection_string = self._build_connection_string(self.database)
 
         self._ensure_database_exists()
+
+        logger.info(
+            "Connected to SQL Server at %s:%s, database: %s",
+            self.host,
+            self.port,
+            self.database,
+        )
+
         self.db = SQLServer_VectorStore(
             connection_string=self.connection_string,
             embedding_function=self.embeddings,
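`_create_collection` sizes the Qdrant collection by embedding a throwaway string and taking `len()` of the result, since the dimension depends on whichever embedding model `DBProvider` configured. A sketch of that probe against a stand-in embeddings object (the 384-dim figure is only an example, typical of MiniLM-class sentence-transformers):

```python
# Probe the embedding dimension at runtime instead of hard-coding it.
# `embeddings` stands in for the provider's self.embeddings: any object
# exposing embed_query(str) -> list[float] works.
def probe_vector_size(embeddings) -> int:
    return len(embeddings.embed_query("test"))


class FakeEmbeddings:
    """Stand-in returning a fixed 384-dim vector (hypothetical model size)."""

    def embed_query(self, text: str) -> list[float]:
        return [0.0] * 384


assert probe_vector_size(FakeEmbeddings()) == 384
```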