# Vector DB

In this notebook, we use a containerized version of Chroma DB. To set up, you will need the following:

1. Install [Docker Desktop](https://www.docker.com/products/docker-desktop/): follow the link and download the version for your operating system.
2. In a terminal window, navigate to the folder `./05_src/chromadb/`. For example, on Windows, you would use `cd .\05_src\chromadb`.
3. Run the command `docker compose up -d`, which starts the Chroma DB server in the background.
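
Once the container is up, it can help to confirm that something is listening before running the rest of the notebook. The sketch below is a generic TCP check and not part of Chroma's API. The helper name `port_is_open` is hypothetical, and the defaults assume the server is published on `localhost:8000`, as in the URLs used later in this notebook.

```python
import socket


def port_is_open(host: str = "localhost", port: int = 8000, timeout: float = 2.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises an OSError subclass if the connection
        # is refused or times out.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `port_is_open()` after `docker compose up -d` should return `True` once the server is ready. A `False` result usually means the container is still starting or the port mapping differs from the assumed default.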

## Downloading Batch Results

In the previous notebook, we created batch processes. We start by checking their status, identifying them through their descriptions.

In [None]:
%load_ext dotenv
%dotenv ../../05_src/.secrets

In [None]:
batch_description = 'Pitchfork reviews content embeddings 2025-10-18 12:17:17'

In [None]:
from openai import OpenAI

client = OpenAI()

# Note: list() returns a single page of results, not necessarily every batch.
batch_processes = client.batches.list().to_dict()
batch_info = [
    {'batch_id': batch['id'],
     'description': batch['metadata']['description'],
     'status': batch['status'],
     'request_counts': batch.get('request_counts'),
     'output_file_id': batch.get('output_file_id'),
     'input_file_id': batch['input_file_id']}
    for batch in batch_processes['data']
    # metadata may be missing or None for batches created without it
    if (batch.get('metadata') or {}).get('description') == batch_description
]
batch_info
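
If the account holds more batches than fit in one page, the batch we are looking for may not be in the first response. The sketch below assumes that iterating over the object returned by `client.batches.list()` fetches later pages on demand, which is how the OpenAI Python SDK's page objects behave at the time of writing. The helper name `iter_batch_dicts` and the `page_size` parameter are hypothetical.

```python
def iter_batch_dicts(client, page_size: int = 100):
    """Yield every batch as a plain dict, following pagination.

    `client` is expected to expose `batches.list(limit=...)`, returning an
    iterable of objects that have a `to_dict()` method.
    """
    for batch in client.batches.list(limit=page_size):
        yield batch.to_dict()
```

With this helper, the filter can run over all batches, for example `[b for b in iter_batch_dicts(client) if (b.get('metadata') or {}).get('description') == batch_description]`.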

When the status of a batch is `completed`, we can query its `output_file_id`, where the results are stored.

More generally, we need the original text and the embeddings of that text, matched through the `custom_id`.

In [None]:
batch_complete = [
    batch for batch in batch_info if batch['status'] == 'completed'
]
batch_complete

Before we download all results, examine the response of the file API:

In [None]:
response = client.files.content(batch_complete[0]['output_file_id'])
text_response = response.text
lines = text_response.split('\n')
print(lines[0])
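
Each line of the output file is a JSON object. As a rough guide to its shape, the sketch below parses a made-up line: the `custom_id` value is hypothetical and the vector is cut down to three numbers. Only the fields used later in this notebook are shown, and the real file contains additional fields.

```python
import json

# Illustrative line only: the identifier is invented and the embedding is
# truncated to three values.
sample_line = json.dumps({
    "custom_id": "review-0-chunk-0",
    "response": {
        "status_code": 200,
        "body": {"data": [{"index": 0, "embedding": [0.01, -0.02, 0.03]}]},
    },
    "error": None,
})

record = json.loads(sample_line)
custom_id = record["custom_id"]
# The vector sits under response -> body -> data[0] -> embedding.
embedding = record["response"]["body"]["data"][0]["embedding"]
print(custom_id, len(embedding))
```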


For our results database, we need to map each original text to its embedding.

In [None]:
import json 

def get_text_and_embeddings(batch):
    # The output file holds the embeddings; the input file holds the original text.
    embedding_lines = get_content_from_file(batch, 'output_file_id')
    text_lines = get_content_from_file(batch, 'input_file_id')
    return embedding_lines, text_lines

def get_content_from_file(batch, key):
    # Download the file and parse it as JSON Lines, skipping blank lines.
    file = client.files.content(batch[key])
    text = file.text
    lines = text.split('\n')
    content_lines = [json.loads(line) for line in lines if line.strip()]
    return content_lines


Notice that the response is also a `.jsonl` file. Therefore, we can process it line by line and use the `custom_id` to map each embedding to the original document chunk.

The function below:

- Creates a dictionary, `text_dict`, with keys given by each `custom_id` and values equal to the corresponding text.
- Iterates over all embedding items and uses the dictionary defined above to map the embeddings to their input text.

In [None]:
def create_chroma_inputs(embedding_lines, text_lines):
    chroma_inputs = []
    text_dict = {item['custom_id']: item['body']['input'] for item in text_lines}
    for embed_item in embedding_lines:
        custom_id = embed_item['custom_id']
        text = text_dict.get(custom_id, "")
        chroma_input = {
            'id': embed_item['custom_id'],
            'embedding': embed_item['response']['body']['data'][0]['embedding'],
            'text': text
        }
        chroma_inputs.append(chroma_input)
    return chroma_inputs
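
The mapping above expects every output line to contain an embedding. If some requests in a batch failed, indexing into `response` would raise an error. The sketch below separates usable lines from failed ones before mapping. It assumes each output line carries an `error` field and a `response.status_code` field, and the helper name `split_embedding_lines` is hypothetical.

```python
def split_embedding_lines(embedding_lines):
    """Split output lines into usable items and the custom_ids of failed ones."""
    ok, failed = [], []
    for item in embedding_lines:
        response = item.get("response") or {}
        # Assumption: a successful request has no error and an HTTP 200 status.
        if item.get("error") is None and response.get("status_code") == 200:
            ok.append(item)
        else:
            failed.append(item.get("custom_id"))
    return ok, failed
```

Passing only the `ok` items to `create_chroma_inputs` keeps the load step from stopping on a single bad line, and the `failed` list shows which chunks would need to be resubmitted.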

Two helper functions orchestrate the steps above, first for a single batch and then for a list of batches:

In [None]:
def process_batch_for_chromadb(batch):
    embedding_lines, text_lines = get_text_and_embeddings(batch)
    chroma_inputs = create_chroma_inputs(embedding_lines, text_lines)
    return chroma_inputs

def process_batches_for_chromadb(batches):
    all_chroma_inputs = []
    for batch in batches:
        chroma_inputs = process_batch_for_chromadb(batch)
        all_chroma_inputs.extend(chroma_inputs)
    return all_chroma_inputs

Now, we can create our input dictionaries.

In [None]:
chroma_inputs = process_batches_for_chromadb(batch_complete)

In [None]:
chroma_inputs[1]
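
Before loading, a quick summary of the inputs can catch problems early: repeated ids, vectors of different lengths, or chunks whose text could not be found (these receive an empty string in `create_chroma_inputs`). The sketch below is one way to do this, and the helper name `summarize_chroma_inputs` is hypothetical.

```python
from collections import Counter


def summarize_chroma_inputs(chroma_inputs):
    """Return basic consistency statistics for a list of input dictionaries."""
    id_counts = Counter(item["id"] for item in chroma_inputs)
    dims = Counter(len(item["embedding"]) for item in chroma_inputs)
    return {
        "n_items": len(chroma_inputs),
        # ids that appear more than once
        "duplicate_ids": sorted(i for i, n in id_counts.items() if n > 1),
        # embedding length -> number of items with that length
        "embedding_dims": dict(dims),
        # items whose custom_id had no matching input text
        "n_empty_texts": sum(1 for item in chroma_inputs if not item["text"]),
    }
```

For `text-embedding-3-small` with default settings, we would expect a single entry in `embedding_dims` with key 1536, an empty `duplicate_ids` list, and zero empty texts.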

# Load Embeddings to Chroma

In [1]:

import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
import os
from tqdm import tqdm

def setup_collection(chroma_url: str = "http://localhost:8000",
                     collection_name: str = "pitchfork_reviews"):
    chroma_client = chromadb.HttpClient(host=chroma_url)
    # Drop any existing collection with the same name so that we start fresh.
    # Depending on the chromadb version, list_collections() returns
    # Collection objects or plain names; getattr() handles both.
    existing_names = [getattr(col, "name", col)
                      for col in chroma_client.list_collections()]
    if collection_name in existing_names:
        chroma_client.delete_collection(name=collection_name)

    collection = chroma_client.create_collection(
        name=collection_name,
        embedding_function=OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="text-embedding-3-small"),
    )
    return collection

def load_embeddings_to_db(chroma_inputs: list[dict],
                          collection_name: str,
                          chroma_url: str = "http://localhost:8000",
                          batch_size: int = 1000):
    # Recreate the collection, then add the precomputed embeddings in batches.
    collection = setup_collection(chroma_url=chroma_url, collection_name=collection_name)

    for i in tqdm(range(0, len(chroma_inputs), batch_size)):
        batch = chroma_inputs[i:i + batch_size]
        collection.add(
            documents=[item['text'] for item in batch],
            embeddings=[item['embedding'] for item in batch],
            ids=[item['id'] for item in batch]
        )


In [None]:
vector_db_client_url:str="http://localhost:8000"
load_embeddings_to_db(chroma_inputs=chroma_inputs,
                      collection_name="pitchfork_reviews",
                      chroma_url=vector_db_client_url, 
                      batch_size=1000)

# Query

We can now use Chroma's similarity search to query the database. The query text must itself be converted to an embedding, so we provide an `embedding_function`. In this case, we use `OpenAIEmbeddingFunction()` with the model `text-embedding-3-small` to get embeddings that are compatible with the stored ones.

In [None]:
chroma = chromadb.HttpClient(host=vector_db_client_url)
collection = chroma.get_collection(name="pitchfork_reviews", 
                                   embedding_function=OpenAIEmbeddingFunction(
                                       api_key = os.getenv("OPENAI_API_KEY"),
                                       model_name="text-embedding-3-small")
                                   )


In [None]:
collection.query(
    query_texts=["A great album with stunning vocals and production."],
    n_results=3
)
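
The query returns a dictionary in which `ids`, `documents`, and `distances` each hold one inner list per query text. The sketch below flattens the result for a single query into rows that are easier to read. The helper name `rows_from_query_result` is hypothetical, and it assumes the default fields are included in the result.

```python
def rows_from_query_result(result, query_index: int = 0):
    """Turn the result for one query into a list of {id, distance, text} rows."""
    ids = result["ids"][query_index]
    distances = result["distances"][query_index]
    documents = result["documents"][query_index]
    # The three lists are aligned: position k refers to the same stored chunk.
    return [
        {"id": i, "distance": d, "text": t}
        for i, d, t in zip(ids, distances, documents)
    ]
```

Assigning the output of `collection.query(...)` to a variable and passing it to this helper gives one row per match, in the order returned by Chroma. A smaller distance indicates a closer match.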