In [8]:
import os
import chromadb
import uuid
from chromadb import Documents, EmbeddingFunction, Embeddings, Collection

from pathlib import Path

from openai import OpenAI
from dotenv import load_dotenv 

from typing import Any, Callable, Literal


In [9]:
"""
TODO

Task 1.5: get chroma working on basic example
"""

'\nTODO\n\nTask 1.5: get chroma working on basic example\n'

In [10]:
load_dotenv()

openai_client = OpenAI()

def get_embeddings(docs: Documents, model="text-embedding-3-large"):
   return [result.embedding for result in openai_client.embeddings.create(input=docs, model=model).data]

class OpenAIEmbeddingFunction(EmbeddingFunction):
    def __call__(self, input: Documents) -> Embeddings:
        return get_embeddings(input)

In [11]:
chroma_client = chromadb.PersistentClient(path=".chroma")
collection = chroma_client.get_or_create_collection(name="test", embedding_function=OpenAIEmbeddingFunction())

docs = ["one", "two", "The major scale is just a rotation of the minor scale"]
collection.add(
    ids = [str(uuid.uuid4()) for _ in docs],
    documents = docs
)

In [12]:
collection.query(query_texts=["What is your perspective on the major scale?"])

{'ids': [['b5f4ca23-8969-4e88-a7c1-310962e89dd7',
   '6e319cd1-99a7-427f-8ef8-d7f587061a07',
   'cfe0e2a4-370d-44a4-891d-815e3030cc1d',
   '37672081-35a4-46ff-aadf-a7944a94c942',
   'd464493d-81d0-4cf8-9d6d-0c667fe1e6f2',
   '1bc3b501-08bf-4bd8-9dd1-97c243921253',
   '5b34d9be-b561-48f2-bf4c-4474ee6c8ca3',
   'fa876d71-d245-4664-9466-f1587a259bd8',
   '53810a72-8faf-4503-b422-35f22d6de89c',
   'de182245-e637-4ed9-ad0d-f75e3d31904a']],
 'embeddings': None,
 'documents': [['The major scale is just a rotation of the minor scale',
   'The major scale is just a rotation of the minor scale',
   'The major scale is just a rotation of the minor scale',
   'The major scale is just a rotation of the minor scale',
   'one',
   'one',
   'one',
   'one',
   'two',
   'two']],
 'uris': None,
 'data': None,
 'metadatas': [[None, None, None, None, None, None, None, None, None, None]],
 'distances': [[0.8220085367820464,
   0.8220085367820464,
   0.8220085367820464,
   0.8220085367820464,
   1.7578881

In [13]:
"""
TODO

Task : Create pipeline to add ml-ops-org content to Chroma vector database
    - choose:
        - embedding function 
        - distance metric
    - directory path -> text files -> get_vecdb_entries() VecDBEntry entries { embedding, content, metadata } -> VecDB
    - querying gives VecDBQueryResult[] { content, metadata, distance }
"""

'\nTODO\n\nTask : Create pipeline to add ml-ops-org content to Chroma vector database\n    - choose:\n        - embedding function \n        - distance metric\n    - directory path -> text files -> get_vecdb_entries() VecDBEntry entries { embedding, content, metadata } -> VecDB\n    - querying gives VecDBQueryResult[] { content, metadata, distance }\n'

In [26]:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

@dataclass
class VecDBChunk:
    content: str
    embedding_key: str | None = None # Optional: defaults to content
    metadata: dict[str, Any] = field(default_factory=dict())

    def __post_init__(self):
        if self.embedding_key is None:
            self.embedding_key = self.content


class VecDBFileChunker(ABC): 
    @abstractmethod
    def __call__(
        self,
        content: str,
        file_name: str | None = None,
        file_path: str | None = None,
        embedding_translation: Callable | None = None
    ) -> list[VecDBChunk]:
        pass

class SimpleFileChunker(VecDBFileChunker):
    def __init__(self, chunk_size: int, chunk_overlap: int):
        # TODO: boundary checks on these
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def __call__(
        self,
        content: str,
        file_name: str = "",
        file_path: str = "",
        embedding_translation: Callable | None = None
    ) -> list[VecDBChunk]:
        chunks: list[VecDBChunk] = []
        for i in range(0, len(content), self.chunk_size):
            start_idx = max(0, i - self.chunk_overlap)
            end_idx = min(len(content), i + self.chunk_size + self.chunk_overlap)
            chunks.append(VecDBChunk(
                content=content[start_idx: end_idx],
                metadata={
                    "file_name": file_name,
                    "file_path": file_path
                }
            ))
        return chunks
    

def create_empty_collection(name: str, embedding_function: EmbeddingFunction, distance_metric: Literal["l2", "cosine", "ip"]):
    try:
        chroma_client.delete_collection(name=name)
        print(f"Collection: {name} already exists. deleting...")
    except Exception as e:
        pass
    print(f"creating new empty collection: {name}")
    return chroma_client.create_collection(
        name=name, 
        embedding_function=embedding_function,
        metadata={"hnsw:space": distance_metric}
    )
      

def read_file_content(path: str, encoding: str = "utf-8"):
    with open(path, "r", encoding=encoding) as f:
     return f.read()
    

def embed_chunks_to_collection(collection: Collection, chunks: list[VecDBChunk]):
    # TODO: handle embedding keys
    collection.add(
        ids=[str(uuid.uuid4()) for _ in range(len(chunks))],
        documents=[chunk.content for chunk in chunks],
        metadatas=[chunk.metadata for chunk in chunks]
    )

def add_directory_to_collection(dir_path: str, collection: Collection, chunker: VecDBFileChunker):
    path = Path(dir_path)
    for file in path.rglob("*.*"):
        file_path = str(file)
        file_name = file.name
        # read file content
        content = read_file_content(file_path)
        # chunk and embed
        chunks = chunker(content, file_name, file_path)
        embed_chunks_to_collection(collection, chunks)

In [27]:
DOCS_PATH = "/home/ph19/dev/llm/llmops/data/ml-ops-org"
COLLECTION_NAME = "ml-ops-org"

add_directory_to_collection(
    dir_path=DOCS_PATH, 
    collection=create_empty_collection(
        name=COLLECTION_NAME,
        embedding_function=OpenAIEmbeddingFunction(),
        distance_metric="l2"
    ), 
    chunker=SimpleFileChunker(chunk_size=1000, chunk_overlap=200)
)

Collection: ml-ops-org already exists. deleting...
creating new empty collection: ml-ops-org


In [31]:
mlops = chroma_client.get_collection(name=COLLECTION_NAME, embedding_function=OpenAIEmbeddingFunction())

In [36]:
mlops.query(query_texts=[
    """Getting started
Being an emerging field, MLOps is rapidly gaining momentum amongst Data Scientists, ML Engineers and AI enthusiasts. Following this trend, the Continuous Delivery Foundation SIG MLOps differentiates the ML models management from traditional software engineering and suggests the following MLOps capabilities:"""
])

{'ids': [['4ceca205-c94c-4cd5-950d-380b9097ed16',
   '02a74a11-80a2-434b-bd25-23796575b9ef',
   'f2e68091-deb7-47bf-85a5-6d6edd53c06b',
   '937c7447-6118-4ee4-98bf-13b5943bfd8b',
   '275c66b1-e2ef-44d3-8991-055135454bb2',
   'b945f078-c57f-4d7a-8657-d432ee7510d9',
   '7aa4cf20-f2f8-4fcf-ba40-e7b01abcc642',
   'fd680c33-38de-4883-9e23-10addd85f11a',
   'bb83271a-6be9-496e-8a75-90ff1d59e3ef',
   '58fbab63-90de-4304-97ed-1c808ab0097d']],
 'embeddings': None,
 'documents': [['idly gaining momentum amongst Data Scientists, ML Engineers and\n AI enthusiasts. Following this trend, the [Continuous\n Delivery Foundation SIG MLOps](https://github.com/cdfoundation/sig-mlops) differentiates the ML models management from traditional\n software engineering and suggests the following MLOps capabilities:\n\n\n\n\n* MLOps aims to unify the release cycle for machine learning and software application release.\n* MLOps enables automated testing of machine learning artifacts (e.g. data validation, ML model