In [1]:
# Imports

from qdrant_client import QdrantClient, models
from fastembed import  TextEmbedding

from openai import OpenAI
import requests
import json

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

docs_url = 'https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json'
# A timeout keeps the cell from hanging indefinitely on a stalled connection,
# and raise_for_status() reports HTTP failures directly instead of letting
# .json() fail on an error page with a confusing decode error.
docs_response = requests.get(docs_url, timeout=30)
docs_response.raise_for_status()
documents_raw = docs_response.json()

# Flatten the per-course structure into a single list of FAQ records and tag
# each record with its course name. Note: the record dicts are shared with
# documents_raw, so this also adds 'course' to the entries inside documents_raw.
docs = []
for c in documents_raw:
    for d in c['documents']:
        d['course'] = c['course']
        docs.append(d)

docs[0]

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

In [3]:
# Client for a Qdrant instance reachable on localhost port 6333.
qd_client = QdrantClient("http://localhost:6333") 

# Vector size used for the dense collection; it must match the output size of
# the embedding model below (512 for jina-embeddings-v2-small-en).
EMBEDDING_DIMENSIONS = 512
model_handle = "jinaai/jina-embeddings-v2-small-en"

In [4]:
collection_name = "zoomcamp-faq"
qd_client.delete_collection(collection_name)

True

In [5]:


qd_client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(
        size=EMBEDDING_DIMENSIONS,
        distance=models.Distance.COSINE
    )
)


True

In [6]:
# qd_client.delete_collection(collection_name)


In [7]:
docs[1]

{'text': 'GitHub - DataTalksClub data-engineering-zoomcamp#prerequisites',
 'section': 'General course-related questions',
 'question': 'Course - What are the prerequisites for this course?',
 'course': 'data-engineering-zoomcamp'}

In [8]:
qd_client.create_payload_index(
    collection_name=collection_name,
    field_name="course",
    field_schema="keyword" # exact match on string metadata field
)

UpdateResult(operation_id=1, status=<UpdateStatus.COMPLETED: 'completed'>)

In [9]:
# One point per FAQ entry. The embedded text is the question and the answer
# joined by a single space, so a query can match either part. The dense vector
# is produced from that text with model_handle. The point id is the entry's
# position in `docs`, and the whole entry is stored as the payload.
points = [
    models.PointStruct(
        id=idx,
        vector=models.Document(
            text=entry['question'] + ' ' + entry['text'],
            model=model_handle,
        ),
        payload=entry,
    )
    for idx, entry in enumerate(docs)
]

qd_client.upsert(
    collection_name=collection_name,
    points=points,
)

UpdateResult(operation_id=2, status=<UpdateStatus.COMPLETED: 'completed'>)

In [10]:

def vector_search(question, course="data-engineering-zoomcamp", limit=5):
    """Dense-vector search over the FAQ collection.

    Args:
        question: Natural-language query; embedded with ``model_handle``.
        course: Only return entries whose ``course`` payload field equals
            this value (exact match). Pass ``None`` to search all courses.
        limit: Maximum number of results.

    Returns:
        List of payload dicts (the stored FAQ records: ``text``, ``section``,
        ``question``, ``course``) in the order Qdrant ranks them.
    """
    print(f"Using Vector Search with filter: {course}. Results limit: {limit}")

    # MatchValue needs a concrete value, so only build a filter when a course
    # is actually requested; query_filter=None means "no filtering".
    query_filter = None
    if course is not None:
        query_filter = models.Filter(
            must=[
                models.FieldCondition(
                    key="course",
                    match=models.MatchValue(value=course),
                )
            ]
        )

    q_points = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(text=question, model=model_handle),
        query_filter=query_filter,
        limit=limit,
        with_payload=True,
    )

    return [point.payload for point in q_points.points]

In [11]:
vector_search("How to install Kafka?", course="data-engineering-zoomcamp", limit=3)

Using Vector Search with filter: data-engineering-zoomcamp. Results limit: 3


[{'text': 'Ans: No, it is not.',
  'section': 'Workshop 2 - RisingWave',
  'question': 'Setup - Qn: Is kafka install required for the RisingWave workshop? [source]',
  'course': 'data-engineering-zoomcamp'},
 {'text': 'If you have this error, it most likely that your kafka broker docker container is not working.\nUse docker ps to confirm\nThen in the docker compose yaml file folder, run docker compose up -d to start all the instances.',
  'section': 'Module 6: streaming with kafka',
  'question': 'kafka.errors.NoBrokersAvailable: NoBrokersAvailable',
  'course': 'data-engineering-zoomcamp'},
 {'text': 'According to https://github.com/dpkp/kafka-python/\n“DUE TO ISSUES WITH RELEASES, IT IS SUGGESTED TO USE https://github.com/wbarnha/kafka-python-ng FOR THE TIME BEING”\nUse pip install kafka-python-ng instead',
  'section': 'Project',
  'question': 'How to fix the error "ModuleNotFoundError: No module named \'kafka.vendor.six.moves\'"?',
  'course': 'data-engineering-zoomcamp'}]

In [12]:
# LLM API client

%load_ext dotenv
%dotenv /Users/sethurama/DEV/LM/course-llm-zc/.env

In [13]:
# Shared OpenAI client used by query_llm. Constructed without arguments, so it
# takes its credentials from the environment (populated by the %dotenv cell).
llm_client = OpenAI()

def build_prompt(q_question, search_results):
    """Assemble the RAG prompt sent to the LLM.

    Each search result (a dict with ``section``, ``question`` and ``text``
    keys) becomes a ``section / question / answer`` entry in the CONTEXT part
    of the prompt, with a blank line between entries.

    Args:
        q_question: The user's question, inserted after ``QUESTION:``.
        search_results: Iterable of FAQ records, e.g. from ``vector_search``.

    Returns:
        The filled-in prompt with leading/trailing whitespace stripped.
    """
    prompt_template = """
    You're a course teaching assistant. Answer the QUESTION based on the CONTEXT.
Use only the facts from the CONTEXT when answering the QUESTION.
If the CONTEXT doesn't contain the answer, output NONE


QUESTION: {question} 

CONTEXT: {context}
""".strip()

    entries = [
        f"section: {hit['section']}\nquestion: {hit['question']}\nanswer:  {hit['text']}\n\n"
        for hit in search_results
    ]
    context = "".join(entries)

    return prompt_template.format(question=q_question, context=context).strip()

# Query the LLM with the modified prompt
def query_llm(mod_prompt, model='gpt-4o-mini'):
    """Send a single-turn prompt to the OpenAI chat completions API.

    Args:
        mod_prompt: Full prompt text (e.g. the output of ``build_prompt``),
            sent as one user message.
        model: Chat model name. Defaults to ``'gpt-4o-mini'``, the model
            previously hard-coded here.

    Returns:
        The message content of the first returned choice.
        NOTE(review): the SDK types this content as optional; confirm callers
        never receive None.
    """
    response = llm_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": mod_prompt}],
    )
    return response.choices[0].message.content

In [14]:
def rag(query, course="data-engineering-zoomcamp", limit=5):
    """Answer ``query`` with retrieval-augmented generation.

    Retrieves FAQ entries with ``vector_search``, packs them into a prompt
    with ``build_prompt``, and returns the LLM reply from ``query_llm``.

    Args:
        query: The user's question.
        course: Course whose FAQ entries are used as context; forwarded to
            ``vector_search`` (same default as before).
        limit: Number of FAQ entries retrieved as context (same default as
            before).

    Returns:
        The model's answer text.
    """
    search_results = vector_search(query, course=course, limit=limit)
    prompt = build_prompt(query, search_results)
    return query_llm(prompt)

    

In [15]:
rag("How do I run kafka?")

Using Vector Search with filter: data-engineering-zoomcamp. Results limit: 5


'To run Kafka in the terminal for Java, navigate to the project directory and execute the following command:\n\n```\njava -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java\n```\n\nMake sure to replace `<jar_name>` with the actual name of your jar file.'

In [5]:
### Hybrid Search


In [6]:
# Create the collection with the specified sparse vector parameters.
# (`models` is already imported in the first cell.)
# Drop any previous copy first so this cell can be re-run: otherwise
# create_collection fails because the collection already exists, and the
# upsert that follows, which uses random ids, would pile up duplicate points.
if qd_client.collection_exists("zoomcamp-sparse"):
    qd_client.delete_collection("zoomcamp-sparse")

qd_client.create_collection(
    collection_name="zoomcamp-sparse",
    sparse_vectors_config={
        # IDF modifier: Qdrant applies inverse-document-frequency weighting
        # to this sparse vector, as needed for BM25 scoring.
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF
        )
    }
)

True

In [7]:
import uuid

# Index the answer text of every FAQ entry as a BM25 sparse vector.
# Ids are random UUIDs, so re-running this cell without recreating the
# collection adds duplicate points.
qd_client.upsert(
    collection_name="zoomcamp-sparse",
    points=[
        models.PointStruct(
            id=uuid.uuid4().hex,
            vector={
                "bm25": models.Document(
                    text=doc["text"],
                    model="Qdrant/bm25",
                ),
            },
            payload={
                "text": doc["text"],
                "section": doc["section"],
                # Read the course name from the parent record (as the
                # dense+sparse upsert does) instead of relying on an earlier
                # cell having copied it into every document dict.
                "course": course["course"],
            }
        )
        for course in documents_raw
        for doc in course["documents"]
    ]
)

Fetching 18 files: 100%|██████████| 18/18 [00:00<00:00, 21.85it/s]


UpdateResult(operation_id=0, status=<UpdateStatus.COMPLETED: 'completed'>)

In [8]:
# Running sparse vector search with BM25



In [9]:
def search_sparse(query: str, limit: int = 1) -> list[models.ScoredPoint]:
    """Keyword (BM25) search over the ``zoomcamp-sparse`` collection.

    Args:
        query: Free-text query, turned into a sparse vector by the
            ``Qdrant/bm25`` model.
        limit: Maximum number of points to return.

    Returns:
        Scored points with their payloads. May be empty when none of the
        query's terms occur in the collection.
    """
    bm25_query = models.Document(text=query, model="Qdrant/bm25")
    response = qd_client.query_points(
        collection_name="zoomcamp-sparse",
        query=bm25_query,
        using="bm25",
        limit=limit,
        with_payload=True,
    )
    return response.points

In [11]:
results = search_sparse("How to install Kafka?", limit=3)
results

[ScoredPoint(id='818cceb7-7529-4850-a5f8-f416e6125012', version=0, score=11.458191, payload={'text': "✅SOLUTION: pip install confluent-kafka[avro].\nFor some reason, Conda also doesn't include this when installing confluent-kafka via pip.\nMore sources on Anaconda and confluent-kafka issues:\nhttps://github.com/confluentinc/confluent-kafka-python/issues/590\nhttps://github.com/confluentinc/confluent-kafka-python/issues/1221\nhttps://stackoverflow.com/questions/69085157/cannot-import-producer-from-confluent-kafka", 'section': 'Module 6: streaming with kafka', 'course': 'data-engineering-zoomcamp'}, vector=None, shard_key=None, order_value=None),
 ScoredPoint(id='7a05efa6-028f-431a-a9a3-f25bcd97a7ec', version=0, score=10.810294, payload={'text': 'If you get an error while running the command python3 stream.py worker\nRun pip uninstall kafka-python\nThen run pip install kafka-python==1.4.6\nWhat is the use of  Redpanda ?\nRedpanda: Redpanda is built on top of the Raft consensus algorithm 

In [12]:
results = search_sparse("Qdrant", limit=3)
results

[]

In [14]:
results = search_sparse("pandas", limit=3)
print(results[0].payload['text'])

You can use round() function or f-strings
round(number, 4)  - this will round number up to 4 decimal places
print(f'Average mark for the Homework is {avg:.3f}') - using F string
Also there is pandas.Series. round idf you need to round values in the whole Series
Please check the documentation
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.round.html#pandas.Series.round
Added by Olga Rudakova


In [15]:
# BM25 scores are not cosine similarities: they come from the BM25 ranking formula, so they are not confined to a fixed range and have no fixed upper bound.
results[0].score

6.0392046

In [16]:
# Natural-language queries


In [17]:
import random
import json

random.seed(202506)

course = random.choice(documents_raw)
course_piece = random.choice(course["documents"])
print(json.dumps(course_piece, indent=2))

{
  "text": "Even though the upload works using aws cli and boto3 in Jupyter notebook.\nSolution set the AWS_PROFILE environment variable (the default profile is called default)",
  "section": "Module 4: Deployment",
  "question": "Uploading to s3 fails with An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.\"",
  "course": "mlops-zoomcamp"
}


In [18]:
results = search_sparse(course_piece["question"])
print(results[0].payload["text"])

The trial dbt account provides access to dbt API. Job will still be needed to be added manually. Airflow will run the job using a python operator calling the API. You will need to provide api key, job id, etc. (be careful not committing it to Github).
Detailed explanation here: https://docs.getdbt.com/blog/dbt-airflow-spiritual-alignment
Source code example here: https://github.com/sungchun12/airflow-toolkit/blob/95d40ac76122de337e1b1cdc8eed35ba1c3051ed/dags/examples/dbt_cloud_example.py


In [21]:
# 4. Qdrant Universal Query API - prefetching



In [22]:
# Create the collection with both vector types.
# Recreate it from scratch so the cell can be re-run (the upsert that follows
# uses random ids, so stale points would otherwise accumulate).
if qd_client.collection_exists("zoomcamp-sparse-and-dense"):
    qd_client.delete_collection("zoomcamp-sparse-and-dense")

qd_client.create_collection(
    collection_name="zoomcamp-sparse-and-dense",
    vectors_config={
        # Named dense vector for jinaai/jina-embeddings-v2-small-en; reuse the
        # configured dimension instead of repeating the magic number.
        "jina-small": models.VectorParams(
            size=EMBEDDING_DIMENSIONS,
            distance=models.Distance.COSINE,
        ),
    },
    sparse_vectors_config={
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF,
        )
    }
)

True

In [24]:
def _to_hybrid_point(course_name: str, faq: dict) -> models.PointStruct:
    """Build one point carrying both a dense and a BM25 vector for an FAQ answer."""
    return models.PointStruct(
        id=uuid.uuid4().hex,
        vector={
            # Dense embedding stored under the "jina-small" vector name.
            "jina-small": models.Document(
                text=faq["text"],
                model="jinaai/jina-embeddings-v2-small-en",
            ),
            # Sparse BM25 representation stored under the "bm25" vector name.
            "bm25": models.Document(
                text=faq["text"],
                model="Qdrant/bm25",
            ),
        },
        payload={
            "text": faq["text"],
            "section": faq["section"],
            "course": course_name,
        },
    )


qd_client.upsert(
    collection_name="zoomcamp-sparse-and-dense",
    points=[
        _to_hybrid_point(course["course"], doc)
        for course in documents_raw
        for doc in course["documents"]
    ],
)

Fetching 5 files: 100%|██████████| 5/5 [00:02<00:00,  2.08it/s]


UpdateResult(operation_id=0, status=<UpdateStatus.COMPLETED: 'completed'>)

In [25]:
def multi_stage_search(query: str, limit: int = 1) -> list[models.ScoredPoint]:
    """Two-stage search: dense retrieval, then BM25 re-ranking.

    Stage 1 fetches ``10 * limit`` candidates by jina dense-vector similarity.
    Stage 2 scores those candidates with the BM25 sparse vector and keeps the
    top ``limit``.

    Args:
        query: Free-text query, embedded by both models.
        limit: Number of points returned after re-ranking.

    Returns:
        Scored points (scored by the BM25 stage) with their payloads.
    """
    # Prefetch ten times more candidates than we return, so the BM25 stage
    # actually has something to re-rank.
    dense_candidates = models.Prefetch(
        query=models.Document(
            text=query,
            model="jinaai/jina-embeddings-v2-small-en",
        ),
        using="jina-small",
        limit=limit * 10,
    )
    bm25_rerank = models.Document(text=query, model="Qdrant/bm25")

    response = qd_client.query_points(
        collection_name="zoomcamp-sparse-and-dense",
        prefetch=[dense_candidates],
        query=bm25_rerank,
        using="bm25",
        limit=limit,
        with_payload=True,
    )
    return response.points

In [26]:
print(json.dumps(course_piece, indent=2))


{
  "text": "Even though the upload works using aws cli and boto3 in Jupyter notebook.\nSolution set the AWS_PROFILE environment variable (the default profile is called default)",
  "section": "Module 4: Deployment",
  "question": "Uploading to s3 fails with An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.\"",
  "course": "mlops-zoomcamp"
}


In [27]:
results = multi_stage_search(course_piece["question"])
print(results[0].payload["text"])

Problem description. How can we connect s3 bucket to MLFLOW?
Solution: Use boto3 and AWS CLI to store access keys. The access keys are what will be used by boto3 (AWS' Python API tool) to connect with the AWS servers. If there are no Access Keys how can they make sure that they have the right to access this Bucket? Maybe you're a malicious actor (Hacker for ex). The keys must be present for boto3 to talk to the AWS servers and they will provide access to the Bucket if you possess the right permissions. You can always set the Bucket as public so anyone can access it, now you don't need access keys because AWS won't care.
Read more here: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
Added by Akshit Miglani


In [30]:
# 5. Building Hybrid Search. 

def rrf_search(query: str, limit: int = 1) -> list[models.ScoredPoint]:
    """Hybrid search that fuses dense and BM25 results with Reciprocal Rank Fusion.

    The jina dense vector and the BM25 sparse vector each prefetch
    ``5 * limit`` candidates. RRF merges the two ranked lists, and the top
    ``limit`` fused points are returned.

    Args:
        query: Free-text query, embedded by both models.
        limit: Number of fused results to return.

    Returns:
        Scored points (RRF scores) with their payloads.
    """
    # Each branch over-fetches so fusion can promote documents that rank well
    # in both lists.
    candidate_limit = 5 * limit
    results = qd_client.query_points(
        collection_name="zoomcamp-sparse-and-dense",
        prefetch=[
            models.Prefetch(
                query=models.Document(
                    text=query,
                    model="jinaai/jina-embeddings-v2-small-en",
                ),
                using="jina-small",
                limit=candidate_limit,
            ),
            models.Prefetch(
                query=models.Document(
                    text=query,
                    model="Qdrant/bm25",
                ),
                using="bm25",
                limit=candidate_limit,
            ),
        ],
        # Fusion query enables fusion on the prefetched results
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        # Without this the fused list is cut at query_points' default limit
        # rather than at the caller's `limit`.
        limit=limit,
        with_payload=True,
    )

    return results.points



In [31]:
results = rrf_search(course_piece["question"])
print(json.dumps(course_piece, indent=2))
print(results[0].payload["text"])

{
  "text": "Even though the upload works using aws cli and boto3 in Jupyter notebook.\nSolution set the AWS_PROFILE environment variable (the default profile is called default)",
  "section": "Module 4: Deployment",
  "question": "Uploading to s3 fails with An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.\"",
  "course": "mlops-zoomcamp"
}
The trial dbt account provides access to dbt API. Job will still be needed to be added manually. Airflow will run the job using a python operator calling the API. You will need to provide api key, job id, etc. (be careful not committing it to Github).
Detailed explanation here: https://docs.getdbt.com/blog/dbt-airflow-spiritual-alignment
Source code example here: https://github.com/sungchun12/airflow-toolkit/blob/95d40ac76122de337e1b1cdc8eed35ba1c3051ed/dags/examples/dbt_cloud_example.py
