In [1]:
%pip install azure-search-documents==11.6.0b8
%pip install azure-identity
%pip install datasets
%pip install tabulate
%pip install python-dotenv
%pip install openai

Collecting azure-search-documents==11.6.0b8
  Downloading azure_search_documents-11.6.0b8-py3-none-any.whl.metadata (22 kB)
Downloading azure_search_documents-11.6.0b8-py3-none-any.whl (335 kB)
Installing collected packages: azure-search-documents
  Attempting uninstall: azure-search-documents
    Found existing installation: azure-search-documents 11.6.0b9
    Uninstalling azure-search-documents-11.6.0b9:
      Successfully uninstalled azure-search-documents-11.6.0b9
Successfully installed azure-search-documents-11.6.0b8
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting datasets
  Downloading datasets-4.0.0-py3-none-any.whl.metadata (19 kB)
Collecting filelock (from datasets)
  Downloading filelock-3.19.1-py3-none-any.whl.metadata (2.1 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-21.0.0-cp310-cp310-win_amd64.whl.metadata (3.4 kB)
Collecting dill<0.3.9,>=0.3.0 (from data

In [2]:
import os

AZURE_SEARCH_SERVICE_ENDPOINT=os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
AZURE_SEARCH_ADMIN_KEY=os.getenv("AZURE_SEARCH_ADMIN_KEY")
AZURE_OPENAI_SERVICE_ENDPOINT=os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_EMBED_DEPLOYMENT=os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYED_MODEL_NAME")

In [None]:
import json

from azure.core.exceptions import ResourceExistsError
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
#Diferentes tipos de indices
from azure.search.documents.indexes.models import (
    BinaryQuantizationCompression,
    HnswAlgorithmConfiguration,
    HnswParameters,
    ScalarQuantizationCompression,
    ScalarQuantizationParameters,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    SimpleField,
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchProfile,
    VectorSearchCompressionRescoreStorageMethod,
    RescoringOptions
)
from azure.search.documents.models import VectorizedQuery
from dotenv import load_dotenv
from tabulate import tabulate

In [4]:
SERVICE_ENDPOINT = AZURE_SEARCH_SERVICE_ENDPOINT
credential = AzureKeyCredential(AZURE_SEARCH_ADMIN_KEY)

In [5]:
index_client = SearchIndexClient(endpoint=SERVICE_ENDPOINT, credential=credential)

index = SearchIndex(
    name="tinyindex",
    fields=[
        SimpleField(name="id", type=SearchFieldDataType.String, key=True),
        SearchField(
            name="embedding",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            vector_search_dimensions=3,
            vector_search_profile_name="embedding_profile",
            stored=False
        ),
    ],
    vector_search=VectorSearch(
        algorithms=[
            HnswAlgorithmConfiguration(
                name="hnsw_config",
                kind=VectorSearchAlgorithmKind.HNSW,
                parameters=HnswParameters(metric="cosine"),
            )
        ],
        profiles=[
            VectorSearchProfile(
                name="embedding_profile", algorithm_configuration_name="hnsw_config", compression_name="binary_compression"
            )
        ],
        compressions=[
            BinaryQuantizationCompression(
                compression_name="binary_compression",
                rerank_with_original_vectors=None,
                default_oversampling=None,
                rescoring_options=RescoringOptions(
                    enable_rescoring=False,
                    rescore_storage_method=VectorSearchCompressionRescoreStorageMethod.DISCARD_ORIGINALS,
                ),
            )
        ],
    ),
)

index_client.create_index(index)

<azure.search.documents.indexes.models._index.SearchIndex at 0x2794edb20e0>

In [6]:
INDEX_PREFIX = "compression-test"

scenarios = [
    {
        "name": "baseline",
        "compression_type": None,
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": True,
        "description": "Baseline configuration without compression"
    },
    {
        "name": "baseline-s",
        "compression_type": None,
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Baseline configuration without compression, with stored=False "
    },
    {
        "name": "scalar-full",
        "compression_type": "scalar",
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with full dimensions, preserved originals"
    },
    {
        "name": "scalar-truncated-1024",
        "compression_type": "scalar",
        "truncate_dims": 1024,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with 1024 dimensions, preserved originals"
    },
    {
        "name": "scalar-truncated-1024-discard",
        "compression_type": "scalar",
        "truncate_dims": 1024,
        "discard_originals": True,
        "stored_embedding": False,
        "description": "Scalar quantization (int8) with 1024 dimensions, discarded originals"
    },
    {
        "name": "binary-full",
        "compression_type": "binary",
        "truncate_dims": None,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Binary quantization with full dimensions, preserved originals"
    },
    {
        "name": "binary-truncated-1024",
        "compression_type": "binary",
        "truncate_dims": 1024,
        "discard_originals": False,
        "stored_embedding": False,
        "description": "Binary quantization with 1024 dimensions, preserved originals"
    },
    {
        "name": "binary-truncated-1024-discard",
        "compression_type": "binary",
        "truncate_dims": 1024,
        "discard_originals": True,
        "stored_embedding": False,
        "description": "Binary quantization with 1024 dimensions, discarded originals"
    }
]

In [7]:
class AzureSearchIndexManager:
    def __init__(self, service_endpoint: str, credential: str, index_name_prefix: str, vector_dimensions: int):
        self.client = SearchIndexClient(endpoint=service_endpoint, credential=credential)
        self.index_name_prefix = index_name_prefix
        self.vector_dimensions = vector_dimensions

    def _create_base_fields(self, stored_embedding=True):
        return [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SearchField(name="title", type=SearchFieldDataType.String, searchable=True),
            SearchField(name="content", type=SearchFieldDataType.String, searchable=True),
            SearchField(
                name="embedding",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=self.vector_dimensions,
                vector_search_profile_name="default-profile",
                stored=stored_embedding,
            ),
        ]

    def _create_compression_config(
        self,
        config_type: str,
        truncate_dims: int = None,
        discard_originals: bool = False,
        oversample_ratio: int = 10,
    ):
        """
        Creates a compression configuration based on the scenario.
        """
        compression_name = f"{config_type}-compression"

        rescore_storage_method = (
            VectorSearchCompressionRescoreStorageMethod.DISCARD_ORIGINALS
            if discard_originals
            else VectorSearchCompressionRescoreStorageMethod.PRESERVE_ORIGINALS
        )

        enable_rescoring = not discard_originals

        rescoring_options = RescoringOptions(
            enable_rescoring=enable_rescoring,
            default_oversampling=oversample_ratio if enable_rescoring else None,
            rescore_storage_method=rescore_storage_method,
        )

        base_params = {
            "compression_name": compression_name,
            "rescoring_options": rescoring_options,
            "rerank_with_original_vectors": None,
            "default_oversampling": None,
        }

        if truncate_dims:
            base_params["truncation_dimension"] = truncate_dims

        if config_type == "scalar":
            compression = ScalarQuantizationCompression(
                parameters=ScalarQuantizationParameters(quantized_data_type="int8"),
                **base_params,
            )
        elif config_type == "binary":
            compression = BinaryQuantizationCompression(
                **base_params,
            )
        else:
            compression = None

        return compression

    def _create_vector_search_config(self, compression_config=None):
        """
        Creates the VectorSearch configuration, including algorithm and compression settings.
        """
        algorithm_config = HnswAlgorithmConfiguration(
            name="hnsw-config",
            kind="hnsw",
            parameters=HnswParameters(
                m=4,
                ef_construction=400,
                ef_search=500,
                metric="cosine"
            )
        )

        profiles = [
            VectorSearchProfile(
                name="default-profile",
                algorithm_configuration_name=algorithm_config.name,
                compression_name=compression_config.compression_name if compression_config else None,
            )
        ]

        vector_search = VectorSearch(
            profiles=profiles,
            algorithms=[algorithm_config],
            compressions=[compression_config] if compression_config else None,
        )

        return vector_search

    def create_index(self, scenario: dict):
        """
        Creates or updates an index based on the provided scenario.
        """
        index_name = f"{self.index_name_prefix}-{scenario['name']}"

        stored_embedding = scenario.get('stored_embedding', True)

        fields = self._create_base_fields(stored_embedding=stored_embedding)

        compression_config = None
        if scenario["compression_type"]:
            compression_config = self._create_compression_config(
                config_type=scenario["compression_type"],
                truncate_dims=scenario.get("truncate_dims"),
                discard_originals=scenario.get("discard_originals", False),
            )

        vector_search = self._create_vector_search_config(compression_config)

        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
        )

        try:
            self.client.create_or_update_index(index)
        except ResourceExistsError:
            print(f"Index {index_name} already exists.")
        except Exception as e:
            if e.message and "already exists" in e.message:
                print(f"Index {index_name} already exists.")
            else:
                print(f"Error creating index {index_name}: {type(e)} - {str(e)}")
    
        return index_name

manager = AzureSearchIndexManager(
    service_endpoint=SERVICE_ENDPOINT,
    credential=credential,
    index_name_prefix=INDEX_PREFIX,
    vector_dimensions=3072)

created_indexes = []
for scenario in scenarios:
    try:
        index_name = manager.create_index(scenario)
        created_indexes.append({
            "index_name": index_name,
            "configuration": scenario["description"]
        })
    except Exception as e:
        print(f"Error creating index for scenario {scenario['name']}: {str(e)}")

if len(created_indexes) > 0:
    print("\nCreated Indexes:")
    tabulate(created_indexes, headers="keys", tablefmt="html")
else:
    print("\nNo indexes were created successfully.")


Created Indexes:


In [8]:
from datasets import load_dataset
import pyarrow.parquet as pq
import pyarrow as pa

print("Loading dataset in streaming mode...")
ds = load_dataset(
    "Qdrant/dbpedia-entities-openai3-text-embedding-3-large-3072-1M",
    streaming=True,
    split='train'
)

ds = ds.select_columns(["_id", "title", "text", "text-embedding-3-large-3072-embedding"])

print("Taking first 10K examples...")
data = []
for i, example in enumerate(ds):
    if i >= 10000:
        break
    data.append(example)

print("Converting to parquet...")
output_file = "dbpedia_10k.parquet"
table = pa.Table.from_pylist(data)
pq.write_table(
    table,
    output_file,
    compression='snappy'
)

print(f"Dataset saved to {output_file}")

table = pq.read_table(output_file)
print(f"\nSaved dataset shape: {table.num_rows} rows × {table.num_columns} columns")
print("Columns:", table.column_names)

  from .autonotebook import tqdm as notebook_tqdm


Loading dataset in streaming mode...


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


Taking first 10K examples...
Converting to parquet...
Dataset saved to dbpedia_10k.parquet

Saved dataset shape: 10000 rows × 4 columns
Columns: ['_id', 'title', 'text', 'text-embedding-3-large-3072-embedding']


In [9]:
table = pq.read_table("dbpedia_10k.parquet")

first_doc = {
    "id": str(table['_id'][0].as_py()),
    "title": table['title'][0].as_py(),
    "content": table['text'][0].as_py(),
    "embedding": table['text-embedding-3-large-3072-embedding'][0].as_py()
}

print("Dataset Information:")
print(f"Total number of rows: {table.num_rows}")
print(f"Columns: {table.column_names}")
print("\nFirst document structure:")
print(json.dumps(first_doc, indent=2, default=str))

print("\nData Statistics:")
print(f"Embedding dimension: {len(first_doc['embedding'])}")
print(f"Average title length: {sum(len(str(title)) for title in table['title']) / table.num_rows:.1f} characters")
print(f"Average text length: {sum(len(str(text)) for text in table['text']) / table.num_rows:.1f} characters")

print("\nChecking for null values:")
for column in table.column_names:
    null_count = table[column].null_count
    print(f"{column}: {null_count} null values")

Dataset Information:
Total number of rows: 10000
Columns: ['_id', 'title', 'text', 'text-embedding-3-large-3072-embedding']

First document structure:
{
  "id": "<dbpedia:Parabolic_reflector>",
  "title": "Parabolic reflector",
  "content": "A parabolic (or paraboloid or paraboloidal) reflector (or dish or mirror) is a reflective surface used to collect or project energy such as light, sound, or radio waves. Its shape is part of a circular paraboloid, that is, the surface generated by a parabola revolving around its axis. The parabolic reflector transforms an incoming plane wave traveling along the axis into a spherical wave converging toward the focus.",
  "embedding": [
    -0.022388368844985962,
    0.028537113219499588,
    -0.017491688951849937,
    -0.01533466950058937,
    -0.03451230376958847,
    0.0162644200026989,
    -0.020231351256370544,
    -0.012743767350912094,
    -0.032082557678222656,
    0.0037468906957656145,
    -0.003864658996462822,
    0.004292343743145466,
  

In [10]:
import base64
import time
from datetime import datetime
from typing import List, Dict, Any
import json

import pyarrow.parquet as pq


def chunk_list(lst: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split a list into chunks of specified size."""
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


def encode_key(key: str) -> str:
    """Encode key to be Azure Search compatible using URL-safe base64."""
    return base64.urlsafe_b64encode(key.encode()).decode()


def prepare_documents(table) -> List[Dict]:
    """
    Convert Arrow table to list of documents with base64 encoded IDs.
    """
    documents = []
    total_rows = table.num_rows

    print(f"Converting {total_rows} rows to documents...")
    for i in range(total_rows):
        original_id = str(table["_id"][i].as_py())
        encoded_id = encode_key(original_id)

        document = {
            "id": encoded_id,
            "title": table["title"][i].as_py(),
            "content": table["text"][i].as_py(),
            "embedding": table["text-embedding-3-large-3072-embedding"][i].as_py(),
        }
        documents.append(document)

        if i % 1000 == 0:
            print(f"Processed {i}/{total_rows} documents...")

    print("Document conversion complete")

    print("\nSample document format:")
    sample_doc = documents[0].copy()
    print("Original ID:", original_id)
    print("Encoded ID:", sample_doc["id"])
    print(json.dumps(sample_doc, indent=2, default=str))

    return documents


def upload_to_search(
    documents: List[Dict],
    endpoint: str,
    index_name: str,
    credential,
    batch_size: int = 100,
) -> None:
    """
    Upload documents to Azure Search index using manual batching.
    """
    search_client = SearchClient(
        endpoint=endpoint, index_name=index_name, credential=credential
    )

    total_docs = len(documents)
    print(f"\nStarting upload to index: {index_name} at {datetime.now()}")
    print(f"Total documents to upload: {total_docs}")
    start_time = datetime.now()

    batches = chunk_list(documents, batch_size)
    total_batches = len(batches)

    successful_docs = 0
    failed_docs = 0

    try:
        for batch_num, batch in enumerate(batches, 1):
            max_retries = 3
            retry_count = 0

            while retry_count < max_retries:
                try:
                    results = search_client.upload_documents(documents=batch)

                    for result in results:
                        if result.succeeded:
                            successful_docs += 1
                        else:
                            failed_docs += 1
                            print(
                                f"Failed to upload document {result.key}: {result.error}"
                            )

                    elapsed_time = datetime.now() - start_time
                    print(
                        f"Index {index_name}: Processed batch {batch_num}/{total_batches} "
                        f"({successful_docs}/{total_docs} docs). "
                        f"Elapsed time: {elapsed_time}"
                    )

                    time.sleep(0.25)
                    break 

                except Exception as e:
                    retry_count += 1
                    if retry_count == max_retries:
                        print(
                            f"Failed to upload batch after {max_retries} attempts: {str(e)}"
                        )
                        failed_docs += len(batch)
                    else:
                        print(
                            f"Retry {retry_count}/{max_retries} after error: {str(e)}"
                        )
                        time.sleep(2**retry_count) 

        total_time = datetime.now() - start_time
        print(f"\nUpload to {index_name} completed:")
        print(f"Successfully uploaded: {successful_docs} documents")
        print(f"Failed to upload: {failed_docs} documents")
        print(f"Total time: {total_time}")

        result = search_client.search("*", top=0)
        final_count = result.get_count()
        print(f"Final document count in index: {final_count}")

    except Exception as e:
        print(f"Fatal error during upload to {index_name}: {str(e)}")
        raise


def upload_to_all_indexes(
    documents: List[Dict],
    endpoint: str,
    index_prefix: str,
    scenarios: list,
    credential: AzureKeyCredential,
    batch_size: int = 100,
) -> None:
    """
    Upload documents to all indexes sequentially.
    """
    total_start_time = datetime.now()

    for i, scenario in enumerate(scenarios, 1):
        index_name = f"{index_prefix}-{scenario['name']}"
        print(f"\nProcessing index {i} of {len(scenarios)}: {index_name}")
        upload_to_search(
            documents=documents,
            endpoint=endpoint,
            index_name=index_name,
            credential=credential,
            batch_size=batch_size,
        )

    total_time = datetime.now() - total_start_time
    print(f"\nCompleted all uploads. Total time: {total_time}")

print("Loading parquet file...")
table = pq.read_table("dbpedia_10k.parquet")
print(f"Loaded {table.num_rows} rows")

documents = prepare_documents(table)

upload_to_all_indexes(
    documents=documents,
    endpoint=SERVICE_ENDPOINT,
    index_prefix=INDEX_PREFIX,
    scenarios=scenarios,
    credential=credential,
    batch_size=100,
)

Loading parquet file...
Loaded 10000 rows
Converting 10000 rows to documents...
Processed 0/10000 documents...
Processed 1000/10000 documents...
Processed 2000/10000 documents...
Processed 3000/10000 documents...
Processed 4000/10000 documents...
Processed 5000/10000 documents...
Processed 6000/10000 documents...
Processed 7000/10000 documents...
Processed 8000/10000 documents...
Processed 9000/10000 documents...
Document conversion complete

Sample document format:
Original ID: <dbpedia:John_Dowsley_Reid>
Encoded ID: PGRicGVkaWE6UGFyYWJvbGljX3JlZmxlY3Rvcj4=
{
  "id": "PGRicGVkaWE6UGFyYWJvbGljX3JlZmxlY3Rvcj4=",
  "title": "Parabolic reflector",
  "content": "A parabolic (or paraboloid or paraboloidal) reflector (or dish or mirror) is a reflective surface used to collect or project energy such as light, sound, or radio waves. Its shape is part of a circular paraboloid, that is, the surface generated by a parabola revolving around its axis. The parabolic reflector transforms an incoming 

In [11]:
from azure.search.documents.indexes import SearchIndexClient
from tabulate import tabulate 

def bytes_to_mb(bytes):
    """Convert bytes to megabytes with 4 decimal places"""
    return round(bytes / (1024 * 1024), 4)

def get_index_sizes(
    endpoint: str,
    index_prefix: str,
    scenarios: list,
    credential,
    retry_attempts: int = 3
) -> None:
    """
    Get and print storage sizes for all indexes, with retry logic for eventual consistency.
    """
    search_index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
    
    print("\nGathering index statistics...")
    print("Note: There may be delays in finding index statistics after document upload")
    print("Index statistics is not a real-time API\n")
    
    index_data = []
    for scenario in scenarios:
        index_name = f"{index_prefix}-{scenario['name']}"
        
        for attempt in range(retry_attempts):
            try:
                stats = search_index_client.get_index_statistics(index_name)
                storage_size = bytes_to_mb(stats["storage_size"])
                vector_size = bytes_to_mb(stats["vector_index_size"])
                total_size = storage_size + vector_size
                index_data.append({
                    'Index Name': index_name,
                    'Scenario': scenario['name'],
                    'Storage Size (MB)': storage_size,
                    'Vector Size (MB)': vector_size,
                    'Total Size (MB)': total_size,  
                })
                break
            except Exception as e:
                if attempt == retry_attempts - 1:
                    print(f"Failed to get statistics for {index_name} after {retry_attempts} attempts: {str(e)}")
                else:
                    print(f"Retry {attempt + 1}/{retry_attempts} for {index_name}")
                    time.sleep(2 ** attempt)  
    
    baseline_entry = next((entry for entry in index_data if entry['Scenario'] == 'baseline'), None)
    if not baseline_entry:
        print("Baseline scenario not found.")
        return
    baseline_storage_size = baseline_entry['Storage Size (MB)']
    baseline_vector_size = baseline_entry['Vector Size (MB)']
    
    for entry in index_data:
        storage_reduction_pct = ((baseline_storage_size - entry['Storage Size (MB)']) / baseline_storage_size) * 100
        vector_reduction_pct = ((baseline_vector_size - entry['Vector Size (MB)']) / baseline_vector_size) * 100
        entry['Storage Reduction (%)'] = f"{storage_reduction_pct:.2f}"
        entry['Vector Reduction (%)'] = f"{vector_reduction_pct:.2f}"
    
    index_data.sort(key=lambda x: x['Total Size (MB)'], reverse=True)
    
    headers = [
        'Index Name', 'Scenario', 'Storage Size (MB)', 'Storage Reduction (%)',
        'Vector Size (MB)', 'Vector Reduction (%)'
    ]
    table_rows = [
        [
            entry['Index Name'],
            entry['Scenario'],
            f"{entry['Storage Size (MB)']:.4f}",
            entry['Storage Reduction (%)'],
            f"{entry['Vector Size (MB)']:.4f}",
            entry['Vector Reduction (%)']
        ]
        for entry in index_data
    ]
    
    return tabulate(table_rows, headers=headers, tablefmt="html")
    

get_index_sizes(
    endpoint=SERVICE_ENDPOINT,
    index_prefix=INDEX_PREFIX,
    scenarios=scenarios,
    credential=credential
)


Gathering index statistics...
Note: There may be delays in finding index statistics after document upload
Index statistics is not a real-time API



Index Name,Scenario,Storage Size (MB),Storage Reduction (%),Vector Size (MB),Vector Reduction (%)
compression-test-baseline,baseline,382.411,0.0,117.721,0.0
compression-test-baseline-s,baseline-s,132.047,65.47,117.72,0.0
compression-test-scalar-full,scalar-full,163.507,57.24,32.043,72.78
compression-test-scalar-truncated-1024,scalar-truncated-1024,142.266,62.8,10.9074,90.73
compression-test-binary-full,binary-full,135.693,64.52,4.194,96.44
compression-test-binary-truncated-1024,binary-truncated-1024,133.1,65.19,1.7501,98.51
compression-test-scalar-truncated-1024-discard,scalar-truncated-1024-discard,54.001,85.88,39.5958,66.36
compression-test-binary-truncated-1024-discard,binary-truncated-1024-discard,12.723,96.67,1.3823,98.83
