In [1]:
%pip install azure-cosmos==4.7.0

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 7, Finished, Available, Finished)

Collecting azure-cosmos==4.7.0
  Downloading azure_cosmos-4.7.0-py3-none-any.whl.metadata (70 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.3/70.3 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Downloading azure_cosmos-4.7.0-py3-none-any.whl (252 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m252.1/252.1 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: azure-cosmos
Successfully installed azure-cosmos-4.7.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [2]:
import asyncio
import base64
import json
import time
import warnings

# -----------------------------
# Standard Library (from-imports)
# -----------------------------
from pprint import pprint
from types import SimpleNamespace
from typing import Any, Dict, List, Optional


# -----------------------------
# Fabric/Synapse Utilities
# -----------------------------
import notebookutils

# -----------------------------
# Azure SDK
# -----------------------------
from azure.core.credentials import AccessToken, TokenCredential
from azure.cosmos import PartitionKey, ThroughputProperties, exceptions
from azure.cosmos.aio import CosmosClient as CosmosAsyncClient

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 9, Finished, Available, Finished)

In [4]:
# Connection and schema settings shared by the cells below.
config = SimpleNamespace(**{
    "cosmos_endpoint": "https://ad9da7b5-365e-47f1-95c3-767285310f4d.zad.sql.cosmos.fabric.microsoft.com:443/",
    "cosmos_database_name": "VectorCosmosDB",
    "cosmos_container_name": "vectorstorecontainer",
    "cosmos_vector_property_name": "vector",
    "cosmos_cache_database_name": "VectorCosmosDB",
    "cosmos_cache_container_name": "vectorcachecontainer",
    # NOTE(review): presumably must equal the length of the embedding arrays
    # stored in the documents — confirm against the dataset being loaded.
    "openai_embeddings_dimensions": 1536
})

# Declares the document property that holds the embedding, its element type,
# the distance function and the expected number of dimensions.
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/" + config.cosmos_vector_property_name,
            "dataType": "float32",
            "distanceFunction": "cosine",
            "dimensions": config.openai_embeddings_dimensions,
        },
    ]
}

# Index every property except `_etag` and the raw embedding array; the
# embedding is covered by the dedicated diskANN vector index instead.
# Each excluded path needs its own dict: a single dict literal with two
# "path" keys keeps only the last one, silently dropping the `_etag` exclusion.
indexing_policy = {
    "includedPaths": [{"path": "/*"}],
    "excludedPaths": [
        {"path": '/"_etag"/?'},
        {"path": "/" + config.cosmos_vector_property_name + "/*"},
    ],
    "vectorIndexes": [{"path": "/" + config.cosmos_vector_property_name, "type": "diskANN"}],
}

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 11, Finished, Available, Finished)

In [5]:
class FabricTokenCredential(TokenCredential):
    """Credential that hands the Cosmos SDK a token obtained from ``notebookutils``.

    The token is always requested for the ``https://cosmos.azure.com/``
    audience; the ``scopes`` and keyword arguments passed by the SDK are
    accepted but not used.
    """

    def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                  enable_cae: bool = False, **kwargs: Any) -> AccessToken:
        """Fetch a token and wrap it in an ``AccessToken`` using the JWT ``exp`` claim.

        Raises:
            ValueError: if the token has fewer than two dot-separated segments
                or its payload carries no ``exp`` claim.
        """
        raw_jwt = notebookutils.credentials.getToken("https://cosmos.azure.com/")

        segments = raw_jwt.split(".")
        if len(segments) < 2:
            raise ValueError("Invalid JWT format")

        # The payload segment is base64url without padding; restore the "="
        # characters so the decoder accepts it.
        encoded_claims = segments[1]
        encoded_claims += "=" * ((-len(encoded_claims)) % 4)

        claims_bytes = base64.urlsafe_b64decode(encoded_claims.encode("utf-8"))
        token_claims = json.loads(claims_bytes.decode("utf-8"))

        expiry = token_claims.get("exp")
        if expiry is None:
            raise ValueError("exp claim missing in token")
        return AccessToken(token=raw_jwt, expires_on=expiry)

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 12, Finished, Available, Finished)

In [6]:
async def get_db_client(cosmos_client,cosmos_database_name):
    """Return ``(database_client, database_properties)`` for the named database.

    The database client is obtained from ``cosmos_client`` and its properties
    are fetched by awaiting the client's ``read()`` call.
    """
    database = cosmos_client.get_database_client(database=cosmos_database_name)
    return database, await database.read()

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 13, Finished, Available, Finished)

In [7]:
async def get_or_create_container(db_client, container_name, max_throughput=20000, indexing_policy=None, vector_embedding_policy=None):
    """Create the container if it is missing and return its client.

    The container is partitioned on ``/id`` and provisioned with autoscale
    throughput capped at ``max_throughput``. The indexing and vector embedding
    policies are passed through unchanged.

    Raises:
        exceptions.CosmosHttpResponseError: logged and re-raised when the
            service rejects the request.
    """
    try:
        container_options = {
            "id": container_name,
            "partition_key": PartitionKey(path="/id"),
            "indexing_policy": indexing_policy,
            "vector_embedding_policy": vector_embedding_policy,
            "offer_throughput": ThroughputProperties(auto_scale_max_throughput=max_throughput),
        }
        container = await db_client.create_container_if_not_exists(**container_options)
        print(f"Container with id '{container.id}' created or already exists")
        return container
    except exceptions.CosmosHttpResponseError as err:
        print(f"Error creating container '{container_name}': {err}")
        raise

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 14, Finished, Available, Finished)

In [8]:

async def insert_data(
    data: List[Dict],
    container,
    max_concurrency: int = 2,
    delay_per_item: float = 0.1,
    backoff_on_error: float = 1.0
):
    """Upsert every document in ``data`` into ``container`` with bounded concurrency.

    Failures are best-effort: a failed upsert is logged, counted and followed
    by a ``backoff_on_error`` pause, but it is not retried and does not abort
    the load. The final summary reports successes and failures separately.

    Args:
        data: Documents to upsert, one ``container.upsert_item`` call each.
        container: Object exposing an awaitable ``upsert_item(doc)``.
        max_concurrency: Maximum number of upserts in flight; must be >= 1.
        delay_per_item: Seconds to pause after each document while still
            holding the concurrency slot, to smooth out request-unit usage.
        backoff_on_error: Extra seconds to pause after a failed upsert.

    Raises:
        ValueError: if ``max_concurrency`` is less than 1 (a zero-slot
            semaphore would block every task forever).
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments during a long load.
    started = time.perf_counter()
    attempted = 0
    failed = 0
    semaphore = asyncio.Semaphore(max_concurrency)
    print("Starting document load, please wait...")

    async def upsert_object(obj: Dict):
        nonlocal attempted, failed
        async with semaphore:
            try:
                await container.upsert_item(obj)
            except Exception as e:
                failed += 1
                print(f"Error inserting doc: {e}")
                await asyncio.sleep(backoff_on_error)  # Backoff on failure
            attempted += 1
            if attempted % 50 == 0:
                print(f"Sent {attempted} documents for insertion into collection.")
            await asyncio.sleep(delay_per_item)  # Small delay to avoid spiking RUs

    tasks = [asyncio.create_task(upsert_object(obj)) for obj in data]
    await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - started
    succeeded = attempted - failed
    if failed:
        print(f"Inserted {succeeded} of {attempted} documents; {failed} failed.")
    else:
        print(f"All {succeeded} documents inserted!")
    print(f"Time taken: {elapsed:.2f} seconds")

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 15, Finished, Available, Finished)

In [9]:
# Build the async Cosmos client for the configured endpoint, authenticating
# through FabricTokenCredential (token obtained from notebookutils).
# NOTE(review): the client is not closed in the cells shown here — confirm
# it is closed later in the notebook.
cosmos_client = CosmosAsyncClient(
    config.cosmos_endpoint, credential=FabricTokenCredential()
)

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 16, Finished, Available, Finished)

In [10]:
# Get the database client and read the database properties from the service,
# then pretty-print them.
db_client, db_properties = await get_db_client(cosmos_client, config.cosmos_database_name)
pprint(db_properties)

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 17, Finished, Available, Finished)

{'_colls': 'colls/',
 '_etag': '"0000c001-0000-0d00-0000-693201810000"',
 '_rid': 'I7ZLAA==',
 '_self': 'dbs/I7ZLAA==/',
 '_ts': 1764884865,
 '_users': 'users/',
 'id': 'VectorCosmosDB'}


In [11]:
# Container for the movie documents; max_throughput is passed to
# get_or_create_container as the autoscale maximum.
movies_container = await get_or_create_container(
    db_client,
    config.cosmos_container_name,
    max_throughput=20000,
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy
)

# Cache container: same indexing and vector policies, lower autoscale maximum.
cache_container = await get_or_create_container(
    db_client,
    config.cosmos_cache_container_name,
    max_throughput=2000,
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy
)

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, 18, Finished, Available, Finished)

Container with id 'vectorstorecontainer' created or already exists
Container with id 'vectorcachecontainer' created or already exists


In [None]:
with open('/lakehouse/default/Files/MovieLens-4489-256D/MovieLens-4489-256D.json', 'r') as d:
    data = json.load(d)

print("Number of Documents in raw collection: ", len(data))
await insert_data(data, container=movies_container)

StatementMeta(, 9f4a06f9-f075-4db0-a1b0-94bb0286d50e, -1, Cancelled, , Cancelled)