### Populate Azure Cosmos DB for NoSQL from a CSV file

In [1]:
import pandas as pd
import json, os, uuid
from openai import AzureOpenAI
from azure.cosmos import CosmosClient, exceptions, PartitionKey
from azure.cosmos.cosmos_client import ThroughputProperties
from azure.cosmos.partition_key import PartitionKey
from dotenv import load_dotenv

load_dotenv()

from tenacity import retry, wait_random_exponential, stop_after_attempt  

@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(10))
def generate_embeddings(openai_client, text, *, dimensions=None):
    """
    Generate an embedding for ``text`` using the OpenAI API v1.x.

    The model/deployment name is read from the ``AZURE_OPENAI_EMBEDDING_MODEL``
    environment variable on every call. The tenacity decorator makes up to 10
    attempts, waiting a random exponential back-off of 1-20 seconds between
    them; no ``retry=`` predicate is given, so any exception triggers a retry.

    Args:
        openai_client: An ``AzureOpenAI`` client (or anything exposing a
            compatible ``embeddings.create``).
        text: The input string to embed.
        dimensions: Optional requested output length. When given, it is
            forwarded to ``embeddings.create`` so the vector can be made to
            match a container's vector-embedding policy. Only some models
            accept it. When ``None`` (the default), the argument is omitted and
            the model's native length is returned, exactly as before.

    Returns:
        The ``embedding`` of the first (and only) result in the response.
    """
    request = {
        "input": text,
        "model": os.getenv("AZURE_OPENAI_EMBEDDING_MODEL"),
    }
    # Only send `dimensions` when asked for; some models reject the parameter.
    if dimensions is not None:
        request["dimensions"] = dimensions
    return openai_client.embeddings.create(**request).data[0].embedding

# Step 1: Configure your Cosmos DB connection
COSMOS_DB_ENDPOINT = os.getenv('AZURE_COSMOSDB_NOSQL_ENDPOINT')
COSMOS_DB_KEY = os.getenv('AZURE_COSMOSDB_NOSQL_KEY')
DATABASE_NAME = os.getenv('AZURE_COSMOSDB_NOSQL_DATABASE')
CONTAINER_NAME = os.getenv('AZURE_COSMOSDB_NOSQL_CONTAINER')
PARTITION_KEY = os.getenv("AZURE_COSMOSDB_NOSQL_CONTAINER_PARTITION_KEY_PATH")
# os.getenv only ever returns a string (or None), but the autoscale maximum is
# an integer RU/s value, so convert it here. Unset or empty becomes None.
_throughput_setting = os.getenv("AZURE_COSMOSDB_NOSQL_THROUGHPUT")
OFFER_THROUGHPUT = int(_throughput_setting) if _throughput_setting else None
CSV_FILE_PATH = "recipes_data.csv"

# Fail fast with a readable message instead of an opaque SDK error later on.
_required_settings = {
    "AZURE_COSMOSDB_NOSQL_ENDPOINT": COSMOS_DB_ENDPOINT,
    "AZURE_COSMOSDB_NOSQL_KEY": COSMOS_DB_KEY,
    "AZURE_COSMOSDB_NOSQL_DATABASE": DATABASE_NAME,
    "AZURE_COSMOSDB_NOSQL_CONTAINER": CONTAINER_NAME,
    "AZURE_COSMOSDB_NOSQL_CONTAINER_PARTITION_KEY_PATH": PARTITION_KEY,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

throughput_properties = ThroughputProperties(auto_scale_max_throughput=OFFER_THROUGHPUT)

indexing_policy = {
    "includedPaths": [
        {"path": "/*"},
    ],
    "excludedPaths": [
        {"path": "/\"_etag\"/?"},
        # Keep the raw vector out of the regular index; it is covered by
        # the vectorIndexes entry below instead.
        {"path": "/embedding/*"}
    ],
    "vectorIndexes": [
        {
            "path": "/embedding",
            "type": "quantizedFlat"
        }
    ]
}

vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/embedding",
            "dataType": "float32",
            "distanceFunction": "cosine",
            # NOTE(review): this must equal the length of the vectors that
            # generate_embeddings returns for the configured model. Many
            # embedding models return longer vectors unless a shorter output
            # size is explicitly requested — confirm 384 is correct.
            "dimensions": 384
        }
    ]
}


# Step 2: Initialize Cosmos DB client
print("Creating database and container..")
client = CosmosClient(COSMOS_DB_ENDPOINT, COSMOS_DB_KEY)
database = client.create_database_if_not_exists(id=DATABASE_NAME)
container = database.create_container_if_not_exists(
    id=CONTAINER_NAME,
    partition_key=PartitionKey(path=PARTITION_KEY),
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy,
    offer_throughput=throughput_properties
)

# Step 3: Initialize Azure OpenAI client
print("Initializing Azure OpenAI client..")
openai_client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)

# Step 4: Load CSV file using pandas (the whole file is read into memory)
print("Loading CSV file..")
df = pd.read_csv(CSV_FILE_PATH)

df_length = len(df)
print(f"Total number of records in the CSV file: {df_length}")

Creating database and container..
Initializing Azure OpenAI client..
Loading CSV file..
Total number of records in the CSV file: 2231142


In [2]:
# Step 5: Write to Cosmos DB in batches using parallel execution
from concurrent.futures import ThreadPoolExecutor

print("Writing content to Cosmos DB..")
batch_size = 1000  # Define the batch size based on your requirements


def _nan_to_none(value):
    """Return None for a pandas missing value (NaN/None/NA), else ``value``."""
    # JSON has no NaN literal (Python's json would emit a non-standard `NaN`),
    # so an empty CSV cell must be stored as null instead.
    # Assumes scalar cell values, as produced by read_csv.
    return None if pd.isna(value) else value


def insert_document(row):
    """
    Embed one CSV row and store it in the Cosmos DB container as a new item.

    The whole row (every column, serialized with ``json.dumps``) is the text
    passed to ``generate_embeddings``. The stored item keeps only the six
    recipe columns, the resulting ``embedding`` and a freshly generated UUID
    ``id``. Because the id is random, re-running the load creates duplicate
    items rather than overwriting earlier ones.

    Args:
        row: One row of ``df`` as yielded by ``DataFrame.iterrows`` (a
            ``pandas.Series`` keyed by column name).

    Returns:
        1 if the item was inserted, or 0 if embedding generation or the insert
        failed (the error is printed). The result can therefore be summed to
        count successes.
    """
    record = row.to_dict()
    try:
        embedding = generate_embeddings(openai_client, json.dumps(record))
    except Exception as e:
        # Per-row boundary: once the embedding retries are exhausted, count
        # this row as failed instead of letting the exception abort the load.
        print(f'An error occurred while generating embeddings: {e}')
        return 0

    # NOTE(review): the item must carry the property named by PARTITION_KEY;
    # only the keys below are written, so confirm the configured path is one
    # of them (e.g. "/id").
    doc = {
        "id": str(uuid.uuid4()),
        "title": _nan_to_none(row['title']),
        "ingredients": _nan_to_none(row['ingredients']),
        "directions": _nan_to_none(row['directions']),
        "link": _nan_to_none(row['link']),
        "source": _nan_to_none(row['source']),
        "NER": _nan_to_none(row['NER']),
        "embedding": embedding,
    }
    try:
        container.create_item(doc)
    except exceptions.CosmosHttpResponseError as e:
        print(f'An error occurred: {e.message}')
        return 0  # Return 0 on failure
    return 1  # Return 1 to count this row as successfully inserted

# Use ThreadPoolExecutor to insert documents in parallel.
# Each task mostly waits on remote calls (embedding + insert), so more threads
# than cores is used. os.cpu_count() may return None, hence the fallback to 1.
max_workers = (os.cpu_count() or 1) * 5
total_rows_loaded = 0  # Running count of rows successfully inserted

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Work through the DataFrame one batch at a time, so that at most
    # `batch_size` rows are queued instead of one future per row up front.
    for start in range(0, len(df), batch_size):
        batch_df = df.iloc[start:start + batch_size]
        rows = (row for _, row in batch_df.iterrows())
        # insert_document returns 1 on success and 0 on a handled failure.
        # Summing the map results waits for every row in this batch.
        batch_rows_loaded = sum(executor.map(insert_document, rows))
        total_rows_loaded += batch_rows_loaded
        print(f'Rows loaded: {total_rows_loaded}.')

print(f'Data loading completed. Total rows loaded: {total_rows_loaded}')

Writing content to Cosmos DB..
Rows loaded: 1000.
Rows loaded: 2000.
Rows loaded: 3000.
Rows loaded: 4000.
Rows loaded: 5000.
Rows loaded: 6000.
Rows loaded: 7000.
Rows loaded: 8000.
Rows loaded: 9000.
Rows loaded: 10000.
Rows loaded: 11000.
Rows loaded: 12000.
Rows loaded: 13000.
Rows loaded: 14000.
Rows loaded: 15000.
Rows loaded: 16000.
Rows loaded: 17000.
Rows loaded: 18000.
Rows loaded: 19000.
Rows loaded: 20000.
Rows loaded: 21000.
Rows loaded: 22000.
Rows loaded: 23000.
Rows loaded: 24000.
Rows loaded: 25000.
Rows loaded: 26000.
Rows loaded: 27000.
Rows loaded: 28000.
Rows loaded: 29000.
Rows loaded: 30000.
Rows loaded: 31000.
Rows loaded: 32000.
Rows loaded: 33000.
Rows loaded: 34000.
Rows loaded: 35000.
Rows loaded: 36000.
Rows loaded: 37000.
Rows loaded: 38000.
Rows loaded: 39000.
Rows loaded: 40000.
Rows loaded: 41000.
Rows loaded: 42000.
Rows loaded: 43000.
Rows loaded: 44000.
Rows loaded: 45000.
Rows loaded: 46000.
Rows loaded: 47000.
Rows loaded: 48000.
Rows loaded: 49000