# Microsoft Azure OpenAI On Your Data with Elasticsearch

In this notebook we'll use Elasticsearch indices to provide grounding data for queries to Azure OpenAI models using the Azure OpenAI On Your Data service.

The Azure OpenAI On Your Data service currently supports three search scenarios for retrieval of documents that will be sent to the LLM for processing:

1) full text search
2) vector search using Elasticsearch Machine Learning models
3) vector search using embeddings generated using Azure OpenAI (Ada).

Each of these examples will be covered in the following sections.
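
As a rough sketch of how these three scenarios map onto a request, the helper below picks a `queryType` value from whichever embedding settings are supplied. It mirrors the logic of the `chat_with_my_data` helper defined later in this notebook. The function name `select_query_type` is hypothetical and is not part of any SDK.

```python
from typing import Optional


def select_query_type(
    elasticsearch_embedding_model: Optional[str] = None,
    aoai_embedding_endpoint: Optional[str] = None,
    aoai_embedding_key: Optional[str] = None,
) -> str:
    """Return "vector" when an embedding source is configured, otherwise "simple"."""
    # Scenario 2: embeddings come from a model deployed in Elasticsearch
    uses_elasticsearch_model = bool(elasticsearch_embedding_model)
    # Scenario 3: embeddings come from an Azure OpenAI deployment (endpoint and key both needed)
    uses_aoai_embeddings = bool(aoai_embedding_endpoint and aoai_embedding_key)
    # Scenario 1: with no embedding source, fall back to full-text search
    return "vector" if uses_elasticsearch_model or uses_aoai_embeddings else "simple"


print(select_query_type())
print(select_query_type(elasticsearch_embedding_model="example-model-id"))
```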

## Requirements

For this example, you will need:
* Python 3.6 or later
* An Elastic deployment meeting the following criteria:
    * Version 8.x
    * A machine learning node, if you want to follow the vector search example that uses an Elasticsearch text embedding model
* An Azure OpenAI Resource
    * At minimum, one chat model should be deployed for your resource to enable chatting about your data.
    * Optionally, if you would like to try out vector search using the Azure OpenAI Ada model, you will also need to deploy an Ada model to your resource.  The examples below will assume you are using the model `text-embedding-ada-002`, but can be updated to suit your needs.
* The [Elastic Python client](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/installation.html)
* The [OpenAI Python Client](https://platform.openai.com/docs/api-reference/introduction?lang=python)

### Create Elastic Deployment

If you don't have an Elastic deployment, see the official ["Getting Started" guide for Elastic Cloud](https://www.elastic.co/getting-started) to create one.


### Configure Azure OpenAI Resource

If you don't have an Azure OpenAI resource, detailed information about how to obtain one can be found in the [official documentation](https://learn.microsoft.com/en-us/azure/ai-services/openai/use-your-data-quickstart?tabs=command-line&pivots=programming-language-python) for the Azure OpenAI On Your Data service.

## Install packages and initialize environment

The first step is to use `pip` to install all of the packages we need to connect to Elasticsearch and Azure OpenAI services.

In [105]:
!pip install -qU elasticsearch openai==0.28.1 requests

Here we will also define a few helper classes and functions that will be reused throughout the notebook.

In [1]:
import math
import numpy as np
import openai
import pandas as pd
import statistics
from io import StringIO
from time import sleep, perf_counter
from urllib.request import urlopen


class SampleCsvDatasets(object):
    datasets = {
        "msmarco": {
            "url": "https://github.com/mayya-sharipova/msmarco/raw/main/msmarco-passagetest2019-unique.tsv",
            "fieldnames": ["id", "text"],
            "delimiter": "\t"
        }
    }
    def __init__(self):
        self._values = {}
        for name, config in self.datasets.items():
            url = config.get("url")
            response = urlopen(url)
            self._values[name] = self._create_dataframe(name, response.read().decode("utf-8"))
    
    def _create_dataframe(self, dataset_name, value):
        return pd.read_csv(
            StringIO(value),
            delimiter=self.datasets.get(dataset_name).get("delimiter"),
            names=self.datasets.get(dataset_name).get("fieldnames")
        )
    
    def get_dataframe(self, dataset_name):
        # Look up the dataframe by the name that was passed in
        return self._values[dataset_name]
    
    def get_dataframe_with_aoai_embeddings(
        self,
        dataset_name,
        embedding_model_deployment_name,
        text_column_name="text",
        embeddings_column_name="text_embedding.aoai_predicted_value",
        model_name="text-embedding-ada-002"
    ):
        df = self._values[dataset_name]
        
        # 16 is the current maximum batch size for embeddings, so we'll partition the dataframe by 16 rows.
        maximum_batch_size = 16
        
        # This value can be manipulated if rate limiting errors occur while producing embeddings.
        throttling_interval = 0.5
        
        # Occasionally the server will become overloaded, so retrying can ensure success.
        retry_count = 30
        
        number_of_buckets = math.ceil(len(df) / maximum_batch_size)
        partitions = np.array_split(df, number_of_buckets)
        embeddings = []
        request_times = []
        
        start = perf_counter()
        for partition in partitions:
            success = False
            for _ in range(retry_count):
                try:
                    request_start = perf_counter()
                    text_embedding_response = openai.Embedding.create(
                        deployment_id=embedding_model_deployment_name,
                        input=partition[text_column_name].tolist(),
                        api_version="2023-03-15-preview"
                    )
                    embeddings.extend([result["embedding"] for result in text_embedding_response["data"]])
                    sleep(throttling_interval)
                    request_end = perf_counter()
                    request_times.append(request_end-request_start)
                    success = True
                    # Stop retrying once this partition has been embedded
                    break

                except Exception as e:
                    print(f"Encountered the following exception when creating embeddings (retrying): {str(e)}")
                    sleep(5)
            
            if not success:
                raise Exception("Unable to generate embeddings (see previous error output for more information.)")
        
        end = perf_counter()
        # Attach the embeddings as a new column and return the dataframe so it can be indexed
        df[embeddings_column_name] = embeddings
        print(f"Average request time per batch of up to {maximum_batch_size} records: {statistics.mean(request_times)}")
        print(f"Total time: {end-start}")
        return df

def create_index(
    elastic_client,
    index_name: str,
    **properties
):
    mappings = {
        "properties": properties
    }

    # Clean up any previously created index with the same name
    elastic_client.indices.delete(index=index_name, ignore_unavailable=True)

    # Create the index
    elastic_client.indices.create(index=index_name, mappings=mappings)


def index_data(
    elastic_client,
    dataset_reader,
    index_name
):
    # Use the bulk API to index data in chunks of 10000 documents
    operations_chunks = []
    chunk = []
    max_operations_chunk_size = 10000
    chunk_size = 0

    # Accept either a pandas DataFrame or any iterable of dict-like rows
    rows = dataset_reader.to_dict(orient="records") if isinstance(dataset_reader, pd.DataFrame) else dataset_reader

    for row in rows:
        # Start a new chunk once the current one is full, without dropping the current row
        if chunk_size >= max_operations_chunk_size:
            operations_chunks.append(chunk)
            chunk = []
            chunk_size = 0

        chunk.append({"index": {"_index": index_name}})
        chunk.append(row)
        chunk_size += 1

    if len(chunk) > 0:
        operations_chunks.append(chunk)

    for chunk in operations_chunks:
        elastic_client.bulk(index=index_name, operations=chunk, refresh=True)

def chat_with_my_data(
    chat_query,
    aoai_deployment_name,
    elasticsearch_endpoint,
    elasticsearch_api_key,
    index_name,
    **embedding_config
):
    elasticsearch_embedding_model = embedding_config.get("elasticsearch_embedding_model", None)
    aoai_embedding_model = embedding_config.get("aoai_embedding_model", None)
    aoai_embedding_key = embedding_config.get("aoai_embedding_key", None)
    # Only build the embedding endpoint when both the deployment name and the key are provided
    aoai_embedding_endpoint = None if not aoai_embedding_model or not aoai_embedding_key \
        else f"{openai.api_base}/openai/deployments/{aoai_embedding_model}/embeddings?api-version=2023-03-15-preview"
    
    query_type = "vector" if (aoai_embedding_endpoint and aoai_embedding_key) or elasticsearch_embedding_model else "simple"
    
    completion = openai.ChatCompletion.create(
        messages=[
            {
                "role": "user",
                "content": chat_query
            }
        ],
        dataSources=[
            {
                "type": "Elasticsearch",
                "parameters": {
                    "endpoint": elasticsearch_endpoint,
                    "encodedApiKey": elasticsearch_api_key,
                    "indexName": index_name,
                    "queryType": query_type,
                    "embeddingModelId": elasticsearch_embedding_model,
                    "embeddingEndpoint":  aoai_embedding_endpoint,
                    "embeddingKey": aoai_embedding_key
                }
            }
        ],
        deployment_id=aoai_deployment_name
    )
    print(completion.choices[0].message.content)

Next, let's set some variables for configuring access to the services.

In [80]:
import getpass
import openai
from elasticsearch import Elasticsearch

# Elasticsearch Configuration
elasticsearch_endpoint = input("Elasticsearch endpoint: ")
elasticsearch_api_key = getpass.getpass("Elasticsearch API Key: ")

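# Azure OpenAI Configuration
# The openai 0.28.x client has to be told that it is talking to Azure. The API version below is an
# assumption (a preview version that exposes the On Your Data "extensions" endpoint); adjust it to
# a version supported by your resource.
openai.api_type = "azure"
openai.api_version = "2023-08-01-preview"
openai.api_base = input("Azure OpenAI resource endpoint: ")
openai.api_key = getpass.getpass("Azure OpenAI resource key: ")
chat_model_deployment_name = input("Azure OpenAI chat model deployment name: ")
embedding_model_deployment_name = input("Azure OpenAI embedding model deployment name (enter 'None' if not applicable): " )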


We'll also initialize our dataset manager for easy data access.

In [143]:
datasets = SampleCsvDatasets()

Next, we'll configure the Elasticsearch client, which we will use to index some data.

In [28]:
elastic_client = Elasticsearch(
    f"{elasticsearch_endpoint}:443",
    api_key=elasticsearch_api_key
)

print(elastic_client.info())

{'name': 'instance-0000000001', 'cluster_name': '211e3ff0aa8e41ab8d8fc9bde647e87d', 'cluster_uuid': 'ySrbugeMRISJ2YQQ_ZR8gA', 'version': {'number': '8.10.1', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'a94744f97522b2b7ee8b5dc13be7ee11082b8d6b', 'build_date': '2023-09-14T20:16:27.027355296Z', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


Next, we'll configure the OpenAI client to make requests to our specific Azure OpenAI resource endpoint.  We subclass `requests.adapters.HTTPAdapter` and mount it on a session so that every chat request the OpenAI client sends for our deployment is redirected to the On Your Data (`extensions/chat/completions`) URL of the Azure OpenAI resource.

In [29]:
from requests import Session
from requests.adapters import HTTPAdapter

class OnYourDataAdapter(HTTPAdapter):
    def send(self, request, **kwargs):
        request.url = f"{openai.api_base}/openai/deployments/{chat_model_deployment_name}/extensions/chat/completions?api-version={openai.api_version}"
        return super().send(request, **kwargs)
    

session = Session()
session.mount(
    prefix=f"{openai.api_base}/openai/deployments/{chat_model_deployment_name}",
    adapter=OnYourDataAdapter()
)

openai.requestssession = session
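
To make the URL rewriting above easier to follow, here is a small standalone sketch of how the target URL is assembled. The function name `build_on_your_data_url`, the example resource name, the deployment name, and the API version are all hypothetical placeholders; substitute the values for your own resource.

```python
def build_on_your_data_url(api_base: str, deployment_name: str, api_version: str) -> str:
    """Assemble the extensions chat completions URL that the adapter rewrites requests to."""
    # Strip a trailing slash so the path is not joined with a double slash
    base = api_base.rstrip("/")
    return (
        f"{base}/openai/deployments/{deployment_name}"
        f"/extensions/chat/completions?api-version={api_version}"
    )


# Placeholder values for illustration only
print(build_on_your_data_url(
    "https://example-resource.openai.azure.com/",
    "example-chat-deployment",
    "2023-08-01-preview",
))
```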

## Example 1: Grounding ChatGPT with data retrieved from a full-text search query 


### Create Elasticsearch index with required mappings

We need to create an index which contains some text data for using full-text search for retrieving documents to ground the responses from Azure OpenAI.  First, we'll create the index mapping.

In [63]:
elasticsearch_index_name = "msmarco-passagetest2019-unique"
mapping_properties = {
    "id": {"type": "keyword"},
    "text": {"type": "text"}
}

create_index(
    elastic_client,
    elasticsearch_index_name,
    **mapping_properties
)

#### Download dataset

For this example, we'll use a subset of the MS MARCO Passage Ranking dataset.  We'll use the dataset manager to get a dataframe of our data to use for indexing.

In [64]:
dataframe = datasets.get_dataframe("msmarco")

#### Index documents

We'll call the helper function `index_data` to handle the indexing.  This function uses the `bulk` API to index data in batches.

In [65]:
index_data(
    elastic_client,
    dataframe,
    elasticsearch_index_name
)
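
To illustrate the batching idea independently of a live cluster, the following sketch builds the list of bulk operations in chunks. It assumes rows are plain dictionaries. The generator name `build_bulk_chunks` and the index name `example-index` are hypothetical. Each document contributes two entries to a chunk: an `index` action line followed by the document itself.

```python
from typing import Dict, Iterable, Iterator, List


def build_bulk_chunks(
    rows: Iterable[Dict],
    index_name: str,
    max_docs_per_chunk: int = 10000,
) -> Iterator[List[Dict]]:
    """Yield lists of bulk operations holding at most max_docs_per_chunk documents each."""
    chunk: List[Dict] = []
    docs_in_chunk = 0
    for row in rows:
        # Every document needs an action line followed by its source
        chunk.append({"index": {"_index": index_name}})
        chunk.append(row)
        docs_in_chunk += 1
        if docs_in_chunk == max_docs_per_chunk:
            yield chunk
            chunk, docs_in_chunk = [], 0
    # Emit whatever is left over after the last full chunk
    if chunk:
        yield chunk


rows = [{"id": i, "text": f"passage {i}"} for i in range(5)]
chunks = list(build_bulk_chunks(rows, "example-index", max_docs_per_chunk=2))
print([len(chunk) for chunk in chunks])  # [4, 4, 2]
```

Each yielded chunk could then be passed as the `operations` argument of the client's `bulk` call, as `index_data` does.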

### Chat about your dataset

Now that we have some data available in our Elasticsearch cluster, we can use Azure OpenAI On Your Data to ask questions about it.  The `chat_with_my_data` function call below will use full-text search by default, since we are not passing in any additional configuration about embeddings.

In [66]:
chat_query = "How's the weather in Jamaica?"
chat_with_my_data(
    chat_query,
    chat_model_deployment_name,
    elasticsearch_endpoint,
    elasticsearch_api_key,
    elasticsearch_index_name
)

Based on the retrieved documents, Jamaica has perfect weather most of the year, and the best time to visit Jamaica depends on personal preference. The weather in December is described as cool and it is considered high season[doc1][doc2][doc3][doc5]. Additionally, January and February are also considered high season[doc3][doc5]. For more detailed information on monthly weather averages, including temperature, rainfall, and sunshine, the Jamaica climate guides are recommended for holiday planning[doc4].


## Example 2: Grounding ChatGPT with data retrieved from a kNN search query using an Elasticsearch machine learning model for embeddings

The next example will show how to chat with your data using a vector search query using embeddings produced by a model deployed to your Elasticsearch cluster.  This example is adapted from the Elastic Search Labs guide ["How to deploy NLP: Text Embeddings and Vector Search"](https://www.elastic.co/search-labs/how-to-deploy-nlp-text-embeddings-and-vector-search), and uses the same dataset that we already downloaded in the previous example.

First, we will need to install the `eland` package, which we will use to deploy an embedding model.

In [44]:
!pip install eland[pytorch]

Collecting eland[pytorch]
  Downloading eland-8.10.1-py3-none-any.whl.metadata (13 kB)
Collecting pandas<2,>=1.5 (from eland[pytorch])
  Downloading pandas-1.5.3-cp310-cp310-win_amd64.whl (10.4 MB)

Next, we will deploy the `msmarco-MiniLM-L-12-v3` model from Hugging Face, which is trained on a superset of the data we have already indexed in the previous example.

In [47]:
!eland_import_hub_model \
    --url {elasticsearch_endpoint}:443 \
    --es-api-key {elasticsearch_api_key} \
    --hub-model-id sentence-transformers/msmarco-MiniLM-L-12-v3 \
    --task-type text_embedding \
    --start

2023-11-06 18:37:35,452 INFO : Establishing connection to Elasticsearch
2023-11-06 18:37:36,151 INFO : Connected to cluster named '211e3ff0aa8e41ab8d8fc9bde647e87d' (version: 8.10.1)
2023-11-06 18:37:36,154 INFO : Loading HuggingFace transformer tokenizer and model 'sentence-transformers/msmarco-MiniLM-L-12-v3'


Next, we can reindex our data with the embeddings from the model to enable kNN search using those embeddings.  In the next cell, we'll create an ingest pipeline to calculate embeddings from the data.

In [67]:
from elasticsearch.client import IngestClient

pipeline_id = "msmarco-minilm-l-12-v3"
ingest_processors = [
    {
      "inference": {
        "model_id": "sentence-transformers__msmarco-minilm-l-12-v3",
        "target_field": "text_embedding",
        "field_map": {
          "text": "text_field"
        }
      }
    }
]

on_failure = [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{_index}}}"
      }
    },
    {
      "set": {
        "description": "Set error message",
        "field": "ingest.failure",
        "value": "{{_ingest.on_failure_message}}"
      }
    }
]

ingest_client = IngestClient(elastic_client)

ingest_client.put_pipeline(
    id=pipeline_id,
    on_failure=on_failure,
    processors=ingest_processors
)

ObjectApiResponse({'acknowledged': True})

Then, we'll reindex the `msmarco-passagetest2019-unique` index using the pipeline we just created.

In [68]:
# Create the new index
elasticsearch_embeddings_index_name = f"msmarco-passagetest2019-unique-{pipeline_id}"
mapping_properties = {
    "text_embedding.predicted_value": {
        "type": "dense_vector",
        "dims": 384,
        "index": True,
        "similarity": "cosine"
      },
      "text": {
        "type": "text"
      }
}

create_index(
    elastic_client,
    elasticsearch_embeddings_index_name,
    **mapping_properties)

# Reindex source data into target index
reindex_source = {
    "index": elasticsearch_index_name
}
reindex_dest = {
    "index": elasticsearch_embeddings_index_name,
    "pipeline": pipeline_id
}

response = elastic_client.reindex(
    source=reindex_source,
    dest=reindex_dest,
    wait_for_completion=False
)
print(response)

{'task': 'v2M-MYd8QDWzbhuNMsO24g:67500998'}


The reindexing operation may take some time, so we can poll for the task status before moving to the next step.

In [69]:
from time import sleep

is_completed = False
while not is_completed:
    sleep(1)
    task = elastic_client.tasks.get(task_id=response.get("task"))
    is_completed = task.get("completed")
    
print("Reindexing is complete!")

Reindexing is complete!
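
The polling loop above runs until the task reports completion. A slightly more defensive variant, sketched below, adds a timeout so that a stalled task does not block the notebook indefinitely. The helper name `wait_until` and its parameters are hypothetical. The demo uses a stand-in check instead of a real Elasticsearch task.

```python
from time import monotonic, sleep
from typing import Callable


def wait_until(
    is_done: Callable[[], bool],
    timeout_seconds: float = 600.0,
    poll_interval_seconds: float = 1.0,
) -> None:
    """Poll is_done() until it returns True, raising TimeoutError if the deadline passes."""
    deadline = monotonic() + timeout_seconds
    while not is_done():
        if monotonic() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout_seconds} seconds")
        sleep(poll_interval_seconds)


# Stand-in for a task status check: reports completion on the third poll
polls = {"count": 0}

def fake_task_completed() -> bool:
    polls["count"] += 1
    return polls["count"] >= 3

wait_until(fake_task_completed, timeout_seconds=5.0, poll_interval_seconds=0.0)
print(f"Completed after {polls['count']} polls")
```

In this notebook, the check could be a function that calls `elastic_client.tasks.get(task_id=response.get("task"))` and returns its `completed` value.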


Now that our embeddings index is ready, we can use it with the Azure OpenAI On Your Data service.  Note that this call to `chat_with_my_data` passes in the ID of the Elasticsearch embedding model, which switches the query type to vector search.

In [70]:
chat_query = "What's the weather like in Jamaica?"

chat_with_my_data(
    chat_query,
    chat_model_deployment_name,
    elasticsearch_endpoint,
    elasticsearch_api_key,
    elasticsearch_embeddings_index_name,
    elasticsearch_embedding_model="sentence-transformers__msmarco-minilm-l-12-v3"
)

Jamaica has a tropical and humid climate with warm to hot temperatures all year round, averaging between 80 and 90 degrees Fahrenheit. The days are warm and the nights are cooler, with mountain areas being cooler than the lower land throughout the year. Rain usually falls for short periods in the late afternoon, with sunshine the rest of the day[doc1][doc3][doc4][doc5]. The best time to visit Jamaica is year-round due to its consistently warm tropical weather, with peak season running from mid-December to mid-April when crowds swell and prices rise[doc2].
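
For intuition about what a kNN query over `cosine` similarity vectors computes, here is a minimal brute-force sketch in plain Python. It is only an illustration: Elasticsearch's indexed `dense_vector` search uses an approximate algorithm rather than this exhaustive comparison, and the toy two-dimensional vectors and document IDs are made up.

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either vector has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_knn(query: Sequence[float], vectors: Dict[str, Sequence[float]], k: int) -> List[str]:
    """Return the IDs of the k vectors most similar to the query, best match first."""
    ranked = sorted(
        vectors.items(),
        key=lambda item: cosine_similarity(query, item[1]),
        reverse=True,
    )
    return [doc_id for doc_id, _ in ranked[:k]]


# Toy data for illustration only
toy_vectors = {"doc_a": [1.0, 0.0], "doc_b": [0.0, 1.0], "doc_c": [1.0, 1.0]}
print(brute_force_knn([1.0, 0.1], toy_vectors, k=2))  # ['doc_a', 'doc_c']
```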


## Example 3: Grounding ChatGPT with data retrieved from a kNN search query using Azure OpenAI embeddings

We also have the option to create embeddings to use for kNN search using Azure OpenAI.  

For this example, in addition to the chat model deployment used in the previous two examples for chatting with your data, you will also need an embedding model deployment added to your Azure OpenAI resource.  We will first use this embedding model to generate embeddings on your data, and then later refer to this deployment in our requests to the Azure OpenAI On Your Data service, which will call the model to generate the embeddings for your query as a part of the chat request.

First, we will create a new index with the correct mappings for embeddings produced using the Azure OpenAI Ada embedding model.

In [None]:
aoai_embeddings_index_name = f"msmarco-passagetest2019-unique-{embedding_model_deployment_name}"
mapping_properties = {
    "id": {"type": "keyword"},
    "text": {"type": "text"},
    "text_embedding.aoai_predicted_value": {
        "type": "dense_vector",
        "dims": 1536,
        "index": True,
        "similarity": "cosine"
    }
}

create_index(
    elastic_client,
    aoai_embeddings_index_name,
    **mapping_properties
)

Next, we will calculate embeddings for our dataset.  Note that this step may take some time.

In [None]:
dataframe = datasets.get_dataframe_with_aoai_embeddings("msmarco", embedding_model_deployment_name)
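
The batching and retry pattern used by `get_dataframe_with_aoai_embeddings` can be sketched without any network calls. In the example below, `embed_in_batches` and `fake_embed` are hypothetical names. `fake_embed` stands in for a real embedding request and simply returns each text's length as a one-element vector.

```python
from time import sleep
from typing import Callable, List, Sequence


def embed_in_batches(
    texts: Sequence[str],
    embed_batch: Callable[[List[str]], List[List[float]]],
    batch_size: int = 16,
    max_retries: int = 3,
    retry_delay_seconds: float = 5.0,
) -> List[List[float]]:
    """Embed texts batch by batch, retrying a failed batch up to max_retries times."""
    embeddings: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        for attempt in range(1, max_retries + 1):
            try:
                embeddings.extend(embed_batch(batch))
                break  # This batch succeeded, move on to the next one
            except Exception as error:
                if attempt == max_retries:
                    raise RuntimeError("Unable to generate embeddings") from error
                sleep(retry_delay_seconds)
    return embeddings


def fake_embed(batch: List[str]) -> List[List[float]]:
    # Stand-in for a real embedding call
    return [[float(len(text))] for text in batch]


print(embed_in_batches(["a", "bb", "ccc"], fake_embed, batch_size=2))  # [[1.0], [2.0], [3.0]]
```

Breaking out of the retry loop after a successful request matters: without it, the same batch would be embedded again on every remaining attempt.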

Then, we will index the data with embeddings we just calculated.

In [None]:
# The dataframe already contains the embeddings column, so no extra arguments are needed
index_data(
    elastic_client,
    dataframe,
    aoai_embeddings_index_name
)

Now that our index has been populated with Azure OpenAI embeddings, we can use the new index to chat with our data.

In [None]:
chat_query = "What's the weather like in Jamaica?"
chat_with_my_data(
    chat_query,
    chat_model_deployment_name,
    elasticsearch_endpoint,
    elasticsearch_api_key,
    aoai_embeddings_index_name,
    aoai_embedding_model=embedding_model_deployment_name,
    aoai_embedding_key=openai.api_key
)