# Build a Generative AI application using Elasticsearch and OpenAI

This notebook demonstrates how to:
- Index the OpenAI Wikipedia vector dataset into Elasticsearch
- Build a simple Gen AI application with Streamlit that retrieves context using Elasticsearch and formulates answers using OpenAI

# Setup

## Install and import required Python libraries
Elastic uses the eland Python library to download models from the Hugging Face Hub and load them into Elasticsearch.

In [4]:
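# The openai calls used in this notebook (openai.Model, openai.Embedding,
# openai.ChatCompletion) were removed in openai 1.0, so pin an earlier release.
!pip install -qU "openai<1.0" pandas==1.5.3 wget elasticsearch streamlit tqdm
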
In [21]:
import getpass  # imported as a module: later cells call getpass.getpass(...)
import json
import os
import zipfile

import openai
import pandas as pd
import wget
from elasticsearch import Elasticsearch, helpers
from tqdm.notebook import tqdm


## Configure Elasticsearch authentication
The recommended authentication approach is to use the Elastic Cloud ID and a cluster-level API key. The cells below use the Cloud ID with basic authentication (username and password) instead.

You can use any method you wish to set the required credentials. We use getpass in this example to prompt for credentials, which avoids storing them in GitHub.

In [8]:
es_cloud_id = getpass.getpass('Enter Elastic Cloud ID:  ')
es_user = getpass.getpass('Enter cluster username:  ')
es_pass = getpass.getpass('Enter cluster password:  ')

Enter Elastic Cloud ID:  ········
Enter cluster username:  ········
Enter cluster password:  ········
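As one alternative to typing credentials on every run, the sketch below reads a value from an environment variable and only falls back to a getpass prompt when the variable is missing. The helper name `read_secret` and the variable name `ES_CLOUD_ID_DEMO` are hypothetical and chosen just for this illustration; they are not part of the notebook.

```python
import getpass
import os


def read_secret(env_name, prompt):
    """Return the value of env_name if it is set, otherwise prompt without echoing."""
    value = os.environ.get(env_name)
    if value:
        return value
    return getpass.getpass(prompt)


# Hypothetical variable, set here only so the demonstration does not prompt.
os.environ["ES_CLOUD_ID_DEMO"] = "example-cloud-id"
demo_cloud_id = read_secret("ES_CLOUD_ID_DEMO", "Enter Elastic Cloud ID:  ")
print(demo_cloud_id)
```

In the notebook, the same helper could replace each `getpass.getpass(...)` call, assuming you export the corresponding variables before starting Jupyter.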


## Connect to Elastic Cloud

In [9]:
es = Elasticsearch(cloud_id=es_cloud_id,
                   basic_auth=(es_user, es_pass)
                   )
es.info() # should return cluster info


ObjectApiResponse({'name': 'instance-0000000001', 'cluster_name': 'db3b3840a622415584800a4714e5d6f5', 'cluster_uuid': 'ea9VrQVQQ-q40pjLGRM_rQ', 'version': {'number': '8.10.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'c63272efed16b5a1c25f3ce500715b7fddf9a9fb', 'build_date': '2023-10-05T10:15:55.152563867Z', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

# Configure OpenAI connection

In [15]:
openai.api_key = getpass.getpass("OpenAI API key: ")
openai.Model.retrieve("text-embedding-ada-002")  # raises an error if the key is rejected

OpenAI API key: ········


<Model model id=text-embedding-ada-002 at 0x7f972ddc2040> JSON: {
  "id": "text-embedding-ada-002",
  "object": "model",
  "created": 1671217299,
  "owned_by": "openai-internal",
  "permission": [
    {
      "id": "modelperm-iHRhxPwYPqj6AJE3PFzga9rD",
      "object": "model_permission",
      "created": 1695144609,
      "allow_create_engine": false,
      "allow_sampling": true,
      "allow_logprobs": true,
      "allow_search_indices": true,
      "allow_view": true,
      "allow_fine_tuning": false,
      "organization": "*",
      "group": null,
      "is_blocking": false
    }
  ],
  "root": "text-embedding-ada-002",
  "parent": null
}

# Download the dataset

In [16]:
embeddings_url = 'https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip'
wget.download(embeddings_url)

with zipfile.ZipFile("vector_database_wikipedia_articles_embedded.zip", "r") as zip_ref:
    zip_ref.extractall("data")

## Read CSV file into a Pandas DataFrame

Next we use the Pandas library to read the unzipped CSV file into a DataFrame. This step makes it easier to index the data into Elasticsearch in bulk.

In [17]:
wikipedia_dataframe = pd.read_csv("data/vector_database_wikipedia_articles_embedded.csv")

In [18]:
wikipedia_dataframe.head()

| | id | url | title | text | title_vector | content_vector | vector_id |
|---|---|---|---|---|---|---|---|
| 0 | 1 | https://simple.wikipedia.org/wiki/April | April | April is the fourth month of the year in the J... | [0.001009464613161981, -0.020700545981526375, ... | [-0.011253940872848034, -0.013491976074874401,... | 0 |
| 1 | 2 | https://simple.wikipedia.org/wiki/August | August | August (Aug.) is the eighth month of the year ... | [0.0009286514250561595, 0.000820168002974242, ... | [0.0003609954728744924, 0.007262262050062418, ... | 1 |
| 2 | 6 | https://simple.wikipedia.org/wiki/Art | Art | Art is a creative activity that expresses imag... | [0.003393713850528002, 0.0061537534929811954, ... | [-0.004959689453244209, 0.015772193670272827, ... | 2 |
| 3 | 8 | https://simple.wikipedia.org/wiki/A | A | A or a is the first letter of the English alph... | [0.0153952119871974, -0.013759135268628597, 0.... | [0.024894846603274345, -0.022186409682035446, ... | 3 |
| 4 | 9 | https://simple.wikipedia.org/wiki/Air | Air | Air refers to the Earth's atmosphere. Air is a... | [0.02224554680287838, -0.02044147066771984, -0... | [0.021524671465158463, 0.018522677943110466, -... | 4 |


In [19]:
wikipedia_dataframe.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25000 entries, 0 to 24999
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   id              25000 non-null  int64 
 1   url             25000 non-null  object
 2   title           25000 non-null  object
 3   text            25000 non-null  object
 4   title_vector    25000 non-null  object
 5   content_vector  25000 non-null  object
 6   vector_id       25000 non-null  int64 
dtypes: int64(2), object(5)
memory usage: 1.3+ MB
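
The `object` dtype reported for `title_vector` and `content_vector` indicates that pandas read each vector as a string, not as a list of numbers, which is why the indexing code later passes them through `json.loads`. A minimal sketch of that conversion, using a shortened, made-up stand-in for one cell (the real vectors have 1536 entries):

```python
import json

# Shortened, made-up stand-in for one cell of the `title_vector` column.
raw_vector = "[0.001009464613161981, -0.020700545981526375, 0.5]"

vector = json.loads(raw_vector)  # str -> list of floats
print(type(raw_vector).__name__, "->", type(vector).__name__, "of length", len(vector))
```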


## Create index with mapping

Now we need to create an Elasticsearch index with the necessary mappings. This will enable us to index the data into Elasticsearch.

We use the `dense_vector` field type for the `title_vector` and `content_vector` fields. This is a special field type that allows us to store dense vectors in Elasticsearch.

Later, we'll need to target the `dense_vector` field for kNN search.

In [23]:
index_mapping = {
    "properties": {
        # 1536 dimensions matches the length of the vectors in this dataset
        "title_vector": {
            "type": "dense_vector",
            "dims": 1536,
            "index": True,
            "similarity": "cosine"
        },
        "content_vector": {
            "type": "dense_vector",
            "dims": 1536,
            "index": True,
            "similarity": "cosine"
        },
        "text": {"type": "text"},
        "title": {"type": "text"},
        "url": {"type": "keyword"},
        "vector_id": {"type": "long"}
    }
}
es.indices.create(index="wikipedia_vector_index", mappings=index_mapping)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'wikipedia_vector_index'})
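
To make the `"similarity": "cosine"` setting concrete, here is a small pure-Python sketch of cosine similarity and an exact, brute-force top-k search over toy vectors. It only illustrates the idea of ranking by cosine similarity; Elasticsearch's indexed kNN search is approximate and does not work this way internally. The function names and the three-dimensional vectors are made up for the example.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query, docs, k):
    """Exact nearest neighbours: score every document, keep the k best."""
    scored = [(cosine_similarity(query, vec), name) for name, vec in docs.items()]
    return sorted(scored, reverse=True)[:k]


# Toy 3-dimensional vectors; the real fields have 1536 dimensions.
docs = {
    "same_direction": [2.0, 0.0, 0.0],
    "orthogonal": [0.0, 1.0, 0.0],
    "opposite": [-1.0, 0.0, 0.0],
}
print(top_k([1.0, 0.0, 0.0], docs, k=2))
```

Note that `same_direction` ranks first even though its length differs from the query: cosine similarity compares direction only.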

## Index data into Elasticsearch

The following function generates the required bulk actions that can be passed to Elasticsearch's Bulk API, so we can index multiple documents efficiently in a single request.

For each row in the DataFrame, the function yields a dictionary representing a single document to be indexed.

In [24]:
def dataframe_to_bulk_actions(df):
    for index, row in df.iterrows():
        yield {
            "_index": 'wikipedia_vector_index',
            "_id": row['id'],
            "_source": {
                'url' : row["url"],
                'title' : row["title"],
                'text' : row["text"],
                'title_vector' : json.loads(row["title_vector"]),
                'content_vector' : json.loads(row["content_vector"]),
                'vector_id' : row["vector_id"]
            }
        }

As the dataframe is large, we will index data in batches of `100`. We index the data into Elasticsearch using the Python client's [helpers](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/client-helpers.html#bulk-helpers) for the bulk API.

In [26]:
total_documents = len(wikipedia_dataframe)

progress_bar = tqdm(total=total_documents, unit="documents")
success_count = 0

for ok, info in helpers.streaming_bulk(
    es,
    actions=dataframe_to_bulk_actions(wikipedia_dataframe),
    raise_on_error=False,
    chunk_size=100,
):
    if ok:
        success_count += 1
    else:
        print(f"Unable to index {info['index']['_id']}: {info['index']['error']}")
    progress_bar.update(1)
    progress_bar.set_postfix(success=success_count)

  0%|          | 0/25000 [00:00<?, ?documents/s]
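
To illustrate what batching a generator in groups of `100` means, the sketch below groups stand-in actions with the standard library only. It is not the client's implementation of `streaming_bulk`, just a picture of how a lazy stream of actions can be cut into fixed-size requests; the helper name `batched` and the fake actions are assumptions made for the example.

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from any iterable, including generators."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 250 stand-in actions, grouped the way a chunk size of 100 would group them.
fake_actions = ({"_id": i} for i in range(250))
print([len(batch) for batch in batched(fake_actions, 100)])
```

Because the input is consumed lazily, only one batch needs to be held in memory at a time, which is the reason the notebook yields actions from a generator instead of building one large list.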

## Build application with Streamlit

In the following section, you will build a simple interface using Streamlit.

This application displays a simple search bar where a user can ask a question. Elasticsearch retrieves the relevant documents (context) matching the question, then OpenAI formulates an answer using that context.

In [31]:
%%writefile app.py

import os
import streamlit as st
import openai
from elasticsearch import Elasticsearch


# Elastic Cloud
# Credentials are read from environment variables so that no secrets are
# stored in this file. Set es_cloud_id, es_password and openai_api_key
# before running `streamlit run app.py`.
es_cloud_id = os.environ['es_cloud_id']
es_password = os.environ['es_password']

# OpenAI
openai.api_key = os.environ['openai_api_key']

# Define model
EMBEDDING_MODEL = "text-embedding-ada-002"

# Connect to Elasticsearch
client = Elasticsearch(
  cloud_id = es_cloud_id,
  basic_auth=("elastic", es_password) # Alternatively use `api_key` instead of `basic_auth`
)

def openai_summarize(query, response):
    # Use the text of the top-ranked hit as context for the answer
    context = response['hits']['hits'][0]['_source']['text']
    summary = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Answer the following question: " + query + " by using the following text: " + context},
        ]
    )
    return summary.choices[0].message.content


def search_es(query):
    # Create embedding
    question_embedding = openai.Embedding.create(input=query, model=EMBEDDING_MODEL)

    # Define Elasticsearch query
    response = client.search(
        index="wikipedia_vector_index",
        knn={
            "field": "content_vector",
            "query_vector": question_embedding["data"][0]["embedding"],
            "k": 10,
            "num_candidates": 100
        }
    )
    return response


def main():
    st.title("Gen AI Application")

    # Input for user search query
    user_query = st.text_input("Enter your question:")

    if st.button("Search"):
        if user_query:
            st.write(f"Searching for: {user_query}")
            result = search_es(user_query)

            # Summarize only when there is at least one hit to use as context
            if result['hits']['hits']:
                openai_summary = openai_summarize(user_query, result)
                st.write(f"OpenAI Summary: {openai_summary}")

                # Display search results
                st.write("Search Results:")
                for hit in result['hits']['hits']:
                    st.write(hit['_source']['title'])
                    st.write(hit['_source']['text'])
            else:
                st.write("No results found.")

if __name__ == "__main__":
    main()


Writing app.py
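
The prompt that `openai_summarize` sends can be assembled and inspected without calling the API. The sketch below builds the same two-message structure from a question and a retrieved text. The function name `build_messages` and the `max_context_chars` limit are hypothetical additions for illustration and are not part of `app.py`.

```python
def build_messages(query, context, max_context_chars=4000):
    """Assemble chat messages for a question answered from retrieved text.

    max_context_chars is a hypothetical safety limit, not part of the notebook.
    """
    trimmed = context[:max_context_chars]
    return [
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": f"Answer the following question: {query}\n"
                       f"by using the following text: {trimmed}",
        },
    ]


messages = build_messages("What is April?", "April is the fourth month of the year.")
print(messages[1]["content"])
```

Keeping prompt construction in its own function makes it easy to check for spacing or truncation problems before any tokens are spent.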


In [None]:
!streamlit run app.py 

  You can now view your Streamlit app in your browser.

  Local URL: http://localhost:8501
  Network URL: http://192.168.86.58:8501

  For better performance, install the Watchdog module:

  $ xcode-select --install
  $ pip install watchdog
