In [2]:
from pymilvus import connections, db

# Open the default connection to the local Milvus server. The return values of
# connect() / create_database() are not bound: nothing visible in this notebook
# reads them.
connections.connect(host="127.0.0.1", port=19530)

# Create the database only when it is missing, so re-running this cell does
# not fail on an already-existing "my_database".
if "my_database" not in db.list_database():
    db.create_database("my_database")


In [3]:
# Reconnect to the local server, this time selecting "my_database".
conn = connections.connect(host="127.0.0.1", port="19530", db_name="my_database")


In [14]:
import uuid
from pymilvus import (
    connections, Collection, CollectionSchema, FieldSchema, DataType, utility
)


class MilvusManager:
    """Manage one Milvus collection per tenant inside a single database.

    Every tenant gets its own collection named ``tenant_<id>``, where ``<id>``
    is the tenant id with hyphens replaced by underscores.
    """

    def __init__(self, host="127.0.0.1", port="19530", db_name="my_database", vector_field="vector"):
        """
        Connect to the Milvus server.

        :param host: Milvus server host.
        :param port: Milvus server port.
        :param db_name: Database selected for the connection.
        :param vector_field: Name of the vector field that is indexed and searched.
        """
        connections.connect(host=host, port=port, db_name=db_name)
        self.vector_field = vector_field

    def _sanitize_tenant_id(self, tenant_id) -> str:
        """Helper function to sanitize tenant_id by replacing hyphens with underscores.

        :param tenant_id: Tenant identifier, either a string or a ``uuid.UUID``.
        :return: The identifier as a string with every ``-`` replaced by ``_``.
        """
        # str() lets callers pass a uuid.UUID object as well as its string form.
        return str(tenant_id).replace("-", "_")

    def _collection_name(self, tenant_id) -> str:
        """Return the collection name used for the given tenant."""
        return f"tenant_{self._sanitize_tenant_id(tenant_id)}"

    def _get_collection(self, tenant_id):
        """Return the tenant's existing collection.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :raises ValueError: If the tenant's collection does not exist.
        """
        collection_name = self._collection_name(tenant_id)
        if not utility.has_collection(collection_name):
            raise ValueError(f"Collection {collection_name} does not exist.")
        return Collection(collection_name)

    def create_tenant_collection(self, tenant_id, fields) -> None:
        """
        Creates a collection for a tenant identified by tenant_id (UUID).

        Does nothing except print a notice when the collection already exists.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param fields: List of FieldSchema objects defining the collection schema.
        """
        sanitized_tenant_id = self._sanitize_tenant_id(tenant_id)
        collection_name = self._collection_name(tenant_id)

        if utility.has_collection(collection_name):
            print(f"Collection {collection_name} already exists.")
            return

        schema = CollectionSchema(fields, description=f"Collection for tenant {sanitized_tenant_id}")

        # Instantiating Collection with a schema creates it on the server.
        Collection(name=collection_name, schema=schema)
        print(f"Collection {collection_name} created.")

    def insert_data(self, tenant_id, data) -> None:
        """
        Insert data into the tenant-specific collection and make sure it is indexed.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param data: List of data records to insert (should match the schema of the collection).
        :raises ValueError: If the tenant's collection does not exist.
        """
        collection_name = self._collection_name(tenant_id)
        collection = self._get_collection(tenant_id)

        collection.insert(data)
        collection.flush()
        print(f"Data inserted into collection {collection_name}.")

        # The index only has to be built once; skip the request on later inserts.
        if collection.has_index():
            return

        index_params = {
            "index_type": "IVF_FLAT",  # Index type (you can choose other types like IVF_SQ8, HNSW, etc.)
            "metric_type": "L2",       # Metric type (L2 for Euclidean distance)
            "params": {"nlist": 128}   # Index parameters (nlist is a typical parameter for IVF indexes)
        }
        collection.create_index(field_name=self.vector_field, index_params=index_params)
        print(f"Index created for collection {collection_name}.")

    def search(self, tenant_id, query_vectors, top_k, search_params):
        """
        Search for the most similar vectors in a tenant-specific collection.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param query_vectors: List of query vectors.
        :param top_k: Number of top results to return.
        :param search_params: Search parameters (depends on index type).
        :return: Search results.
        :raises ValueError: If the tenant's collection does not exist.
        """
        collection = self._get_collection(tenant_id)
        # The collection is loaded before searching.
        collection.load()
        return collection.search(
            data=query_vectors,
            anns_field=self.vector_field,
            param=search_params,
            limit=top_k,
        )

    def list_collections(self):
        """
        List the collections reported by ``utility.list_collections()``.
        """
        return utility.list_collections()


# Example usage:
if __name__ == "__main__":
    manager = MilvusManager()

    # A random UUID stands in for a real tenant id.
    tenant_uuid = str(uuid.uuid4())

    # Schema: auto-generated INT64 primary key plus a 128-dimensional float vector.
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
    ]
    manager.create_tenant_collection(tenant_id=tenant_uuid, fields=fields)

    # Two constant 128-dimensional vectors serve as both the stored data and the queries.
    sample_vectors = [[0.1] * 128, [0.2] * 128]

    # The vectors are wrapped in an outer list, one entry per non-auto field.
    manager.insert_data(tenant_id=tenant_uuid, data=[sample_vectors])

    # L2 metric with nprobe=10, used for the nearest-neighbour lookup below.
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
    results = manager.search(
        tenant_id=tenant_uuid,
        query_vectors=sample_vectors,
        top_k=2,
        search_params=search_params,
    )

    # Print each entry of the search results.
    for result in results:
        print(result)

    # Show every collection currently known to the server.
    collections = manager.list_collections()
    print("Collections:", collections)


fc4e1166_382b_4c9c_8241_066dbb381c5d
Collection tenant_fc4e1166_382b_4c9c_8241_066dbb381c5d created.
Data inserted into collection tenant_fc4e1166_382b_4c9c_8241_066dbb381c5d.
Index created for collection tenant_fc4e1166_382b_4c9c_8241_066dbb381c5d.
fc4e1166_382b_4c9c_8241_066dbb381c5d
['id: 453421160898763603, distance: 0.0, entity: {}', 'id: 453421160898763604, distance: 1.2799999713897705, entity: {}']
['id: 453421160898763604, distance: 0.0, entity: {}', 'id: 453421160898763603, distance: 1.2799999713897705, entity: {}']
Collections: ['tenant_4d8d0ccc_b14c_4ba9_9ce8_6f03e523398f', 'tenant_56323216_95ce_4d01_82a0_067dc5121ae9', 'tenant_4bb09293_3382_433f_a392_8a366078e328', 'tenant_f8765199_fdc3_4b16_aacb_f95e97f1987e', 'tenant_fb14b22e_8959_47c8_b157_0bd56230c7b2', 'tenant_5cb50ef6_e2ae_4ad6_9f8c_fd41c55968ef', 'tenant_fc4e1166_382b_4c9c_8241_066dbb381c5d', 'tenant_243a9024_b6ef_4076_969a_6d6cd302af04']


In [None]:
import os
import uuid

from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.openai import OpenAIEmbeddings
from pymilvus import (
    connections, Collection, CollectionSchema, FieldSchema, DataType, utility
)
# SECURITY: an OpenAI secret key used to be hard-coded on this line. Treat that
# key as leaked (revoke/rotate it) and supply the replacement through the
# OPENAI_API_KEY environment variable; the value is None when it is not set.
openai_api_key = os.environ.get("OPENAI_API_KEY")
class MilvusManager:
    """Manage per-tenant Milvus collections filled with OpenAI text embeddings.

    Every tenant gets its own collection named ``tenant_<id>``, where ``<id>``
    is the tenant id with hyphens replaced by underscores. Texts are turned
    into vectors through LangChain's ``OpenAIEmbeddings`` before being stored
    or searched.
    """

    def __init__(self, host="127.0.0.1", port="19530", openai_api_key=None,
                 db_name="my_database", vector_field="vector"):
        """
        Validate the API key, connect to Milvus and set up the embedding client.

        :param host: Milvus server host.
        :param port: Milvus server port.
        :param openai_api_key: OpenAI API key handed to ``OpenAIEmbeddings``.
        :param db_name: Database selected for the connection.
        :param vector_field: Name of the vector field that is indexed and searched.
        :raises ValueError: If ``openai_api_key`` is None.
        """
        # Validate before connecting so a missing key does not leave a
        # connection open behind a failed constructor.
        if openai_api_key is None:
            raise ValueError("OpenAI API key must be provided")

        connections.connect(host=host, port=port, db_name=db_name)
        self.vector_field = vector_field
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

    def _sanitize_tenant_id(self, tenant_id) -> str:
        """Helper function to sanitize tenant_id by replacing hyphens with underscores.

        :param tenant_id: Tenant identifier, either a string or a ``uuid.UUID``.
        :return: The identifier as a string with every ``-`` replaced by ``_``.
        """
        # str() lets callers pass a uuid.UUID object as well as its string form.
        return str(tenant_id).replace("-", "_")

    def _collection_name(self, tenant_id) -> str:
        """Return the collection name used for the given tenant."""
        return f"tenant_{self._sanitize_tenant_id(tenant_id)}"

    def _get_collection(self, tenant_id):
        """Return the tenant's existing collection.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :raises ValueError: If the tenant's collection does not exist.
        """
        collection_name = self._collection_name(tenant_id)
        if not utility.has_collection(collection_name):
            raise ValueError(f"Collection {collection_name} does not exist.")
        return Collection(collection_name)

    def create_tenant_collection(self, tenant_id, fields) -> None:
        """
        Creates a collection for a tenant identified by tenant_id (UUID).

        Does nothing except print a notice when the collection already exists.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param fields: List of FieldSchema objects defining the collection schema.
        """
        sanitized_tenant_id = self._sanitize_tenant_id(tenant_id)
        collection_name = self._collection_name(tenant_id)

        if utility.has_collection(collection_name):
            print(f"Collection {collection_name} already exists.")
            return

        schema = CollectionSchema(fields, description=f"Collection for tenant {sanitized_tenant_id}")

        # Instantiating Collection with a schema creates it on the server.
        Collection(name=collection_name, schema=schema)
        print(f"Collection {collection_name} created.")

    def insert_data(self, tenant_id, texts) -> None:
        """
        Embed the texts, insert the vectors into the tenant's collection and
        make sure the collection is indexed.

        An empty ``texts`` list is a no-op: nothing is embedded or inserted.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param texts: List of text records to insert.
        :raises ValueError: If the tenant's collection does not exist.
        """
        if not texts:
            return

        collection_name = self._collection_name(tenant_id)
        # Resolve the collection first so a missing collection fails before
        # any embedding request is sent.
        collection = self._get_collection(tenant_id)

        embeddings = self.generate_embeddings(texts)

        # The embeddings are wrapped in an outer list, as a single column.
        collection.insert([embeddings])
        collection.flush()
        print(f"Embeddings inserted into collection {collection_name}.")

        # The index only has to be built once; skip the request on later inserts.
        if collection.has_index():
            return

        index_params = {
            "index_type": "IVF_FLAT",  # Index type (you can choose other types like IVF_SQ8, HNSW, etc.)
            "metric_type": "L2",       # Metric type (L2 for Euclidean distance)
            "params": {"nlist": 128}   # Index parameters (nlist is a typical parameter for IVF indexes)
        }
        collection.create_index(field_name=self.vector_field, index_params=index_params)
        print(f"Index created for collection {collection_name}.")

    def generate_embeddings(self, texts):
        """
        Generate embeddings using OpenAI API through LangChain.

        :param texts: List of texts to generate embeddings for.
        :return: List of embeddings.
        """
        return self.embeddings.embed_documents(texts)

    def search(self, tenant_id, query_texts, top_k, search_params):
        """
        Search for the most similar vectors in a tenant-specific collection.

        :param tenant_id: Unique identifier for the tenant (UUID).
        :param query_texts: List of query texts.
        :param top_k: Number of top results to return.
        :param search_params: Search parameters (depends on index type).
        :return: Search results.
        :raises ValueError: If the tenant's collection does not exist.
        """
        # Check the collection before embedding so a missing collection does
        # not trigger an embedding request.
        collection = self._get_collection(tenant_id)
        query_embeddings = self.generate_embeddings(query_texts)

        # The collection is loaded before searching.
        collection.load()
        return collection.search(
            data=query_embeddings,
            anns_field=self.vector_field,
            param=search_params,
            limit=top_k,
        )

    def list_collections(self):
        """
        List the collections reported by ``utility.list_collections()``.
        """
        return utility.list_collections()


# Example usage:
if __name__ == "__main__":
    # Read the key from the environment rather than embedding a literal in the
    # notebook. When the variable is unset this is None and MilvusManager
    # raises ValueError("OpenAI API key must be provided") right away.
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

    manager = MilvusManager(openai_api_key=OPENAI_API_KEY)

    # Example tenant UUID
    tenant_uuid = str(uuid.uuid4())

    # Define the schema for the collection
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        # dim must equal the length of the vectors returned by the embedding
        # model; 1536 is assumed for the default OpenAI model — confirm if the
        # model is changed.
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    ]

    # Create a collection for a tenant
    manager.create_tenant_collection(tenant_id=tenant_uuid, fields=fields)

    # Example text data for insertion
    texts = [
        "Milvus is an open-source vector database.",
        "It helps you manage unstructured data.",
    ]

    # Insert data into the tenant's collection (texts will be converted to embeddings)
    manager.insert_data(tenant_id=tenant_uuid, texts=texts)

    # Example search parameters
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

    # Search for the nearest vectors to a query text
    query_texts = ["What is Milvus?"]
    results = manager.search(
        tenant_id=tenant_uuid,
        query_texts=query_texts,
        top_k=2,
        search_params=search_params,
    )

    # Print search results
    for result in results:
        print(result)

    # List all collections
    collections = manager.list_collections()
    print("Collections:", collections)
