In [2]:
# Load the HotpotQA test split from the local BEIR dataset folder.
# `corpus` maps document ids to dicts with 'title' and 'text' (as used by the ingestion cell).
from beir.datasets.data_loader import GenericDataLoader

data_path = "datasets/hotpotqa"
hotpotqa_loader = GenericDataLoader(data_folder=data_path)
corpus, queries, qrels = hotpotqa_loader.load(split="test")

  from tqdm.autonotebook import tqdm
100%|██████████| 5233329/5233329 [01:51<00:00, 46787.40it/s]


In [3]:
import itertools

print(f'Corpus size: {len(corpus)}')
# Small sample of the first n corpus entries (in corpus iteration order) for quick experiments.
# islice pulls just those n items instead of first copying every corpus entry into a list.
n = 5
corpus_sample = dict(itertools.islice(corpus.items(), n))

Corpus size: 5233329


In [4]:
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import logging
import os

from dotenv import load_dotenv

# Configure logging before any client is created. The root logger is set to DEBUG,
# so the chatty HTTP / SDK loggers are raised to CRITICAL to keep notebook output readable.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.CRITICAL)  # urllib3 connection-pool logs
logging.getLogger("azure").setLevel(logging.CRITICAL)  # Azure SDK logs (parent of the azure.* loggers)

# Load the environment variables from .env
load_dotenv()


def _require_env(var_name: str) -> str:
    """Return the value of environment variable `var_name`, failing fast with a clear error if it is unset or empty."""
    value = os.getenv(var_name)
    if not value:
        raise RuntimeError(f"Environment variable {var_name!r} is not set; add it to your environment or .env file.")
    return value


# Cosmos DB connection settings (these names are also used by later cells).
connection_string = _require_env('COSMOSDB_CONN_STR')
database_name = _require_env('COSMOSDB_DB_NAME')
container_name = _require_env('COSMOSDB_CONTAINER_NAME')

# Initialize the (synchronous) Cosmos client and the database handle
client = CosmosClient.from_connection_string(connection_string)
database = client.get_database_client(database_name)

# One float32 vector per document at /vectorized_text. `dimensions` must match the
# size of the vectors produced by the embedding model.
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/vectorized_text",
            "dataType": "float32",
            "distanceFunction": "euclidean",
            "dimensions": 1536
        }
    ]
}

indexing_policy = {
    "indexingMode": "consistent",
    "automatic": True,
    "includedPaths": [
        {
            "path": "/*"
        }
    ],
    "excludedPaths": [
        {
            "path": "/\"_etag\"/?"
        },
        {
            # Keep the raw vector out of the regular index; it is covered by vectorIndexes below.
            "path": "/vectorized_text/*"
        }
    ],
    "fullTextIndexes": [],
    "vectorIndexes": [
        {
            "path": "/vectorized_text",
            "type": "diskANN",
            "quantizationByteSize": 128,
            # Policy property names are case-sensitive; the documented name is camelCase.
            "indexingSearchListSize": 100
        }
    ]
}

# If a container with this id already exists, it is returned as-is. The policies above only
# take effect when the container is first created; they are not applied to an existing one.
container = database.create_container_if_not_exists(
    id=container_name,
    partition_key=PartitionKey(path='/id'),
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy,
)
print(f"Container with id '{container_name}' is ready (created or already existing)")

Container with id 'hotpotqa' created


In [5]:
import asyncio

# Aliased so the synchronous CosmosClient imported by the setup cell is not shadowed.
from azure.cosmos.aio import CosmosClient as AsyncCosmosClient
from azure.cosmos.exceptions import CosmosHttpResponseError


async def insert_documents_to_cosmosdb(connection_string: str, database_name: str, container_name: str, documents: list) -> int:
    """
    Upsert a list of JSON documents into Cosmos DB concurrently.

    Each document is upserted (insert or update) in its own task. All tasks are awaited
    to completion even when some fail, so the client is never closed while requests are
    still in flight. Individual failures are reported rather than aborting the whole batch.

    :param connection_string: Cosmos DB connection string for authentication.
    :param database_name: Name of the Cosmos DB database.
    :param container_name: Name of the Cosmos DB container.
    :param documents: List of JSON documents (dicts) to upsert.
    :return: Number of documents upserted successfully (0 for an empty list or a client-level error).
    """
    if not documents:
        return 0

    try:
        async with AsyncCosmosClient.from_connection_string(connection_string) as client:
            container = client.get_database_client(database_name).get_container_client(container_name)
            # return_exceptions=True: one failed upsert must not leave its siblings running
            # against a client that is about to be closed.
            results = await asyncio.gather(
                *(container.upsert_item(doc) for doc in documents),
                return_exceptions=True,
            )
    except CosmosHttpResponseError as e:
        print(f"An error occurred: {e.message}")
        return 0

    failures = [
        (doc.get('id'), outcome)
        for doc, outcome in zip(documents, results)
        if isinstance(outcome, BaseException)
    ]
    succeeded = len(documents) - len(failures)

    if failures:
        print(f"Inserted {succeeded} of {len(documents)} documents into Cosmos DB; {len(failures)} failed.")
        max_reported = 5  # avoid flooding the notebook output with one line per failure
        for doc_id, error in failures[:max_reported]:
            print(f"  id={doc_id!r}: {error!r}")
    else:
        print(f"Successfully inserted {len(documents)} documents into Cosmos DB.")
    return succeeded

In [14]:
import asyncio
import logging
import time

from azure.core.exceptions import ServiceResponseError
from openai import AsyncAzureOpenAI

# Async Azure OpenAI client used for embeddings; credentials come from the environment.
api_key = os.getenv('AOAI_API_KEY')
azure_endpoint = os.getenv('AOAI_ENDPOINT')

aclient = AsyncAzureOpenAI(
    api_key=api_key,
    api_version="2024-12-01-preview",
    azure_endpoint=azure_endpoint,
    max_retries=5,  # SDK-level retries, in addition to any retry logic in callers
)

# Silence the HTTP transport, OpenAI and Azure SDK loggers.
for _noisy_logger in ("httpcore", "openai", "httpx", "azure.cosmos", "azure"):
    logging.getLogger(_noisy_logger).setLevel(logging.CRITICAL)
del _noisy_logger

model_name = "text-embedding-3-small"

# Upper bound on concurrent embedding requests (adjust as needed).
semaphore = asyncio.Semaphore(500)

async def vectorize_text(text: str, *, max_retries: int = 5, retry_delay: float = 2.0,
                         client=None, model=None, limiter=None):
    """Return the embedding for `text`, retrying failed requests with exponential backoff.

    Each attempt holds `limiter` while the request is in flight, which bounds the number of
    concurrent embedding calls. Sleeping between retries happens outside the limiter.

    :param text: Input text to embed.
    :param max_retries: Total number of attempts before giving up (at least one attempt is made).
    :param retry_delay: Delay in seconds before the first retry; doubled after each further failure.
    :param client: Async OpenAI-style client; defaults to the module-level `aclient`.
    :param model: Embedding model / deployment name; defaults to the module-level `model_name`.
    :param limiter: Async context manager bounding concurrency; defaults to the module-level `semaphore`.
    :return: The embedding of the first item in the response, or [] if the response has no data.
    :raises Exception: The last error, re-raised once every attempt has failed.
    """
    # Resolve defaults at call time so the module-level objects can be swapped or injected.
    client = aclient if client is None else client
    model = model_name if model is None else model
    limiter = semaphore if limiter is None else limiter

    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            async with limiter:
                response = await client.embeddings.create(input=text, model=model)
            data = response.data
            return data[0].embedding if data else []
        except Exception as e:
            # A single handler covers all failures: the OpenAI SDK raises its own exception
            # types, so a dedicated azure.core ServiceResponseError branch would never fire.
            if attempt == attempts:
                print(f"Max retries reached ({attempts} attempts). Raising exception.")
                raise
            delay = retry_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} of {attempts} failed due to {e!r}. Retrying in {delay} seconds...")
            await asyncio.sleep(delay)

import time
# Wait to avoid rate limiting by embedding service
wait_time = 0.01

start_time = time.time()

# Corpues to vectorize
#corpus_to_vectorize = corpus_sample
print(f'Length of corpus: {len(corpus)}')

# Process in batches 
batch_size = 500
json_array = []

#keys_values = list(corpus_to_vectorize.items()) # to load only the sample
starting_index = 291000
corpus_to_vectorize = dict(list(corpus.items())[starting_index:])
#corpus_to_vectorize = corpus_sample
print(f'Length of corpus to vectorize: {len(corpus_to_vectorize)}')

keys_values = list(corpus_to_vectorize.items()) # to load everything
for i in range(starting_index, len(keys_values), batch_size):
    batch = keys_values[i:i + batch_size]
    tasks = [vectorize_text(f"Tile: {value['title']}, Text: {value['text']}") for key, value in batch]
    results = await asyncio.gather(*tasks)

    for (key, value), embedding in zip(batch, results):
        if embedding:  # ...existing check logic...
            json_array.append({
                'id': key,
                'text': value['text'],
                'title': value['title'],
                'vectorized_text': embedding
            })
    await insert_documents_to_cosmosdb(connection_string, database_name, container_name, json_array)
    json_array = []
    print(f'Batch {i // batch_size + 1} completed. Processed {len(batch)} items in {time.time() - start_time} seconds')


print(f'All documents has been vectorized. Length of the JSON array: {len(json_array)}')

elapsed_time = time.time() - start_time
print(f"Elapsed time: {elapsed_time:.4f} seconds")



Length of corpus: 5233329
Length of corpus to vectorize: 4942329
Successfully inserted 500 documents into Cosmos DB.
Batch 583 completed. Processed 500 items in 29.062898635864258 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 584 completed. Processed 500 items in 53.24016547203064 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 585 completed. Processed 500 items in 78.80029153823853 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 586 completed. Processed 500 items in 99.9916033744812 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 587 completed. Processed 500 items in 125.23653721809387 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 588 completed. Processed 500 items in 152.02704572677612 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 589 completed. Processed 500 items in 175.4512243270874 seconds
Successfully inserted 500 documents into Cosmos DB.
Batch 590 completed. Proces

ServiceResponseError: Timeout on reading data from socket

In [15]:
# Sanity check: count the documents currently stored in the container.
# VALUE makes each result row a bare number rather than an object.
query = "SELECT VALUE COUNT(1) FROM c"
result = list(container.query_items(query=query, enable_cross_partition_query=True))

# Expected to be a single row holding the count.
if result:
    print("\n".join(str(row) for row in result))

1461300
