### RAG

In [1]:
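import requests

# Download the FAQ documents used throughout this notebook
docs_url = 'https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json'
docs_response = requests.get(docs_url)
docs_response.raise_for_status()  # fail early if the download did not succeed
documents_raw = docs_response.json()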

documents = []

for course in documents_raw:
    course_name = course['course']

    for doc in course['documents']:
        doc['course'] = course_name
        documents.append(doc)
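
The loop above flattens a nested "courses with documents" structure into one list and tags every document with its course name. The sketch below repeats the same idea on a small, made-up sample (`sample_raw`, the course names, and `flatten_documents` are hypothetical, not part of the workshop data) and copies each document instead of mutating it in place.

```python
# Hypothetical sample that mirrors the shape the loop above expects:
# a list of courses, each with a name and a list of FAQ documents.
sample_raw = [
    {
        "course": "course-a",
        "documents": [
            {"text": "Answer 1", "section": "General", "question": "Question 1"},
            {"text": "Answer 2", "section": "Setup", "question": "Question 2"},
        ],
    },
    {
        "course": "course-b",
        "documents": [
            {"text": "Answer 3", "section": "General", "question": "Question 3"},
        ],
    },
]


def flatten_documents(raw):
    """Return a flat list of documents, each tagged with its course name."""
    flat = []
    for course in raw:
        for doc in course["documents"]:
            # Copy the document so the input structure is left untouched.
            flat.append({**doc, "course": course["course"]})
    return flat


sample_documents = flatten_documents(sample_raw)
print(len(sample_documents))
print(sample_documents[0])
```

Copying is a design choice for the sketch only; the in-place version in the cell above produces the same flat list.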

In [2]:
from minsearch import AppendableIndex

index = AppendableIndex(
    text_fields = ["question", "text", "section"],
    keyword_fields = ["course"]
)

index.fit(documents)

<minsearch.append.AppendableIndex at 0x7427b7b9a600>

In [3]:
index.search("how to use kafka with spark")

[{'text': 'While following tutorial 13.2 , when running ./spark-submit.sh streaming.py, encountered the following error:\n…\n24/03/11 09:48:36 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:7077...\n24/03/11 09:48:36 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:7077 after 10 ms (0 ms spent in bootstraps)\n24/03/11 09:48:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\n24/03/11 09:48:56 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:7077…\n24/03/11 09:49:16 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:7077...\n24/03/11 09:49:36 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.\n24/03/11 09:49:36 ERROR StandaloneSchedulerBacke

In [4]:
def search(query):
    boost = {'question': 3.0, 'section': 0.5}

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5,
        output_ids=True
    )

    return results

In [5]:
question = "how to use kafka with spark"

In [6]:
prompt_template = """
You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.
Use only the facts from the CONTEXT when answering the QUESTION.

<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(query, search_results):
    context = ""

    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
    
    prompt = prompt_template.format(question=query, context=context).strip()
    return prompt
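
The context string above is built by repeated concatenation. A possible alternative, shown here as a sketch only, formats each document with a helper and joins the pieces with `str.join`. The names `DEMO_TEMPLATE`, `format_doc`, `build_prompt_joined`, and the sample documents are hypothetical and chosen so they do not overwrite anything defined in the notebook.

```python
# Hypothetical, shortened template; the notebook's own template is longer.
DEMO_TEMPLATE = """
<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()


def format_doc(doc):
    """Render one search result in the same layout the notebook uses."""
    return (
        f"section: {doc['section']}\n"
        f"question: {doc['question']}\n"
        f"answer: {doc['text']}"
    )


def build_prompt_joined(query, search_results):
    """Build the prompt by joining formatted documents with blank lines."""
    context = "\n\n".join(format_doc(doc) for doc in search_results)
    return DEMO_TEMPLATE.format(question=query, context=context)


# Made-up search results, only to show the rendered prompt.
sample_results = [
    {"section": "General", "question": "Can I join late?", "text": "Yes."},
    {"section": "Setup", "question": "Which Python?", "text": "Any recent 3.x."},
]

demo_prompt = build_prompt_joined("Can I still join?", sample_results)
print(demo_prompt)
```

Joining avoids the trailing blank lines that the concatenation approach has to strip afterwards.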

In [7]:
search_results = search(question)

In [8]:
prompt = build_prompt(question, search_results)

In [9]:
print(prompt)

You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.
Use only the facts from the CONTEXT when answering the QUESTION.

<QUESTION>
how to use kafka with spark
</QUESTION>

<CONTEXT>
section: Module 6: streaming with kafka
question: Python Kafka: ./spark-submit.sh streaming.py - ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
answer: While following tutorial 13.2 , when running ./spark-submit.sh streaming.py, encountered the following error:
…
24/03/11 09:48:36 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:7077...
24/03/11 09:48:36 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:7077 after 10 ms (0 ms spent in bootstraps)
24/03/11 09:48:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGeneratio

In [10]:
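from openai import OpenAI
from dotenv import load_dotenv
import os

# Load environment variables (such as OPENAI_API_KEY) from a local .env file
load_dotenv()

# The client reads the API key from the OPENAI_API_KEY environment variable
client = OpenAI()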

def llm(prompt):
    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

def rag(query):
    search_results = search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt)
    return answer
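
The `rag` function above chains three steps: search, prompt building, and the LLM call. One way to exercise that flow without an API key or an index is to pass the three steps in as arguments and substitute stand-ins. This is a minimal sketch under that assumption; `make_rag` and the `fake_*` functions are hypothetical helpers, not part of the notebook or of any library.

```python
def make_rag(search_fn, build_prompt_fn, llm_fn):
    """Return a rag(query) function wired to the three given steps."""

    def rag_pipeline(query):
        results = search_fn(query)                 # 1. retrieve documents
        prompt = build_prompt_fn(query, results)   # 2. build the prompt
        return llm_fn(prompt)                      # 3. ask the model

    return rag_pipeline


# Stand-ins that need neither minsearch nor the OpenAI client.
def fake_search(query):
    return [{"section": "General", "question": "Q", "text": "A"}]


def fake_build_prompt(query, results):
    return f"{query} | {len(results)} docs"


def fake_llm(prompt):
    return f"ECHO: {prompt}"


offline_rag = make_rag(fake_search, fake_build_prompt, fake_llm)
print(offline_rag("test question"))
```

Passing the notebook's own `search`, `build_prompt`, and `llm` to `make_rag` would give the same behavior as the `rag` function defined above.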

In [11]:
answer = llm(prompt)

In [12]:
print(answer)

To use Kafka with Spark, ensure that your Kafka broker is running properly by checking its status with the command `docker ps`. If there are issues, you may need to start it using `docker compose up -d` from the docker compose yaml file folder. 

Additionally, follow the tutorial provided in your course (referenced as tutorial 13.2). Make sure to have the correct versions of PySpark installed. If you encounter errors while running your Spark jobs, such as connection failures, verify your Spark master status by checking the logs in the spark-master container.

If you're transitioning between environments, remember to downgrade your local PySpark to match the version specified in your Dockerfile, as version mismatches can lead to application errors. You can check your PySpark version with the command `pyspark --version` and update your build configurations accordingly.


In [13]:
rag(question)

"To use Kafka with Spark, follow these general guidelines based on the information provided:\n\n1. **Ensure Access to Kafka:** Make sure your Kafka broker is running. If you're using Docker, you can confirm this by running `docker ps` to check if the Kafka broker container is active. If it's not running, you can start it using `docker compose up -d` from the directory containing your docker-compose YAML file.\n\n2. **Check Spark Configuration:** When submitting a Spark job with Kafka, ensure that the version of PySpark you are using is compatible with your setup. If you encounter connection issues, check your local PySpark version with `pyspark --version` and `spark-submit --version` and ensure it matches any specified requirements in your project (e.g., the version specified in a Dockerfile).\n\n3. **Monitor Logs for Errors:** If you experience issues connecting to the Spark master, you can inspect the logs for errors. Start a new terminal and run:\n   - `docker ps` to get the contain

#### 'Agentic' RAG

In [14]:
prompt_template = """
You are a course teaching assistant.

You are given a QUESTION from a course student that you need to answer using your own knowledge and the provided CONTEXT.
At the beginning the context is EMPTY.

<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>

If CONTEXT is EMPTY, you can use your FAQ database.
In this case, use the following output template:

{{
"action": "SEARCH",
"reasoning": "<add your reasoning here>"
}}

If you can answer the QUESTION using CONTEXT, use this template:

{{
"action": "ANSWER",
"answer": "<your answer>",
"source" : "CONTEXT"
}}

If the context doesn't contain the answer, use your own knowledge to answer the question

{{
"action" : "ANSWER",
"answer" : "<your answer>",
"source" : "OWN_KNOWLEDGE"
}}
""".strip()

In [15]:
question = "Can i still join the course?"
context = "EMPTY"

In [16]:
prompt = prompt_template.format(question=question, context=context)
print(prompt)

You are a course teaching assistant.

You are given a QUESTION from a course student that you need to answer using your own knowledge and the provided CONTEXT.
At the beginning the context is EMPTY.

<QUESTION>
Can i still join the course?
</QUESTION>

<CONTEXT>
EMPTY
</CONTEXT>

If CONTEXT is EMPTY, you can use your FAQ database.
In this case, use the following output template:

{
"action": "SEARCH",
"reasoning": "<add your reasoning here>"
}

If you can answer the QUESTION using CONTEXT, use this template:

{
"action": "ANSWER",
"answer": "<your answer>",
"source" : "CONTEXT"
}

If the context doesn't contain the answer, use your own knowledge to answer the question

{
"action" : "ANSWER",
"answer" : "<your answer>",
"source" : "OWN_KNOWLEDGE"
}
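
The printed prompt shows single braces around the JSON templates, while the template string uses doubled ones. That is how `str.format` escapes literal braces: `{{` renders as `{` and `}}` as `}`. The sketch below illustrates this on a shortened, hypothetical template (`demo_template`), and shows what happens when the braces are not doubled.

```python
# Hypothetical, shortened template: {question} is a placeholder,
# while the doubled braces stand for literal JSON braces.
demo_template = """
<QUESTION>
{question}
</QUESTION>

{{
"action": "SEARCH",
"reasoning": "<add your reasoning here>"
}}
""".strip()

rendered = demo_template.format(question="Can I still join the course?")
print(rendered)

# With single braces, str.format treats the JSON body as a replacement
# field and fails because no argument with that name was passed.
try:
    '{"action": "SEARCH"}'.format(question="ignored")
except KeyError as error:
    print(f"KeyError: {error}")
```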


In [17]:
answer_json = llm(prompt)

In [18]:
import json

In [19]:
answer = json.loads(answer_json)

In [20]:
answer["action"]

'ANSWER'
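
`json.loads` works here because the model returned bare JSON. Models sometimes wrap JSON in a Markdown code fence or reply with plain text, in which case `json.loads` raises an error. The following is a defensive sketch, not the notebook's method: `parse_action` and the `"UNPARSED"` source label are hypothetical names introduced for illustration.

```python
import json

FENCE = "`" * 3  # the Markdown code-fence marker


def parse_action(raw_response):
    """Parse the model's reply into a dict, tolerating code fences."""
    text = raw_response.strip()

    # Drop a surrounding Markdown fence (with or without a language tag).
    if text.startswith(FENCE):
        lines = text.splitlines()[1:]
        if lines and lines[-1].strip() == FENCE:
            lines = lines[:-1]
        text = "\n".join(lines)

    # Hypothetical fallback: treat anything unparseable as a direct answer.
    fallback = {"action": "ANSWER", "answer": raw_response, "source": "UNPARSED"}

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return fallback

    if not isinstance(parsed, dict) or "action" not in parsed:
        return fallback
    return parsed


fenced_reply = FENCE + 'json\n{"action": "SEARCH", "reasoning": "context is empty"}\n' + FENCE
print(parse_action(fenced_reply)["action"])
print(parse_action("Sorry, I cannot help with that.")["source"])
```

Whether a fallback answer is appropriate depends on the application; raising an error instead may be the better choice when silent failures are costly.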

In [21]:
def build_context(search_results):
    context = ""

    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
    
    return context.strip()

In [25]:
search_results = search(question)
context = build_context(search_results)
prompt = prompt_template.format(question=question, context=context)

In [26]:
answer_json = llm(prompt)

In [27]:
print(answer_json)

{
"action": "ANSWER",
"answer": "Yes, you can still join the course even after the start date. You are eligible to submit homework assignments, but keep in mind that there will be deadlines for the final project submissions, so it's important not to leave things until the last moment.",
"source": "CONTEXT"
}
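
The cells above run the two steps by hand: first prompt with an EMPTY context, then search, rebuild the context, and prompt again. A minimal sketch of how this could be wrapped in a loop follows. It assumes the model replies with valid JSON containing an `action` key; `agentic_rag`, `max_iterations`, the `"NONE"` source label, and all `demo_*` stand-ins are hypothetical and only serve to make the example runnable without an API key.

```python
import json


def agentic_rag(question, llm_fn, search_fn, build_context_fn, template,
                max_iterations=3):
    """Ask the model; if it requests a SEARCH, add context and ask again."""
    context = "EMPTY"

    for _ in range(max_iterations):
        prompt = template.format(question=question, context=context)
        decision = json.loads(llm_fn(prompt))

        if decision["action"] == "ANSWER":
            return decision

        # The model asked for a SEARCH: retrieve documents and retry.
        context = build_context_fn(search_fn(question))

    # Hypothetical fallback when the model never produces an answer.
    return {"action": "ANSWER", "answer": "No answer found.", "source": "NONE"}


# Stand-ins so the loop can run offline.
demo_template = "QUESTION: {question}\nCONTEXT: {context}"


def demo_llm(prompt):
    if "CONTEXT: EMPTY" in prompt:
        return json.dumps({"action": "SEARCH", "reasoning": "context is empty"})
    return json.dumps({"action": "ANSWER", "answer": "Yes.", "source": "CONTEXT"})


def demo_search(query):
    return [{"section": "General", "question": "Can I join late?", "text": "Yes."}]


def demo_build_context(results):
    return "\n\n".join(
        f"section: {d['section']}\nquestion: {d['question']}\nanswer: {d['text']}"
        for d in results
    )


result = agentic_rag("Can I still join?", demo_llm, demo_search,
                     demo_build_context, demo_template)
print(result)
```

The iteration cap guards against a model that keeps requesting searches; the value 3 is arbitrary.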
