diff --git a/.env b/.env index 8228b32..28e0717 100644 --- a/.env +++ b/.env @@ -15,6 +15,7 @@ CHUNK_SIZE=1024 CHUNK_OVERLAP=40 DB_TYPE=DRYRUN EMBEDDING_MODEL=sentence-transformers/all-mpnet-base-v2 +EMBEDDING_LENGTH=768 # === Redis === REDIS_URL=redis://localhost:6379 @@ -32,13 +33,8 @@ PGVECTOR_URL=postgresql://user:pass@localhost:5432/mydb PGVECTOR_COLLECTION_NAME=documents # === SQL Server === -SQLSERVER_HOST=localhost -SQLSERVER_PORT=1433 -SQLSERVER_USER=sa -SQLSERVER_PASSWORD=StrongPassword! -SQLSERVER_DB=docs -SQLSERVER_TABLE=vector_table -SQLSERVER_DRIVER=ODBC Driver 18 for SQL Server +MSSQL_CONNECTION_STRING="Driver={ODBC Driver 18 for SQL Server};Server=localhost,1433;Database=embeddings;UID=sa;PWD=StrongPassword!;TrustServerCertificate=yes;Encrypt=no;" +MSSQL_TABLE=docs # === Qdrant === QDRANT_URL=http://localhost:6333 diff --git a/.github/workflows/ci-pipeline.yaml b/.github/workflows/ci-pipeline.yaml index 06ff518..6ca75f7 100644 --- a/.github/workflows/ci-pipeline.yaml +++ b/.github/workflows/ci-pipeline.yaml @@ -49,15 +49,36 @@ jobs: name: test-image path: image.tar + check-secrets: + runs-on: ubuntu-latest + outputs: + mssql-available: ${{ steps.check-mssql.outputs.defined }} + steps: + - id: check-mssql + env: + REGISTRY: ${{ secrets.MSSQL_VECTOR_REGISTRY }} + run: | + if [[ -n "$REGISTRY" ]]; then + echo "defined=true" >> "$GITHUB_OUTPUT" + else + echo "defined=false" >> "$GITHUB_OUTPUT" + fi + test: - needs: [lint, build] + needs: [lint, build, check-secrets] runs-on: ubuntu-latest strategy: fail-fast: false matrix: - db: [pgvector, redis, elastic, qdrant] + db: [pgvector, redis, elastic, qdrant, mssql] steps: + - name: Early skip MSSQL if secrets unavailable + if: matrix.db == 'mssql' && needs.check-secrets.outputs.mssql-available != 'true' + run: | + echo "Skipping MSSQL test: secrets missing." 
+ exit 78 + - uses: actions/checkout@v4 - name: Download image artifact @@ -69,6 +90,28 @@ jobs: - name: Load Docker image run: docker load -i image.tar + - name: Log in to registry for SQL-vector preview + if: matrix.db == 'mssql' + uses: docker/login-action@v3 + with: + registry: ${{ secrets.MSSQL_VECTOR_REGISTRY }} + username: ${{ secrets.MSSQL_VECTOR_USERNAME }} + password: ${{ secrets.MSSQL_VECTOR_PASSWORD }} + + - name: Pull preview SQL Server-vector image + if: matrix.db == 'mssql' + run: | + docker pull ${{ secrets.MSSQL_VECTOR_REGISTRY }}/mssql-sql2025-ctp1-3-release/mssql-server-rhel9:17.0.400.5_4 + + - name: Start SQL Server-vector + if: matrix.db == 'mssql' + run: | + docker run -d --name mssql-vector-test \ + -e ACCEPT_EULA=Y \ + -e SA_PASSWORD=StrongPassword! \ + -p 1433:1433 \ + ${{ secrets.MSSQL_VECTOR_REGISTRY }}/mssql-sql2025-ctp1-3-release/mssql-server-rhel9:17.0.400.5_4 + - name: Start PGVector if: matrix.db == 'pgvector' run: | @@ -115,7 +158,7 @@ jobs: test-image:${{ needs.build.outputs.image_tag }} release: - if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') + if: (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) && github.event.repository.fork == false runs-on: ubuntu-latest needs: [lint, build, test] steps: @@ -139,14 +182,14 @@ jobs: - name: Tag and push image run: | - docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/hybridcloudpatterns/vector-embedder:${{ needs.build.outputs.image_tag }} if [[ $GITHUB_REF == refs/tags/* ]]; then - docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/} - docker push quay.io/dminnear/vector-embedder:${GITHUB_REF#refs/tags/} + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/hybridcloudpatterns/vector-embedder:${GITHUB_REF#refs/tags/} 
+ docker push quay.io/hybridcloudpatterns/vector-embedder:${GITHUB_REF#refs/tags/} elif [[ $GITHUB_REF == refs/heads/main ]]; then - docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/dminnear/vector-embedder:latest - docker push quay.io/dminnear/vector-embedder:latest + docker tag test-image:${{ needs.build.outputs.image_tag }} quay.io/hybridcloudpatterns/vector-embedder:latest + docker push quay.io/hybridcloudpatterns/vector-embedder:latest fi - docker push quay.io/dminnear/vector-embedder:${{ needs.build.outputs.image_tag }} + docker push quay.io/hybridcloudpatterns/vector-embedder:${{ needs.build.outputs.image_tag }} diff --git a/README.md b/README.md index ca09d3f..a2171c1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # 📚 vector-embedder -[![Docker Repository on Quay](https://quay.io/repository/dminnear/vector-embedder/status "Docker Repository on Quay")](https://quay.io/repository/dminnear/vector-embedder) +[![Quay Repository](https://img.shields.io/badge/Quay.io-vector--embedder-blue?logo=quay)](https://quay.io/repository/hybridcloudpatterns/vector-embedder) +[![CI](https://github.com/validatedpatterns-sandbox/vector-embedder/actions/workflows/ci-pipeline.yaml/badge.svg?branch=main)](https://github.com/validatedpatterns-sandbox/vector-embedder/actions/workflows/ci-pipeline.yaml) + **vector-embedder** is a flexible, language-agnostic document ingestion and embedding pipeline. It transforms structured and unstructured content from multiple sources into vector embeddings and stores them in your vector database of choice. 
diff --git a/config.py b/config.py index 3974d08..0882ff1 100644 --- a/config.py +++ b/config.py @@ -9,10 +9,10 @@ from vector_db.db_provider import DBProvider from vector_db.dryrun_provider import DryRunProvider from vector_db.elastic_provider import ElasticProvider +from vector_db.mssql_provider import MSSQLProvider from vector_db.pgvector_provider import PGVectorProvider from vector_db.qdrant_provider import QdrantProvider from vector_db.redis_provider import RedisProvider -from vector_db.sqlserver_provider import SQLServerProvider @dataclass @@ -109,6 +109,7 @@ def _init_db_provider(db_type: str) -> DBProvider: get = Config._get_required_env_var db_type = db_type.upper() embedding_model = get("EMBEDDING_MODEL") + embedding_length = int(get("EMBEDDING_LENGTH")) if db_type == "REDIS": url = get("REDIS_URL") @@ -128,16 +129,11 @@ def _init_db_provider(db_type: str) -> DBProvider: collection = get("PGVECTOR_COLLECTION_NAME") return PGVectorProvider(embedding_model, url, collection) - elif db_type == "SQLSERVER": - host = get("SQLSERVER_HOST") - port = get("SQLSERVER_PORT") - user = get("SQLSERVER_USER") - password = get("SQLSERVER_PASSWORD") - database = get("SQLSERVER_DB") - table = get("SQLSERVER_TABLE") - driver = get("SQLSERVER_DRIVER") - return SQLServerProvider( - embedding_model, host, port, user, password, database, table, driver + elif db_type == "MSSQL": + connection_string = get("MSSQL_CONNECTION_STRING") + table = get("MSSQL_TABLE") + return MSSQLProvider( + embedding_model, connection_string, table, embedding_length ) elif db_type == "QDRANT": diff --git a/vector_db/mssql_provider.py b/vector_db/mssql_provider.py new file mode 100644 index 0000000..5a94717 --- /dev/null +++ b/vector_db/mssql_provider.py @@ -0,0 +1,158 @@ +import logging +import re +from typing import List, Optional + +import pyodbc +from langchain_core.documents import Document +from langchain_sqlserver import SQLServer_VectorStore + +from vector_db.db_provider import DBProvider + 
logger = logging.getLogger(__name__)


class MSSQLProvider(DBProvider):
    """
    SQL Server-based vector DB provider using LangChain's SQLServer_VectorStore integration.

    This provider connects to a Microsoft SQL Server instance using a full ODBC
    connection string and stores document embeddings in a specified table. Before
    the vector store is created, the provider connects to the ``master`` database
    and creates the target database if it does not exist yet.

    Attributes:
        db (SQLServer_VectorStore): Underlying LangChain-compatible vector store.
        connection_string (str): Full ODBC connection string to the SQL Server instance.
        table (str): Table name used to store the embeddings.

    Args:
        embedding_model (str): HuggingFace-compatible embedding model to use.
        connection_string (str): Full ODBC connection string (including target DB).
        table (str): Table name to store vector embeddings.
        embedding_length (int): Dimensionality of the embeddings
            (e.g., 768 for all-mpnet-base-v2). Must match the chosen model.

    Example:
        >>> provider = MSSQLProvider(
        ...     embedding_model="sentence-transformers/all-mpnet-base-v2",
        ...     connection_string="Driver={ODBC Driver 18 for SQL Server};Server=localhost,1433;Database=docs;UID=sa;PWD=StrongPassword!;TrustServerCertificate=yes;Encrypt=no;",
        ...     table="embedded_docs",
        ...     embedding_length=768,
        ... )
        >>> provider.add_documents(docs)
    """

    def __init__(
        self,
        embedding_model: str,
        connection_string: str,
        table: str,
        embedding_length: int,
    ) -> None:
        """
        Initialize the MSSQLProvider.

        Args:
            embedding_model (str): HuggingFace-compatible embedding model to use for generating embeddings.
            connection_string (str): Full ODBC connection string including target database name.
            table (str): Table name to store document embeddings.
            embedding_length (int): Size of the embeddings (number of dimensions).

        Raises:
            RuntimeError: If the connection string names no database, or the
                database cannot be found or created.
        """
        super().__init__(embedding_model)

        self.connection_string = connection_string
        self.table = table

        # Must run before the vector store is built: the store is pointed at the
        # target database, which may not exist yet.
        self._ensure_database_exists()

        server = self._extract_server_address()

        logger.info(
            "Connected to MSSQL instance at %s (table: %s)",
            server,
            self.table,
        )

        self.db = SQLServer_VectorStore(
            connection_string=self.connection_string,
            embedding_function=self.embeddings,
            table_name=self.table,
            embedding_length=embedding_length,
        )

    def _extract_keyword(self, keyword: str) -> Optional[str]:
        """
        Return the value of one ``keyword=value`` pair from the connection string.

        The keyword is matched case-insensitively and only at the start of a
        ``;``-separated segment (leading whitespace allowed), so it cannot match
        text embedded in another keyword or value.

        Args:
            keyword (str): Connection-string keyword, e.g. ``"Server"``.

        Returns:
            Optional[str]: The stripped value, or None if absent or blank.
        """
        pattern = rf"(?:^|;)\s*{re.escape(keyword)}=([^;]+)"
        match = re.search(pattern, self.connection_string, re.IGNORECASE)
        if match is None:
            return None
        return match.group(1).strip() or None

    def _extract_server_address(self) -> str:
        """
        Extract the server address (host,port) from the connection string.

        Returns:
            str: The server address portion ("host,port") or "unknown" if not found.
        """
        return self._extract_keyword("Server") or "unknown"

    def _extract_database_name(self) -> Optional[str]:
        """
        Extract the database name from the connection string.

        Returns:
            Optional[str]: Database name if found, else None.
        """
        return self._extract_keyword("Database")

    def _build_connection_string_for_master(self) -> str:
        """
        Modify the connection string to point to the 'master' database.

        Every non-blank segment is kept as-is except the ``Database=`` segment,
        which is replaced. Segments are compared after stripping whitespace so
        that ``"; Database=docs"`` is rewritten as well.

        Returns:
            str: Modified connection string, terminated with ``;``.
        """
        rewritten = []
        for segment in self.connection_string.split(";"):
            stripped = segment.strip()
            if not stripped:
                continue
            if stripped.lower().startswith("database="):
                rewritten.append("Database=master")
            else:
                rewritten.append(segment)
        return ";".join(rewritten) + ";"

    def _ensure_database_exists(self) -> None:
        """
        Connect to the SQL Server master database and create the target database if missing.

        Raises:
            RuntimeError: If the connection string names no database, or the
                database cannot be created or accessed. The original driver
                error is chained as ``__cause__``.
        """
        # Function-scope import: keeps this block self-contained.
        from contextlib import closing

        database = self._extract_database_name()
        if not database:
            raise RuntimeError("No database name found in connection string.")

        # An identifier in DDL cannot be bound as a query parameter, so the name
        # is escaped for each quoting context instead: '' inside the string
        # literal, ]] inside the bracket-delimited identifier.
        literal = database.replace("'", "''")
        identifier = database.replace("]", "]]")
        statement = f"IF DB_ID(N'{literal}') IS NULL CREATE DATABASE [{identifier}]"

        master_conn_str = self._build_connection_string_for_master()
        try:
            # closing() guarantees close() on both objects; using the pyodbc
            # connection itself as a context manager only commits on exit and
            # would leave the master connection open.
            with closing(pyodbc.connect(master_conn_str, autocommit=True)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(statement)
        except Exception as e:
            logger.exception("Failed to ensure database '%s' exists", database)
            raise RuntimeError(
                f"Failed to ensure database '{database}' exists: {e}"
            ) from e

    def add_documents(self, docs: List[Document]) -> None:
        """
        Add documents to the SQL Server table in small batches.

        Args:
            docs (List[Document]): LangChain document chunks to embed and store.

        Raises:
            Exception: If a batch insert operation fails. The failure is logged
                with the batch's starting index and re-raised unchanged;
                batches inserted before the failure are not rolled back here.
        """
        batch_size = 50
        for start in range(0, len(docs), batch_size):
            batch = docs[start : start + batch_size]
            try:
                self.db.add_documents(batch)
            except Exception:
                logger.exception("Failed to insert batch starting at index %s", start)
                raise
- connection_string (str): Full ODBC connection string to the SQL Server instance. - - Args: - embedding_model (str): HuggingFace-compatible embedding model to use. - host (str): SQL Server hostname or IP address. - port (str): Port number (typically 1433). - user (str): SQL Server login username. - password (str): SQL Server login password. - database (str): Target database name. Will be created if not present. - table (str): Table name to store vector embeddings. - driver (str): ODBC driver name (e.g., 'ODBC Driver 18 for SQL Server'). - - Example: - >>> provider = SQLServerProvider( - ... embedding_model="BAAI/bge-large-en-v1.5", - ... host="localhost", - ... port="1433", - ... user="sa", - ... password="StrongPassword!", - ... database="my_vectors", - ... table="embedded_docs", - ... driver="ODBC Driver 18 for SQL Server" - ... ) - >>> provider.add_documents(docs) - """ - - def __init__( - self, - embedding_model: str, - host: str, - port: str, - user: str, - password: str, - database: str, - table: str, - driver: str, - ) -> None: - super().__init__(embedding_model) - - self.host = host - self.port = port - self.user = user - self.password = password - self.database = database - self.table = table - self.driver = driver - - self.connection_string = self._build_connection_string(self.database) - self._ensure_database_exists() - - logger.info( - "Connected to SQL Server at %s:%s, database: %s", - self.host, - self.port, - self.database, - ) - - self.db = SQLServer_VectorStore( - connection_string=self.connection_string, - embedding_function=self.embeddings, - table_name=self.table, - embedding_length=768, # Ensure this matches the model you're using - ) - - def _build_connection_string(self, db_name: str) -> str: - """ - Construct a SQL Server ODBC connection string. - - Args: - db_name (str): Name of the database to connect to. - - Returns: - str: ODBC-compliant connection string. 
- """ - return ( - f"Driver={{{self.driver}}};" - f"Server={self.host},{self.port};" - f"Database={db_name};" - f"UID={self.user};" - f"PWD={self.password};" - "TrustServerCertificate=yes;" - "Encrypt=no;" - ) - - def _ensure_database_exists(self) -> None: - """ - Connect to the SQL Server master database and create the target database if missing. - - Raises: - RuntimeError: If the database cannot be created or accessed. - """ - master_conn_str = self._build_connection_string("master") - try: - with pyodbc.connect(master_conn_str, autocommit=True) as conn: - cursor = conn.cursor() - cursor.execute( - f"IF DB_ID('{self.database}') IS NULL CREATE DATABASE [{self.database}]" - ) - cursor.close() - except Exception as e: - logger.exception("Failed to ensure database '%s' exists", self.database) - raise RuntimeError( - f"Failed to ensure database '{self.database}' exists: {e}" - ) - - def add_documents(self, docs: List[Document]) -> None: - """ - Add documents to the SQL Server table in small batches. - - Args: - docs (List[Document]): LangChain document chunks to embed and store. - - Raises: - Exception: If a batch insert operation fails. - """ - batch_size = 50 - for i in range(0, len(docs), batch_size): - batch = docs[i : i + batch_size] - try: - self.db.add_documents(batch) - except Exception: - logger.exception("Failed to insert batch starting at index %s", i) - raise