MODULE 1.1:

Introduction to LLM, RAG and how RAG assists LLM

MODULE 1.2:

Configuring the environment

MODULE 1.3: 

Retrieval & Search

In [2]:
# !wget https://raw.githubusercontent.com/alexeygrigorev/minsearch/refs/heads/main/minsearch.py

# dont need to wget the file as minsearch has been installed using pip in terminal. so now, the repitive minsearch.py files can be deleted from the repo.

In [3]:
import minsearch

In [4]:
import json

In [5]:
with open('documents.json','rt') as f_in:
    docs_raw = json.load(f_in)

In [6]:
#docs_raw

# will print the whole document if uncommented. it's really really loooooooooooong.

In [7]:
# Flattening nested documents from multiple courses into a single list.
# Adding a 'course' field to each document to retain its source course label.

documents = []

for course_dict in docs_raw:
    for doc in course_dict['documents']:
        doc['course'] = course_dict['course']
        documents.append(doc)
    

In [8]:
documents[0]

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

In [9]:
# index is for easy search

index = minsearch.Index(
    text_fields = ['question','text','section'],
    keyword_fields = ['course']
)

In [10]:
q = "the course has already started, can i still enroll?"

In [11]:
# fitting the model using scikit-learn to make it search-efficient as the data is now linked through knowledge-graphs, just like in ML.
index.fit(documents)

<minsearch.Index at 0x75b60b454e00>

In [12]:
# boost allows to put importance manually. 1 is default. 3 is 3 times more important than any field of 1 importance.
boost = {'question' : 3.0, 'section' : 0.5}

# results is storing the indexed output of the search function.
results = index.search(
    query = q,
    filter_dict = {'course' : 'data-engineering-zoomcamp'},  # filter_dict allows to apply a filter - here filtering the answers from the DE course only, ignoring the other courses in the document.
    boost_dict = boost
)

In [13]:
results

[{'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
  'section': 'General course-related questions',
  'question': 'Course - Can I still join the course after the start date?',
  'course': 'data-engineering-zoomcamp'},
 {'text': 'Yes, we will keep all the materials after the course finishes, so you can follow the course at your own pace after it finishes.\nYou can also continue looking at the homeworks and continue preparing for the next cohort. I guess you can also start working on your final capstone project.',
  'section': 'General course-related questions',
  'question': 'Course - Can I follow the course after it finishes?',
  'course': 'data-engineering-zoomcamp'},
 {'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 202

answer is retrieved throught the search function. till now, no llm has been used.

MODULE 1.4:

Generating answers with Google Gemini 1.5 Flash Latest 

In [14]:
import google.generativeai as genai

In [15]:
import os
api_key = os.environ.get("GOOGLE_API_KEY")

In [16]:
genai.configure(api_key = api_key)

In [17]:
client = genai.GenerativeModel('gemini-1.5-flash-latest')
response = client.generate_content(contents = q)

In [18]:
q

'the course has already started, can i still enroll?'

In [19]:
response.text

'That depends on the course.  Some courses allow late enrollment, while others do not.  You need to check with the instructor or the organization offering the course.  Look for information on their website or in the course syllabus.\n'

a very generic answer is generated. 

now we need to give answers we retrieved from the knowdledge base as context and put it in the prompt. so that our llm will generate a more specific answer. 

we need to explain what we do. it is a good practice in prompt engg to give it a role.

In [20]:
prompt_template = """
    You are a course teaching assistant. answer the QUESTION based on the CONTEXT from the FAQ database. 
    only use facts from the CONTEXT when answering. 
    if the CONTEXT doesn't contain the answer, then output NONE. 

    QUESTION : {question}

    CONTEXT : {context}
""".strip()

In [21]:
#This part takes the 'results' from minsearch and formats them into a single string.
llmcontext = ""

for doc in results:
    llmcontext += f"section: {doc['section']}\n"
    llmcontext += f"question: {doc['question']}\n"
    llmcontext += f"answer: {doc['text']}\n\n"

# f is formatted string literal. without f, then python would not recognise the curly braces {} as placeholders for variables. 
    # python uses f to recognise curly braces {} for holding variables or expressions that Python will evaluate and convert into strings at runtime.

# without f, the string would be used exactly as it's written. 
    # So, instead of inserting the actual value of doc['section'], your llmcontext would literally contain: "section: {doc['section']}\n"

In [22]:
# Prepare the final prompt using your template ---
# This integrates the user's question 'q' and the 'context'

prompt = prompt_template.format(question = q, context = llmcontext)

In [23]:
print(prompt)

You are a course teaching assistant. answer the QUESTION based on the CONTEXT from the FAQ database. 
    only use facts from the CONTEXT when answering. 
    if the CONTEXT doesn't contain the answer, then output NONE. 

    QUESTION : the course has already started, can i still enroll?

    CONTEXT : section: General course-related questions
question: Course - Can I still join the course after the start date?
answer: Yes, even if you don't register, you're still eligible to submit the homeworks.
Be aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.

section: General course-related questions
question: Course - Can I follow the course after it finishes?
answer: Yes, we will keep all the materials after the course finishes, so you can follow the course at your own pace after it finishes.
You can also continue looking at the homeworks and continue preparing for the next cohort. I guess you can also start working 

In [24]:
# Make the call to Gemini with the full prompt and generate text from gemini's response

client.generate_content(contents = prompt).text

"Yes, even if you don't register, you are still eligible to submit homeworks.  However, be aware that there are deadlines for final projects.\n"

What we just accomplished:
1. Took the documents that we retrieved from the knowledge base using the search engine minsearch.
2. Turned it into context.
3. Based on that context we built a prompt.
4. Sent that prompt to our Gemini LLM API to generate a relevant answer instead of a generic one. 

MODULE 1.5:

The RAG flow cleaning and modularizing code.

In [25]:
def search(query):
    # boost allows to put importance manually. 1 is default. 3 is 3 times more important than any field of 1 importance.
    boost = {'question' : 3.0, 'section' : 0.5}

    # results is storing the indexed output of the search function.
    results = index.search(
        query = query,
        #filter_dict = {'course' : 'data-engineering-zoomcamp'},  # filter_dict allows to apply a filter - here filtering the answers from the DE course only, ignoring the other courses in the document.
        boost_dict = boost,
        num_results = 10
    )
    
    return results

In [26]:
def build_prompt(query, search_results):
    prompt_template = """
    You are a course teaching assistant. answer the QUESTION based on the CONTEXT from the FAQ database. 
    Only use facts from the CONTEXT when answering. If the CONTEXT doesn't contain the answer, then output NONE. 

    QUESTION : {question}

    CONTEXT : {context}
    """.strip()

    #This part takes the 'results' from minsearch and formats them into a single string.
    llmcontext = ""

    for doc in search_results:
        llmcontext += f"section: {doc['section']}\n"
        llmcontext += f"question: {doc['question']}\n"
        llmcontext += f"answer: {doc['text']}\n\n"
    
    # Integrating the user's question 'q' and the 'context'
    prompt = prompt_template.format(question = query, context = llmcontext)

    return prompt

In [27]:
def llm(prompt):
    client = genai.GenerativeModel('gemini-1.5-flash-latest')
    response = client.generate_content(contents = prompt)

    return response

In [28]:
query = 'how do i run kafka?'

def rag(query):
    
    # search in the knowledge base (FAQ documents)
    search_results = search(query)
    
    # building the prompt combining our instructions and the search_results from the knowdledge base
    prompt = build_prompt(query, search_results)
    
    # generating answer through the llm using the prompt
    answer = llm(prompt).text

    return answer

In [29]:
rag(query)

'To run Kafka producers, consumers, kstreams, etc., run `java -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java` in the project directory.  If you encounter a `kafka.errors.NoBrokersAvailable` error, your Kafka broker Docker container may not be running.  Use `docker ps` to check, and then run `docker compose up -d` in the Docker Compose YAML file folder to start the instances.  For Python Kafka, if you have a `./build.sh: Permission denied` error, run `chmod +x build.sh` in the `/docker/spark` directory.  If you encounter a "ModuleNotFoundError: No module named \'kafka.vendor.six.moves\'" error, use `pip install kafka-python-ng` instead.\n'

In [30]:
rag('the course has already started, can i still enroll?')

"Yes, you can still join the course even if it has already started.  You may not be able to submit all of the homework assignments, but you can still participate in the course.  To receive a certificate, you need to submit two out of three course projects and review three peers' projects by the deadline.\n"

Now we have modularized all the important code as functions.

MODULE 1.6:

Searching with Elastic Search in place of MinSearch.

In [31]:
documents[0]

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

In [32]:
from elasticsearch import Elasticsearch

In [33]:
es_client = Elasticsearch('http://localhost:9200')

In [34]:
es_client.info()

ObjectApiResponse({'name': 'a99a4215ada2', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'z9PiyUlISmeztwZ8_iIaJg', 'version': {'number': '9.0.1', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '73f7594ea00db50aa7e941e151a5b3985f01e364', 'build_date': '2025-04-30T10:07:41.393025990Z', 'build_snapshot': False, 'lucene_version': '10.1.0', 'minimum_wire_compatibility_version': '8.18.0', 'minimum_index_compatibility_version': '8.0.0'}, 'tagline': 'You Know, for Search'})

In [36]:
# deleting the current index if you get the settings wrong. then re-index.
es_client.indices.delete(index=index_name)

ObjectApiResponse({'acknowledged': True})

In [37]:
# Creating an index in elastic search
index_settings = {
    "settings" : {
        "number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings": {
        "properties" : {
            "text" : {"type" : "text"},
            "section" : {"type" : "text"},
            "question" : {"type" : "text"},
            "course" : {"type" : "keyword"}
        }
    }
}

index_name = "course-questions"

es_client.indices.create(index = index_name, body = index_settings)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'course-questions'})

In [38]:
documents[0]

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

In [39]:
#for seeing the progress bar in indexing the documents
from tqdm.auto import tqdm

In [40]:
#since documents[0] is a list, we need to iterate over it for indexing all of them.
for doc in tqdm(documents):
    es_client.index(index = index_name, document = doc)

  0%|          | 0/948 [00:00<?, ?it/s]

In [41]:
query = "i just discovered the course, can i still join it?"

In [42]:
def elastic_search(query):
    
    search_query = {
        "size": 5,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        "fields": ["question^3", "text", "section"],
                        "type": "best_fields"
                    }
                },
                "filter": {
                    "term": {
                        "course": "data-engineering-zoomcamp"
                    }
               }
            }
        }
    }
    # ctrl + / for commenting a block of code

    response = es_client.search(index = index_name, body = search_query)

    result_docs = []
    
    for hit in response['hits']['hits']:
        result_docs.append(hit['_source'])

    return result_docs

In [43]:
# updating the RAG function using elastic_search in place of minsearch
def rag(query):
    search_results = elastic_search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt).text
    return answer

In [44]:
rag(query)

"Yes, even if you don't register, you are still eligible to submit homework.  However, there are deadlines for final projects.\n"

In [45]:
rag("i just discovered the course, can i still join it?")

"Yes, even if you don't register, you are still eligible to submit homework.  However, be aware of the deadlines for final projects.\n"

We have replaced our toy search engine (minsearch) with a proper search engine (elastic search).