# RAG End to End Pipeline with ElasticSearch and OpenAI

In [1]:
!pip install python-dotenv
!pip install elasticsearch
!pip install openai
!pip install tqdm



In [1]:
import os
from dotenv import load_dotenv
import openai

load_dotenv()

OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ELAST_SEARCH_PWD = os.getenv('ELASTIC_SEARCH_PWD')

## Data fetching and processing

In [2]:
from datasets import load_dataset
import pandas as pd

# Load the dataset
def prepare_data(name):
    dataset = load_dataset(name)

    # Convert the dataset to a pandas DataFrame
    train_df = pd.DataFrame(dataset['train'])
    
    # Extract each row as a dictionary and compile into a list of document
    doc_list = []
    for index, row in train_df.iterrows():
        doc_dict = row.to_dict()
        doc_list.append(doc_dict)
    return doc_list
    
    
doc_list = prepare_data("microsoft/wiki_qa")

## Elastic Search

Start ElasticSearch server before this step via the docker compose file

In [3]:
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200", basic_auth=('elastic', ELAST_SEARCH_PWD))
es.info()

ObjectApiResponse({'name': '0eb7766eedf4', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'o24P37j-SCy8ibGvjUXlAQ', 'version': {'number': '8.13.4', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'da95df118650b55a500dcc181889ac35c6d8da7c', 'build_date': '2024-05-06T22:04:45.107454559Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

### Store Data in ElasticSearch

In [4]:
from tqdm.auto import tqdm

def index_document(index_name, documents):
    """
    Index documents into Elasticsearch. If the index doesn't exist, it creates one.
    Loop through each document and index it into Elasticsearch using the question_id as a unique identifier.
    """
    
    # Define the settings and mappings for the Elasticsearch index
    index_settings = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                "answer": {"type": "text"},
                "document_title": {"type": "text"},
                "question": {"type": "text"},
                "question_id": {"type": "keyword"}
            }
        }
    }

    # Check if the index exists and create it if it does not
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body=index_settings)
        print("Index created")
    else:
        print("Index already exists")
    
    # Index each document
    for doc in tqdm(documents):
        doc_id = doc['question_id']  # Using 'question_id' as the unique identifier
        es.index(index=index_name, id=doc_id, document=doc)
        
        
index_name = "wiki_qa_questions"
index_document(index_name,doc_list)

Index already exists


  0%|          | 0/20360 [00:00<?, ?it/s]

### Search the stored data 

In [5]:
def retrieve_documents(query, index_name, max_results=5):
    """
    Retrieve documents from an Elasticsearch index based on a search query.
    Returns the top results up to a specified maximum number.
    """
    
    es = Elasticsearch("http://localhost:9200", basic_auth=('elastic', ELAST_SEARCH_PWD))
    
    search_query = {
        "size": max_results,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,    # The actual search query text
                        "fields": ["question^3", "answer", "document_title"],  # Fields to search in with boosting 'question' field
                        "type": "best_fields"
                    }
                }
            }
        }
    }
    
    # Perform the search with the defined query
    response = es.search(index=index_name, body=search_query)
    
    # Extract and return the document sources from the search hits
    documents = [hit['_source'] for hit in response['hits']['hits']]
    
    return documents

In [6]:
user_question = "How are epithelial tissues joined together?"

response = retrieve_documents(query=user_question,index_name="wiki_qa_questions")
print(response)

for doc in response:
    print(f"Title: {doc['document_title']}\nQuestion: {doc['question']}\nAnswer: {doc['answer']}\n\n")

[{'question_id': 'Q8', 'question': 'How are epithelial tissues joined together?', 'document_title': 'Tissue (biology)', 'answer': 'With these tools, the classical appearances of tissues can be examined in health and disease, enabling considerable refinement of clinical diagnosis and prognosis .', 'label': 0}, {'question_id': 'Q277', 'question': 'what films has hugh grant and richard curtis done together', 'document_title': 'Hugh Grant', 'answer': 'In a career spanning 30 years, Grant has repeatedly claimed that acting is not a true calling but just a job he fell into.', 'label': 0}, {'question_id': 'Q2508', 'question': 'What is the function of an epithelial free surface that is smooth?', 'document_title': 'Epithelium', 'answer': 'Epithelia can also be organized into clusters of cells that function as exocrine and endocrine glands.', 'label': 0}, {'question_id': 'Q41', 'question': 'how cds are read', 'document_title': 'CD-ROM', 'answer': 'It adapted the format to hold any form of data.'

## RAG

In [7]:
import os
import openai

# Set the OpenAI API key for accessing the API
openai.api_key = os.getenv('OPENAI_API_KEY')

def build_context(documents):
    """
    Create a string of information from documents to help the AI understand the question.
    This string includes document titles, questions, and answers.
    """
    context = ""
    for doc in documents:
        doc_str = f"Title: {doc['document_title']}\nQuestion: {doc['question']}\nAnswer: {doc['answer']}\n\n"
        context += doc_str
    return context.strip()

def augment_prompt(user_question, documents):
    """
    Create a message for the AI model that includes the user's question and detailed context
    from documents. This helps the AI provide a more accurate answer.
    """
    context = build_context(documents)
    return f"""
    You're an AI assistant.
    Answer the user QUESTION based on CONTEXT - the documents retrieved from our FAQ database.
    Don't use other information outside of the provided CONTEXT.

    QUESTION: {user_question}

    CONTEXT:

    {context}
    """.strip()

def generate_response(prompt, model="gpt-3.5-turbo"):
    """
    Send the prompt to OpenAI and get the model's response. This uses detailed context
    to improve the quality of the AI's answer.
    """
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content



### End-to-End RAG Pipeline

In [8]:
def qa_bot(user_question, index_name):
    """
    Orchestrate the full RAG flow: retrieve relevant documents, construct a prompt based on these
    documents, and query the AI model to generate a response. This function encapsulates the end-to-end
    process of a retrieval-augmented generation system.
    """
    context_docs = retrieve_documents(user_question, index_name)
    prompt = augment_prompt(user_question, context_docs)
    return generate_response(prompt)



# Define the index name and user question
index_name = "wiki_qa_questions"
user_question = "How are epithelial tissues joined together?"

# Call the QA bot to get the answer
qa_bot(user_question, index_name)

'Epithelial tissues are joined together in clusters of cells that can function as exocrine and endocrine glands.'