"Notebook magic" commands to install packages that we will need.

In [None]:
!pip install dotenv

Some imports and environment variables we will make use of to connect to Elasticsearch and OpenAI's LLM.

In [None]:
import requests
from dotenv import load_dotenv
import os

# Fetch the workshop environment file once and cache it locally as .env.instruqt.
if not os.path.exists('.env.instruqt'):
    # Use a timeout and a status check so a hung or failing endpoint raises here
    # instead of an error page being cached as the environment file.
    env_response = requests.get('http://kubernetes-vm:9000/env', timeout=30)
    env_response.raise_for_status()
    env_text = env_response.text
    with open('.env.instruqt', 'w') as f:
        f.write(env_text)
load_dotenv('.env.instruqt')

openai_api_key = os.environ.get("LLM_APIKEY")
url = os.environ.get("LLM_PROXY_URL")
# Only build the base URL when the proxy host is known; otherwise it would read "https://None".
openai_api_base = f"https://{url}" if url else None

# os.environ accepts only strings, so export just the values that are actually set.
# A missing key can then be reported by an explicit check instead of a TypeError here.
if openai_api_key is not None:
    os.environ["OPENAI_API_KEY"] = openai_api_key
if openai_api_base is not None:
    os.environ["OPENAI_BASE_URL"] = openai_api_base

# Elasticsearch connection settings (None when the variable is absent).
es_host = os.getenv("ELASTICSEARCH_URL", None)
es_api_key = os.getenv("ELASTICSEARCH_APIKEY", None)

In [None]:
# Stop early with a clear message when the LLM key is missing or empty.
# The key is read from the LLM_APIKEY environment variable, so name that variable in the error.
if not openai_api_key:
    raise ValueError("The LLM_APIKEY environment variable is not set.")

In [None]:
# Small helper for pretty-printing JSON-like responses
def jsn(x):
    """Print `x` as indented JSON with its keys sorted.

    `x` is first converted with `dict(x)`, so it may be a mapping or any
    iterable of key/value pairs. Nothing is returned.
    """
    from json import dumps

    as_mapping = dict(x)
    rendered = dumps(as_mapping, sort_keys=True, indent=2)
    print(rendered)

In [None]:
#Helps to suppress spurious warnings
import warnings
warnings.filterwarnings('ignore')

Import the Elasticsearch module for python

In [None]:
from elasticsearch import Elasticsearch

Connect to Elasticsearch and verify

In [None]:
# Formatting a missing host with an f-string would produce the literal host "None",
# so fail with a clear message before building the client.
if not es_host:
    raise ValueError("The ELASTICSEARCH_URL environment variable is not set.")

es = Elasticsearch(
    hosts=[es_host],
    api_key=es_api_key,
)
# Print the cluster info as a quick connectivity check.
jsn(es.info())

<br>

# Run searches on Elasticsearch #

In [None]:
#function that runs a simple match query
def retrieve_documents(query, top_n=2, index="elastic_blogs-full-embeddings_e5"):
    """Run a `match` query on the `body` field and print the top documents.

    Args:
        query: Text to match against the `body` field.
        top_n: Number of documents to request and print.
        index: Name of the index to search.

    Prints the `body` of each returned hit, separated by blank lines.
    Returns nothing.
    """
    search_query = {
        # Request exactly top_n hits; without "size" the server returns its
        # default page, so a larger top_n would be silently truncated.
        "size": top_n,
        "query": {
            "match": {
                "body": query
            }
        }
    }
    response = es.search(index=index, body=search_query)
    top_docs = [hit["_source"]["body"] for hit in response["hits"]["hits"][:top_n]]
    line_separated = "\n\n".join(top_docs)
    print(line_separated)

In [None]:
retrieve_documents("Kibana for data analytics",top_n=3)

<br>

That was a simple match query, but we want to be able to run a more sophisticated search on Elasticsearch so that we can feed more relevant documents to the LLM (RAG).

The cells below run searches by calling a search template (now known as a search application) and store the result in the variable `create_response`.
The search application runs a hybrid search (lexical and semantic) whose results are combined using Reciprocal Rank Fusion (RRF).

In [None]:
# Start with `render_query`: it lets us look at the hybrid search and confirm
# that the parameters are assigned their values.

app_name = "RAG_application"                   # search_application built in Kibana Console
params1 = dict(query_string="My first query", size=2)

create_response = es.search_application.render_query(params=params1, name=app_name)

print("The render_query shows the search code is a bool and semantic search combined by RRF: \n")
jsn(create_response)

In [None]:
# Now call "search" so the search application actually queries Elasticsearch

app_name = "RAG_application"
params1 = dict(query_string="My first query", size=3)   # template parameters as key:value pairs

create_response = es.search_application.search(params=params1, name=app_name)

print("Documents from running the query: ")
jsn(create_response)

 <br>

In [None]:
#retrieve_documents runs a search template/application and returns the matching text
def retrieve_documents(query,  top_n=2, search_template="RAG_application"):
    """Search through a search application and return the top document bodies.

    Args:
        query: Text passed to the template as `query_string`.
        top_n: Passed to the template as `size`; also caps the hits that are used.
        search_template: Name of the search application to call.

    Returns:
        The `body` of each selected hit, joined with newlines.
    """
    template_params = {"query_string": query, "size": top_n}
    result = es.search_application.search(name=search_template, params=template_params)
    bodies = []
    for hit in result["hits"]["hits"][:top_n]:
        bodies.append(hit["_source"]["body"])
    return "\n".join(bodies)

In [None]:
#unit test
query = "How can I secure my networks between elasticsearch nodes?"
retrieved_documents = retrieve_documents(query)
print("Retrieved Documents:", retrieved_documents)

<br> 

# Interact with LLM

In [None]:
!pip install openai

In [None]:
# LLM is from OpenAI 
from openai import OpenAI

In [None]:
#Start with a simple, one-pass interaction with the LLM. The function call2llm takes a systems_prompt, which is the
#persona the system assumes in the interaction, and "users_prompt" which is the input from the user chatting with the LLM

def call2llm(systems_prompt, users_prompt):
    """Send one system prompt and one user prompt to the LLM.

    Args:
        systems_prompt: Content of the "system" message.
        users_prompt: Content of the "user" message.

    Returns:
        The `message` object of the first choice in the completion.
    """
    llm_client = OpenAI(api_key=openai_api_key)
    conversation = [
        dict(role="system", content=systems_prompt),
        dict(role="user", content=users_prompt),
    ]
    completion = llm_client.chat.completions.create(
        messages=conversation,
        model="gpt-4.1",
        temperature=0.000001,  # low means consistent LLM responses (high means more creative)
    )
    return completion.choices[0].message

In [None]:
#test
llm_answer = call2llm("You're a helpful assistant", "What is 2+2?")
print(llm_answer)

In [None]:
llm_answer2 = call2llm("You're a helpful assistant", "What did we just sum?")
print(llm_answer2)

`call2llm` has no memory of what happened previously: each call starts a new conversation.

<br>

#### Implement instead as a python class, which will help in adding conversational memory.  

In [None]:
class ChatWithLlm:
    """Chat session that keeps the whole conversation in `history`.

    Every call sends the full history to the LLM, which is how earlier turns
    are "remembered".
    """

    def __init__(self,systems_prompt="assistant",model="gpt-4.1"):
        self.model = model
        self.systems_prompt = systems_prompt
        # history helps us "keep memory" of what happened before; it starts with the system message
        self.history = [dict(role="system", content=systems_prompt)]

    def call2llm(self, users_prompt, temperature=0.00001):
        """Send `users_prompt` with the history so far and return the reply text.

        A low temperature means consistent LLM responses (high means more creative).
        Both the user prompt and the reply are appended to `history`.
        """
        llm_client = OpenAI(api_key=openai_api_key)
        user_turn = {"role": "user", "content": users_prompt}   # user role prompts the LLM
        self.history.append(user_turn)
        completion = llm_client.chat.completions.create(
            messages=self.history,
            model=self.model,
            temperature=temperature,
        )
        answer = str(completion.choices[0].message.content)
        assistant_turn = {"role": "assistant", "content": answer}
        self.history.append(assistant_turn)
        return answer

In [None]:
#test with an instance of the ChatWithLlm class
chat = ChatWithLlm("You're a helpful assistant")
llm_answer =  chat.call2llm("What is 2 + 2?")
print(llm_answer)

In [None]:
llm_answer =  chat.call2llm("What did I just ask you?")
print(llm_answer)

In [None]:
llm_answer =  chat.call2llm("How did you remember what was asked?")
print(llm_answer)

 <br>
 <br>

## RAG solution

Finally here is the python class that performs our RAG solution.

`Elasticsearch_rag` both queries Elasticsearch and feeds the retrieved documents to the LLM in a prompt.

In [None]:
class Elasticsearch_rag:
    """Retrieval-augmented chat: Elasticsearch documents plus conversation history sent to the LLM.

    Each call to `augment` retrieves documents for the query, adds them to the
    user message, and sends the whole history to the LLM.
    """

    def __init__(self, systems_prompt="You are a helpful assistant.", model="gpt-4.1"):
        self.systems_prompt = systems_prompt
        self.model = model
        # Running transcript sent to the LLM on every turn; starts with the system message.
        self.history = [{"role": "system", "content": systems_prompt}]

    #retrieve documents from Elasticsearch
    def retrieve(self, query,  top_n=2, search_template="RAG_application"):
        """Search through a search application and return the top document bodies.

        Args:
            query: Text passed to the template as `query_string`.
            top_n: Passed to the template as `size`; also caps the hits that are used.
            search_template: Name of the search application to call.

        Returns:
            The `body` of each selected hit, joined with newlines.
        """
        params = {"query_string": query, "size": top_n}
        response = es.search_application.search(name=search_template, params=params)
        top_docs = [hit["_source"]["body"] for hit in response["hits"]["hits"][:top_n]]
        return "\n".join(top_docs)

    #combine user's query, conversation history, and docs from Elasticsearch to send to LLM
    def augment (self, query, temperature=0.00001, top_n=2, search_template="RAG_application"):
        """Answer `query` using retrieved documents and the conversation so far.

        Args:
            query: The user's question.
            temperature: Sampling temperature passed to the LLM.
            top_n: Number of documents to retrieve for this turn.
            search_template: Name of the search application used for retrieval.

        Returns:
            The LLM's reply as a string. The augmented user message and the
            reply are both appended to `history`.
        """
        client = OpenAI(api_key=openai_api_key)
        # Retrieve on this instance, and before touching the history, so a failed
        # search leaves the transcript unchanged.
        retrieved = self.retrieve(query, top_n=top_n, search_template=search_template)
        prompt = ( "This is the query: "  +  query +  " Here are supporting documents. " + retrieved)
        # Send the augmented prompt (query + documents) exactly once; sending the bare
        # query would mean the LLM never sees the retrieved documents.
        self.history.append({"role": "user", "content": prompt})
        response = client.chat.completions.create(
            messages=self.history,
            model=self.model,
            temperature=temperature,
        )
        response_llm = str(response.choices[0].message.content)
        self.history.append({"role": "assistant", "content": response_llm})
        return response_llm

In [None]:
conversation = Elasticsearch_rag()   # an instance of a conversation
print(conversation.augment("What is Kibana good for?"))

In [None]:
print(conversation.augment("Can I run Kibana in a Docker container?"))

In [None]:
print(conversation.augment("What was the first question I asked?"))

Congratulations!  We have examined how to create a RAG application that feeds documents from Elasticsearch to OpenAI's GPT LLM.