### Automatic RAG creation pipeline


In [8]:
from itertools import product
from azure.search.documents.indexes.models import (
    HnswAlgorithmConfiguration,
    ExhaustiveKnnAlgorithmConfiguration,
    VectorSearch,
    VectorSearchProfile,
    AzureOpenAIVectorizer,
    AzureOpenAIVectorizerParameters,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    IndexProjectionMode,
    SearchIndexerIndexProjection,
    SearchIndexerIndexProjectionSelector,
    SplitSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    CognitiveServicesAccountKey,
    SearchIndexerSkillset,
    SearchIndexer,
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    AzureOpenAIEmbeddingSkill,
    SearchIndexerIndexProjectionsParameters
)
from azure.search.documents.indexes import SearchIndexerClient
from azure.search.documents.indexes import SearchIndexClient
from azureml.core import Workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
import json

#Load env variables
#All settings used by this cell are read from config.json in the working directory
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

#Connect to ML workspace and MLClient
subscription_id = config["AZML_SUBSCRIPTION_ID"]
resource_group = config["AZML_RESOURCE_GROUP"]
workspace_name = config["AZML_WORKSPACE_NAME"]

#A single credential object is shared by the ML client and both search clients
credential = DefaultAzureCredential()

ml_client = MLClient(
    credential,
    subscription_id,
    resource_group,
    workspace_name,
)

#Set variables
endpoint_name = config["AZML_ENDPOINT_NAME"]
endpoint = ml_client.online_endpoints.get(endpoint_name)
#Keys of the online endpoint; primary_key is sent as the bearer token to the custom embedder
credentials = ml_client.online_endpoints.get_keys(endpoint_name)

cognitive_services_key = config["COGNITIVE_SERVICES_KEY"]

#Index client manages indexes; indexer client manages skillsets, data sources and indexers
index_client = SearchIndexClient(
    endpoint=config["AI_SEARCH_SERVICE_ENDPOINT"],
    credential=credential,
)
indexer_client = SearchIndexerClient(
    endpoint=config["AI_SEARCH_SERVICE_ENDPOINT"],
    credential=credential,
)

data_source_name = config["DATA_SOURCE_NAME"]
container_name = config["CONTAINER_NAME"]
connection_string = config["STORAGE_CONNECTION_STRING"]

custom_embedder_endpoint = config["AZML_CUSTOM_MODEL_ENDPOINT"]

#Set up a list of search algorithms and vectorizer techniques to use.
#A separate RAG model will be created for each possible combination of techniques.

#Specify openai service url and model names to use as embedder.
#The three lists below are parallel: position i describes one embedder
#(deployment name, model name, embedding dimensions).
azure_openai_resource_url = config["AZURE_OPENAI_RESOURCE_URL"]
openai_embedder_deployment_names = ["text-embedding-3-large", "text-embedding-ada-002-2"]
openai_embedder_model_names = ["text-embedding-3-large", "text-embedding-ada-002"]
openai_embedding_dimensions = [1024, 1536]

#Specify online endpoints to use custom embedder.
#These two lists are parallel as well (endpoint url, embedding dimensions).
endpoint_urls = [custom_embedder_endpoint]
custom_embedding_dimensions = [384]

#Choose search algorithm(s)
#Azure currently supports: HNSW and ExhaustiveKNN.
vector_search_algorithms = ["HNSW", "ExhaustiveKNN"]
#Dynamically build vectorizers based on input lists of embedding models
#Each entry is a plain dict: "name", "kind" and "vectorizer_params" describe the vectorizer,
#"dimensions" (plus "deployment_name"/"model_name" for Azure OpenAI) are read by the pipeline loop.
vectorizers = []
if openai_embedder_deployment_names and openai_embedder_model_names and openai_embedding_dimensions:
    #zip() stops at the shortest list, so a length mismatch would silently drop embedders
    if not (
        len(openai_embedder_deployment_names)
        == len(openai_embedder_model_names)
        == len(openai_embedding_dimensions)
    ):
        raise ValueError(
            "openai_embedder_deployment_names, openai_embedder_model_names and "
            "openai_embedding_dimensions must all have the same length"
        )
    for deployment_name, model_name, dimension in zip(openai_embedder_deployment_names,
                                                    openai_embedder_model_names,
                                                    openai_embedding_dimensions):
        print(model_name)
        vectorizers.append({
            "name": f"openai-vectorizer-{model_name}".lower(),
            "kind": "azureOpenAI",
            "vectorizer_params": {
                "parameters": AzureOpenAIVectorizerParameters(
                    resource_url=azure_openai_resource_url,
                    deployment_name=deployment_name,
                    model_name=model_name,
                ),
            },
            #Include deployment and model name explicitly to match the correct skillset later on
            "deployment_name": deployment_name,
            "model_name": model_name,
            "dimensions": dimension,
        })

if endpoint_urls and custom_embedding_dimensions:
    #Same reasoning as above: every endpoint needs its own dimension entry
    if len(endpoint_urls) != len(custom_embedding_dimensions):
        raise ValueError(
            "endpoint_urls and custom_embedding_dimensions must have the same length"
        )
    for endpoint_url, dimension in zip(endpoint_urls, custom_embedding_dimensions):
        vectorizers.append({
            "name": f"custom-vectorizer-{dimension}",
            "kind": "customWebApi",
            "vectorizer_params": {
                "customWebApiParameters": {
                    "uri": endpoint_url,
                    "httpMethod": "POST",
                    "httpHeaders": {
                        #The endpoint's primary key is passed as a bearer token
                        "Authorization": f"Bearer {credentials.primary_key}",
                    },
                },
            },
            "dimensions": dimension,
        })


#Functions for pipeline execution
def create_vector_search(algorithm_name, vectorizer_name, vectorizer_kind, vectorizer_params, vector_dimensions):
    '''
    Builds the vector search configuration for one algorithm/vectorizer combination.

    algorithm_name: "HNSW" or "ExhaustiveKNN"; any other value raises ValueError.
    vectorizer_name: name of the vectorizer, also referenced by the search profile.
    vectorizer_kind: value stored under the vectorizer's "kind" key.
    vectorizer_params: dict merged into the vectorizer definition (None is treated as empty).
    vector_dimensions: kept for interface compatibility; not used by this function.

    Returns a tuple (vector_search, profile_name); profile_name is the name the
    vector field of the index has to reference.
    '''
    #Fail fast with a clear message instead of hitting an unbound variable further down
    if algorithm_name not in ("HNSW", "ExhaustiveKNN"):
        raise ValueError(
            f"Unsupported vector search algorithm '{algorithm_name}'. "
            "Expected 'HNSW' or 'ExhaustiveKNN'."
        )

    vectorizer = {
        "name": vectorizer_name,
        "kind": vectorizer_kind,
        **(vectorizer_params or {}),
    }
    if algorithm_name == "HNSW":
        algorithm_config = HnswAlgorithmConfiguration(name="myHnsw")
        profile_name = "myHnswProfile"
    else:
        #NOTE(review): only the exhaustive KNN configuration sets an explicit metric
        #(euclidean); the HNSW configuration is left at its defaults - confirm this is intended
        algorithm_config = ExhaustiveKnnAlgorithmConfiguration(
            name="myKnn",
            parameters={"metric": "euclidean"}
        )
        profile_name = "myExhaustiveKnnProfile"

    vector_search = VectorSearch(
        algorithms=[algorithm_config],
        profiles=[
            VectorSearchProfile(
                name=profile_name,
                algorithm_configuration_name=algorithm_config.name,
                vectorizer_name=vectorizer_name,
            )
        ],
        vectorizers=[vectorizer],
    )
    return vector_search, profile_name


def create_index(index_name, fields, vector_search):
    '''
    Builds the SearchIndex definition that will hold the embedded document chunks.
    Only the object is constructed here; sending it to the service is left to the caller.
    '''
    return SearchIndex(
        name=index_name,
        fields=fields,
        vector_search=vector_search,
    )


def create_data_source_connection(name, connection_string, container_name):
    '''
    Builds the data source connection definition (type "azureblob") for the given
    storage connection string and container. The object is returned, not uploaded.
    '''
    return SearchIndexerDataSourceConnection(
        name=name,
        type="azureblob",
        connection_string=connection_string,
        container=SearchIndexerDataContainer(name=container_name),
    )

def create_skillset(skillset_name, index_projections, cognitive_services_key, vectorizer_kind, vectorizer_context=None, vectorizer_uri=None, vectorizer_headers=None, vector_dimensions=None, deployment_name=None, model_name=None):
    '''
    Skillset creation
    Creates a split skill (chunking) followed by an embedding skill.
    The embedding skill is derived from the vectorizer configuration because the two have to match:
    - vectorizer_kind == "customWebApi": a custom web API skill definition (plain dict) built from
      vectorizer_uri, vectorizer_context and vectorizer_headers
    - any other kind: an AzureOpenAIEmbeddingSkill built from deployment_name, model_name and
      vector_dimensions (uses the module-level azure_openai_resource_url)
    Returns the SearchIndexerSkillset object; it is not uploaded here.
    '''
    #Embedding skill - both variants read the chunks produced by the split skill
    chunk_inputs = [InputFieldMappingEntry(name="text", source="/document/pages/*")]
    if vectorizer_kind != "customWebApi":
        embedder = AzureOpenAIEmbeddingSkill(
            description="Skill to generate embeddings via Azure OpenAI",
            context="/document/pages/*",
            resource_url=azure_openai_resource_url,
            deployment_name=deployment_name,
            model_name=model_name,
            dimensions=vector_dimensions,
            inputs=chunk_inputs,
            outputs=[OutputFieldMappingEntry(name="embedding", target_name="text_vector")],
        )
    else:
        embedder = {
            "@odata.type": "#Microsoft.Skills.Custom.WebApiSkill",
            "name": "customEmbeddingSkill",
            "description": f"Custom embedder from ML endpoint ({vector_dimensions})",
            "uri": vectorizer_uri,
            "context": vectorizer_context,
            "timeout": "PT30S",
            "batchSize": 1,
            "httpMethod": "POST",
            "httpHeaders": vectorizer_headers,
            "inputs": chunk_inputs,
            "outputs": [OutputFieldMappingEntry(name="vector", target_name="text_vector")],
        }

    #Split skill - cuts /document/content into overlapping pages stored under "pages"
    chunker = SplitSkill(
        description="Split skill to chunk documents",
        text_split_mode="pages",
        context="/document",
        maximum_page_length=2000,
        page_overlap_length=500,
        inputs=[InputFieldMappingEntry(name="text", source="/document/content")],
        outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")],
    )

    #Create skillset - the split skill goes first, the embedding skill second
    billing_account = CognitiveServicesAccountKey(key=cognitive_services_key)
    return SearchIndexerSkillset(
        name=skillset_name,
        description="RAG skillset with selected techniques",
        skills=[chunker, embedder],
        index_projection=index_projections,
        cognitive_services_account=billing_account,
    )


def create_indexer(indexer_name, skillset_name, target_index_name, data_source_name):
    '''
    Builds the indexer definition that ties together the data source, the skillset and the
    target index. The object is returned; creating it on the service is left to the caller.
    '''
    summary = f"Indexer using skillset {skillset_name} for index {target_index_name}"
    return SearchIndexer(
        name=indexer_name,
        description=summary,
        skillset_name=skillset_name,
        target_index_name=target_index_name,
        data_source_name=data_source_name,
    )


#Main pipeline execution loop
#One index + skillset + indexer is created for every (algorithm, vectorizer) combination
for algorithm, vectorizer in product(vector_search_algorithms, vectorizers):
    vectorizer_name = vectorizer["name"]
    vectorizer_kind = vectorizer["kind"]
    vector_dimensions = vectorizer["dimensions"]

    #Get the same models as the vectorizer to use in the embedding skill
    deployment_name = None
    model_name = None
    if vectorizer_kind == "azureOpenAI":
        deployment_name = vectorizer["deployment_name"]
        model_name = vectorizer["model_name"]

    #Generate names
    #The skillset and indexer names are derived from the index name; each is capped at 128 characters
    index_name = f"idx-{algorithm.lower()}-{vectorizer_name.replace('_', '-')}".strip('-')[:128]
    skillset_name = f"ss-{index_name}"[:128]
    indexer_name = f"idxr-{index_name}"[:128]

    vector_search, vector_search_profile_name = create_vector_search(
        algorithm_name=algorithm,
        vectorizer_name=vectorizer_name,
        vectorizer_kind=vectorizer_kind,
        vectorizer_params=vectorizer.get("vectorizer_params", {}),
        vector_dimensions=vector_dimensions,
    )

    #Define fields using vector_search_profile_name
    fields = [
        SearchField(name="parent_id", type=SearchFieldDataType.String),
        SearchField(name="title", type=SearchFieldDataType.String),
        SearchField(
            name="chunk_id",
            type=SearchFieldDataType.String,
            key=True,
            sortable=True,
            filterable=True,
            facetable=True,
            analyzer_name="keyword",
        ),
        SearchField(name="chunk", type=SearchFieldDataType.String, sortable=False, filterable=False, facetable=False),
        SearchField(
            name="text_vector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            vector_search_dimensions=vector_dimensions,
            vector_search_profile_name=vector_search_profile_name,
        ),
    ]

    #Create index
    index = create_index(index_name=index_name, fields=fields, vector_search=vector_search)

    try:
        index_client.create_or_update_index(index)
        print(f"Index '{index_name}' created or updated successfully.")
    except Exception as e:
        print(f"Failed to create index '{index_name}': {e}")
        #Without the index, the skillset projection and the indexer would point at a missing
        #target, so skip the rest of this combination and move on to the next one
        continue

    #Create index projections
    #Every chunk under /document/pages/* becomes its own search document in the target index
    index_projections = SearchIndexerIndexProjection(selectors=[
        SearchIndexerIndexProjectionSelector(
            target_index_name=index_name,
            parent_key_field_name="parent_id",
            source_context="/document/pages/*",
            mappings=[
                InputFieldMappingEntry(name="chunk", source="/document/pages/*"),
                InputFieldMappingEntry(name="text_vector", source="/document/pages/*/text_vector"),
                InputFieldMappingEntry(name="title", source="/document/metadata_storage_name"),
            ],
        )
    ], parameters=SearchIndexerIndexProjectionsParameters(
        projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS
    ))

    #Create skillset
    #The custom-embedder arguments are only filled in for customWebApi vectorizers
    skillset = create_skillset(
        skillset_name=skillset_name,
        index_projections=index_projections,
        cognitive_services_key=cognitive_services_key,
        vectorizer_kind=vectorizer_kind,
        vectorizer_context="/document/pages/*" if vectorizer_kind == "customWebApi" else None,
        vectorizer_uri=vectorizer["vectorizer_params"]["customWebApiParameters"]["uri"] if vectorizer_kind == "customWebApi" else None,
        vectorizer_headers={"Authorization": f"Bearer {credentials.primary_key}"} if vectorizer_kind == "customWebApi" else None,
        vector_dimensions=vector_dimensions,
        deployment_name=deployment_name,
        model_name=model_name,
    )

    indexer_client.create_or_update_skillset(skillset)
    print(f"Skillset '{skillset_name}' created.")

    #Create data source connection
    data_source_connection = create_data_source_connection(
        name=data_source_name,
        connection_string=connection_string,
        container_name=container_name,
    )
    data_source = indexer_client.create_or_update_data_source_connection(data_source_connection)

    #Create indexer
    indexer = create_indexer(
        indexer_name=indexer_name,
        skillset_name=skillset_name,
        target_index_name=index_name,
        data_source_name=data_source.name,
    )
    indexer_client.create_or_update_indexer(indexer)
    print(f"Indexer '{indexer_name}' created and running. Allow a few minutes for indexing.")

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


text-embedding-3-large
text-embedding-ada-002
Index 'idx-hnsw-openai-vectorizer-text-embedding-3-large' created or updated successfully.
Skillset 'ss-idx-hnsw-openai-vectorizer-text-embedding-3-large' created.
Indexer 'idxr-idx-hnsw-openai-vectorizer-text-embedding-3-large' created and running. Allow a few minutes for indexing.
Index 'idx-hnsw-openai-vectorizer-text-embedding-ada-002' created or updated successfully.
Skillset 'ss-idx-hnsw-openai-vectorizer-text-embedding-ada-002' created.
Indexer 'idxr-idx-hnsw-openai-vectorizer-text-embedding-ada-002' created and running. Allow a few minutes for indexing.
Index 'idx-hnsw-custom-vectorizer-384' created or updated successfully.
Skillset 'ss-idx-hnsw-custom-vectorizer-384' created.
Indexer 'idxr-idx-hnsw-custom-vectorizer-384' created and running. Allow a few minutes for indexing.
Index 'idx-exhaustiveknn-openai-vectorizer-text-embedding-3-large' created or updated successfully.
Skillset 'ss-idx-exhaustiveknn-openai-vectorizer-text-embed