Load the relevant libraries

In [None]:
from pinecone import Pinecone,ServerlessSpec
from openai import OpenAI
import numpy as np
import itertools
import pandas as pd 
import uuid
import os
from dotenv import load_dotenv


# Load variables from a .env file into the process environment
# (expected to hold OPENAI_API_KEY and the Pinecone API key read below).
load_dotenv()

# Path to the CSV to ingest; it must contain a 'text' column (read by ingest()).
filepath = ''
if not filepath:
    raise ValueError("Set `filepath` to the CSV file to ingest before running this cell.")

# With chunksize=100, read_csv returns an iterator of 100-row DataFrames rather
# than a single DataFrame. It is one-shot: once ingest() consumes it, re-run this
# cell before ingesting again.
df = pd.read_csv(filepath, chunksize=100)
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

Create (or connect to) the index used to store and query the data

In [None]:
def create_or_connect_index(name, dimension=1536, cloud='aws', region='us-east-1'):
    """
    Create a serverless Pinecone index called `name` if it does not exist yet,
    otherwise connect to the existing one, and return a handle to it.

    Parameters:
    name (str): Name of the Pinecone index. Pinecone only accepts lowercase
        alphanumeric characters and '-'.
    dimension (int): Vector dimension of the index. It must match the embedding
        model (text-embedding-ada-002 produces 1536-dimensional vectors).
    cloud (str): Cloud provider used for the serverless spec.
    region (str): Region used for the serverless spec.

    Returns:
    A Pinecone Index object with 30 pool threads, so that
    `upsert(..., async_req=True)` calls can be issued concurrently.
    """
    import time

    # Prefer the conventional upper-case variable, but keep accepting the
    # lower-case name this notebook originally read.
    api_key = os.getenv('PINECONE_API_KEY') or os.getenv('pinecone_api_key')
    pc = Pinecone(
        api_key=api_key,
        pool_threads=30,  # number of simultaneous requests allowed
    )

    # list_indexes() returns an IndexList; .names() gives the plain index names
    # that can be compared against `name`.
    if name in pc.list_indexes().names():
        print("index already exists: connected successfully")
    else:
        pc.create_index(
            name=name,
            dimension=dimension,  # the SDK keyword is `dimension` (singular)
            spec=ServerlessSpec(
                cloud=cloud,
                region=region,
            ),
        )
        # Index creation completes asynchronously; block until the index
        # can accept upserts and queries.
        while not pc.describe_index(name).status['ready']:
            time.sleep(1)
        print("index created: connected successfully")

    return pc.Index(name, pool_threads=30)

Ingesting data into the index via parallel batching

In [None]:
def ingest(df, index, namespace, emb_model='text-embedding-ada-002', text_column='text'):
    """
    Embed every text row of a chunked DataFrame and upsert it into a Pinecone index.

    Each chunk is embedded with one OpenAI request. It is then sent as a
    non-blocking upsert (`async_req=True`), so several upserts can be in flight
    at the same time. The function waits for all of them before returning.
    The embedding model must be the same one later used for retrieval.

    parameters:
    df: iterable of pandas DataFrames (e.g. `pd.read_csv(..., chunksize=N)`)
    index: pinecone index object
    namespace (str): string for the index namespace
    emb_model (str): OpenAI embedding model name
    text_column (str): column holding the text to embed

    returns:
    The string 'uploaded' once every upsert has completed.
    """
    # handles of the pending async upserts
    async_results = []

    for chunk in df:
        # The embeddings API rejects non-string / empty inputs, so drop missing
        # cells and blank strings instead of failing the whole chunk.
        texts = [
            text
            for text in chunk[text_column].dropna().astype(str)
            if text.strip()
        ]
        if not texts:
            continue

        # one random id per text; the id is also stored in metadata for retrieval
        ids = [str(uuid.uuid4()) for _ in texts]
        metadata = [{'id': id_, 'text': text} for id_, text in zip(ids, texts)]

        # openai>=1.0 returns response objects: use attribute access, not ['data'].
        # Sort by `index` so each embedding lines up with its input text.
        response = client.embeddings.create(input=texts, model=emb_model)
        embeds = [item.embedding for item in sorted(response.data, key=lambda item: item.index)]

        vectors = list(zip(ids, embeds, metadata))
        async_results.append(
            index.upsert(
                vectors=vectors,
                async_req=True,
                namespace=namespace,
            )
        )

    # Wait for all async upserts to finish (re-raises any upsert error).
    for result in async_results:
        result.get()
    return 'uploaded'

Retrieval function

In [None]:
def retrieve(index, query, namespace, top_k=5, embed_model='text-embedding-ada-002'):
    """
    Return the `top_k` documents in `namespace` that are most similar to `query`.

    The query is embedded with `embed_model` and used for a similarity search in
    the Pinecone index. The embedding model must match the one used during
    ingestion, otherwise the similarity scores are meaningless.

    parameters:
    index: pinecone index object
    query (str): string input query
    namespace (str): string for the index namespace
    top_k (int): number of similar documents to retrieve
    embed_model (str): embedding model to use for creating query embeddings

    returns:
    (documents, source): two parallel lists holding each match's metadata
    'text' and metadata 'id', in the order returned by the index.
    """
    # openai>=1.0 returns response objects: use attribute access, not ['data'].
    response = client.embeddings.create(
        input=query,
        model=embed_model,
    )
    query_embedding = response.data[0].embedding

    result = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True,
        namespace=namespace,
    )

    documents = []
    source = []
    for match in result['matches']:
        documents.append(match['metadata']['text'])
        source.append(match['metadata']['id'])

    return documents, source

Context builder function

In [None]:
def context_builder(user_input, context_documents):
    """
    Assemble a single prompt string from retrieved documents and a user question.

    The prompt has three parts, separated by single spaces:
    an instruction line, a "context:" section containing the documents separated
    by blank lines, and the question followed by an "Answer:" cue.

    parameters:
    user_input (str): string input query
    context_documents (list): list of strings containing relevant context documents

    returns:
    str: the assembled prompt
    """
    joined_documents = "\n\n".join(context_documents)
    sections = (
        "Use the following context to answer the question.",
        "\ncontext:\n" + joined_documents,
        f"\nQuestion: {user_input} \nAnswer:",
    )
    return " ".join(sections)

Connecting to the chat model

In [None]:
def chat(prompt, model='gpt-4o-mini', temperature=1):
    """
    Send `prompt` to an OpenAI chat model and return the reply text.

    A fixed system message instructs the model to answer from the provided
    context and to reply "I don't know" when the context lacks the answer.
    `prompt` is sent as the single user message.

    parameters:
    prompt (str): the user message, e.g. the output of context_builder().
    model (str): The model to use for generating the chat completion.
    temperature (float): The temperature setting for the model, controlling the randomness of the output.

    returns:
    str: the assistant's reply followed by a newline.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant who answers questions based on the provided context. If the context does not contain the answer, respond with 'I don't know'."},
            {"role": "user", "content": prompt},
        ],
        temperature=temperature,
    )
    # openai>=1.0 returns a ChatCompletion object (attribute access, not
    # subscripting); message.content may be None, e.g. on refusals.
    content = response.choices[0].message.content
    return (content or '') + '\n'

Putting it all together

In [None]:
# Create or connect to the Pinecone index. Pinecone index names must be
# lowercase alphanumeric characters or '-', so 'Rag-index' would be rejected.
index = create_or_connect_index('rag-index')

# Ingest data from the chunked reader into the index under the namespace 'Rag-namespace'.
# The reader is one-shot: re-running this line without re-reading the CSV ingests nothing.
ingest(df, index, 'Rag-namespace')

# Define the query to search for relevant documents
query = "What is RAG?"

# Retrieve the top 5 most similar documents and their source IDs from the index
documents, source = retrieve(index, query, 'Rag-namespace', top_k=5)

# Build a context-aware prompt using the retrieved documents
prompt = context_builder(query, documents)

# Generate a response from the chat model using the constructed prompt
response = chat(prompt)

# Show the generated answer (it was previously computed but never displayed)
print(response)

# Print the source document IDs for reference
for source_doc in source:
    print(f"Source Document ID: {source_doc}")
