Day 1: Ingest and Index Your Data

In [16]:
# Complete Implementation. Let's now put everything together into a reusable function:
import io
import zipfile
import requests
import frontmatter

def read_repo_data(repo_owner, repo_name, branch='main', timeout=30):
    """
    Download and parse all markdown files from a GitHub repository.

    The repository is fetched as one zip archive of a single branch from
    codeload.github.com and processed entirely in memory. Every entry whose
    name ends in ``.md`` or ``.mdx`` (case-insensitive) is decoded as UTF-8
    (undecodable bytes are dropped) and parsed with ``frontmatter``.

    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
        branch: Branch to download. Defaults to 'main'.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        List of dictionaries, one per markdown file: the parsed
        front matter post converted with ``to_dict()`` plus a 'filename'
        key holding the entry's path inside the archive.

    Raises:
        Exception: If the download does not return HTTP 200.
    """
    prefix = 'https://codeload.github.com'
    url = f'{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/{branch}'
    # A timeout keeps the notebook from hanging forever on a stalled connection.
    resp = requests.get(url, timeout=timeout)

    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []

    # The context manager closes the archive even if iteration raises.
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_info in zf.infolist():
            filename = file_info.filename

            if not filename.lower().endswith(('.md', '.mdx')):
                continue

            # Best effort: a file that fails to parse is reported and skipped
            # so one bad document does not abort the whole download.
            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode('utf-8', errors='ignore')
                post = frontmatter.loads(content)
                data = post.to_dict()
                data['filename'] = filename
                repository_data.append(data)
            except Exception as e:
                print(f"Error processing {filename}: {e}")
                continue

    return repository_data


In [17]:
# We can now use this function for different repositories.
# Each call downloads the repository archive over the network, so this cell
# needs internet access and re-downloads on every run.

dtc_faq = read_repo_data('DataTalksClub', 'faq')
evidently_docs = read_repo_data('evidentlyai', 'docs')

# Quick sanity check: how many markdown documents were parsed from each repo.
print(f"FAQ documents: {len(dtc_faq)}")
print(f"Evidently documents: {len(evidently_docs)}")


FAQ documents: 1219
Evidently documents: 95


Day 2: Chunking and Intelligent Processing for Data


In [23]:
# "sliding window" method. This method creates overlapping chunks of text, which can help preserve context across chunk boundaries.

def sliding_window(seq, size, step):
    """
    Cut a sequence into windows of at most `size` items, advancing `step` items each time.

    Args:
        seq: Any sliceable sequence with a length (e.g. a string or list)
        size: Maximum length of each window; must be positive
        step: Distance between consecutive window starts; must be positive

    Returns:
        List of dicts with keys 'start' (offset of the window in `seq`)
        and 'chunk' (the slice itself). Empty input yields an empty list.

    Raises:
        ValueError: If `size` or `step` is not positive.
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    total = len(seq)
    windows = []
    start = 0

    while start < total:
        windows.append({'start': start, 'chunk': seq[start:start + size]})
        # Stop once a window has reached the end of the sequence, so no
        # trailing window that only repeats already-covered items is emitted.
        if start + size >= total:
            break
        start += step

    return windows


In [8]:
# Let's process all the documents:
# Download the docs only if an earlier cell has not already loaded them.
if 'evidently_docs' not in globals():
    evidently_docs = read_repo_data('evidentlyai', 'docs')

evidently_chunks = []
for doc in evidently_docs:
    # Work on a shallow copy so the original document keeps its 'content'.
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    # Windows of 2000 characters starting every 1000 characters, so
    # consecutive chunks overlap by 1000 characters.
    chunks = sliding_window(doc_content, 2000, 1000)
    for chunk in chunks:
        # Copy the remaining document fields (everything except 'content')
        # onto each chunk dict.
        chunk.update(doc_copy)
    evidently_chunks.extend(chunks)


In [8]:
# Splitting by Paragraphs and Sections

import re
# One sample document (index 45) used to demonstrate the splitting approaches.
text = evidently_docs[45]['content']
# A paragraph boundary is a newline, optional whitespace, then another newline.
paragraphs = re.split(r"\n\s*\n", text.strip())

# use regex to split by headers
import re

def split_markdown_by_level(text, level=2):
    """
    Break markdown text into sections, one per header of the given level.

    Text that appears before the first matching header is not returned,
    and text without any matching header yields an empty list.

    :param text: Markdown text as a string
    :param level: Number of '#' characters of the headers to split on
    :return: List of strings, each a header line followed (after a blank
             line) by the stripped content up to the next matching header
    """
    # Matches a full header line such as "## Title" (for level 2).
    # Group 1 is the marker including its trailing space, group 2 the title.
    header_regex = re.compile(r'^(#{' + str(level) + r'} )(.+)$', re.MULTILINE)

    # With two capturing groups, split() returns
    # [preamble, marker, title, body, marker, title, body, ...]
    pieces = header_regex.split(text)

    sections = []
    for marker, title, body in zip(pieces[1::3], pieces[2::3], pieces[3::3]):
        heading = (marker + title).strip()
        body = body.strip()
        sections.append(f'{heading}\n\n{body}' if body else heading)

    return sections

# If we want to split by second-level headers, that's what we do:
sections = split_markdown_by_level(text, level=2)

# Now we iterate over all the docs to create the final result.
# Note: split_markdown_by_level returns only the parts that start at a
# matching header, so a document without any "## " header adds no chunks
# here, and text before a document's first "## " header is not included.
evidently_chunks = []

for doc in evidently_docs:
    # Shallow copy so the original document keeps its 'content'.
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    sections = split_markdown_by_level(doc_content, level=2)
    for section in sections:
        # Each chunk carries the document's remaining fields plus the
        # section text under the 'section' key.
        section_doc = doc_copy.copy()
        section_doc['section'] = section
        evidently_chunks.append(section_doc)



In [9]:
from openai import OpenAI
import os

# The OpenAI API key must come from the environment (shell export, .env
# loader, Codespaces secret, ...). Never assign a key, or a placeholder,
# to os.environ in the notebook: that overwrites a correctly configured key
# and tempts people to commit a real one.
if not os.environ.get("OPENAI_API_KEY"):
    raise RuntimeError(
        "OPENAI_API_KEY is not set. Set the environment variable before running this cell."
    )

openai_client = OpenAI()


def llm(prompt, model='gpt-4o-mini'):
    """
    Send one user message through the module-level `openai_client` and return the reply text.

    Args:
        prompt: Text sent as the content of a single "user" message.
        model: Name of the model to use. Defaults to 'gpt-4o-mini'.

    Returns:
        The `output_text` attribute of the response.
    """
    messages = [
        {"role": "user", "content": prompt}
    ]

    # Pass `model` through so the argument actually selects the model.
    response = openai_client.responses.create(
        model=model,
        input=messages
    )

    return response.output_text


In [None]:
# Prompt used for LLM-based chunking. `{document}` is the only placeholder
# and is filled with str.format(); the `---` lines in the requested output
# format are the separators the reply is later split on.
prompt_template = """
Split the provided document into logical sections
that make sense for a Q&A system.

Each section should be self-contained and cover
a specific topic or concept.

<DOCUMENT>
{document}
</DOCUMENT>

Use this format:

## Section Name

Section content with all relevant details

---

## Another Section Name

Another section content

---
""".strip()


In [21]:
# create a function for intelligent chunking:
def intelligent_chunking(text):
    """
    Ask the LLM to split a document into self-contained sections.

    The document is inserted into `prompt_template`, sent through `llm`,
    and the reply is cut at separator lines.

    Args:
        text: Document text to split.

    Returns:
        List of non-empty section strings with surrounding whitespace removed.
    """
    import re  # local import: keeps the function independent of other cells

    prompt = prompt_template.format(document=text)
    response = llm(prompt)

    # Split only on lines that consist of exactly '---'. A plain
    # str.split('---') would also cut inside content that merely contains
    # three dashes, e.g. a markdown table separator row like "|---|---|".
    parts = re.split(r'^[ \t]*---[ \t]*$', response, flags=re.MULTILINE)
    return [part.strip() for part in parts if part.strip()]


In [None]:
#Now we apply this to every document:
from tqdm.auto import tqdm

# Rebuild evidently_chunks using LLM-based sections.
# intelligent_chunking() makes one LLM request per document, so this loop
# issues as many requests as there are documents; tqdm shows the progress.
evidently_chunks = []

for doc in tqdm(evidently_docs):
    # Shallow copy so the original document keeps its 'content'.
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')

    sections = intelligent_chunking(doc_content)
    for section in sections:
        # Each chunk carries the document's remaining fields plus the
        # section text under the 'section' key.
        section_doc = doc_copy.copy()
        section_doc['section'] = section
        evidently_chunks.append(section_doc)


Day 3: Add Search


In [24]:
# Prepare the docs for indexing.
# Fail early with an explicit message if the function-definition cell
# has not been run in this kernel.
if 'read_repo_data' not in globals():
    raise NameError("Function 'read_repo_data' is not defined. Please run the cell that defines it first.")

evidently_docs = read_repo_data('evidentlyai', 'docs')

# Overwrites any evidently_chunks built by earlier cells with
# sliding-window chunks (dicts that have 'start' and 'chunk' keys).
evidently_chunks = []

for doc in evidently_docs:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    # 2000-character windows starting every 1000 characters.
    chunks = sliding_window(doc_content, 2000, 1000)
    for chunk in chunks:
        # Attach the document's remaining fields to each chunk.
        chunk.update(doc_copy)
    evidently_chunks.extend(chunks)


In [25]:
# index this data with minsearch
from minsearch import Index

# Text index over the Evidently chunks. "chunk" and "filename" are set by
# the chunking cell and read_repo_data; "title" and "description" presumably
# come from each document's front matter (TODO confirm they are present).
index = Index(
    text_fields=["chunk", "title", "description", "filename"],
    keyword_fields=[]
)

index.fit(evidently_chunks)

query = 'What should be in a test dataset for AI evaluation?'
results = index.search(query)

In [14]:
dtc_faq = read_repo_data('DataTalksClub', 'faq')

# Keep only the FAQ entries whose archive path contains 'data-engineering'.
de_dtc_faq = [d for d in dtc_faq if 'data-engineering' in d['filename']]

faq_index = Index(
    text_fields=["question", "content"],
    keyword_fields=[]
)

faq_index.fit(de_dtc_faq)
# NOTE(review): `query` is not defined in this cell; it is whatever an
# earlier cell last assigned, so the results depend on execution order.
results = faq_index.search(query)
print(results)

[{'id': '8bfd724e4f', 'question': "Compilation Error in test accepted_values_stg_green_tripdata_Payment_type__False___var_payment_type_values_ (models/staging/schema.yml)  'NoneType' object is not iterable", 'sort_order': 38, 'content': 'In the macro `test_accepted_values` (found in `tests/generic/builtin.sql`), an error was triggered by the test `accepted_values_stg_green_tripdata_Payment_type__False___var_payment_type_values_` located in `models/staging/schema.yml`.\n\nTo resolve this issue, ensure the following variable is added to your `dbt_project.yml` file:\n\n```yaml\nvars:\n  payment_type_values: [1, 2, 3, 4, 5, 6]\n```', 'filename': 'faq-main/_questions/data-engineering-zoomcamp/module-4/038_8bfd724e4f_compilation-error-in-test-accepted_values_stg_gree.md'}, {'id': 'c180431de3', 'question': 'DBT Cloud production error: prod dataset not available in location EU', 'sort_order': 4, 'content': "**Problem:**\n\nI am trying to deploy my DBT models to production using DBT Cloud. The 

In [15]:
# NOTE(review): unpinned install; consider pinning a version so re-runs use the same package.
%pip install sentence-transformers 

Note: you may need to restart the kernel to use updated packages.


In [16]:
# For vector search, we need to turn our documents into vectors (embeddings).
# We will use the sentence-transformers library for this purpose.

from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('multi-qa-distilbert-cos-v1')

# Let's take one document from the FAQ dataset:
record = de_dtc_faq[2]
# Note: this rebinds `text`, which an earlier cell used for a sample document.
text = record['question'] + ' ' + record['content']
v_doc = embedding_model.encode(text)

# We combine the question and answer text, then convert it to an embedding vector.
# Let's do the same for the query:
query = 'I just found out about the course. Can I enroll now?'
v_query = embedding_model.encode(query)

# This is how we compute similarity between the query and document vectors:
# NOTE(review): a plain dot product equals cosine similarity only for
# unit-length vectors; presumably this model returns normalized embeddings (TODO confirm).
similarity = v_query.dot(v_doc) 





In [17]:
# turn our docs into embeddings
from tqdm.auto import tqdm
import numpy as np

# Encode every FAQ entry (question + answer text) one document at a time.
faq_embeddings = []

for d in tqdm(de_dtc_faq):
    text = d['question'] + ' ' + d['content']
    v = embedding_model.encode(text)
    faq_embeddings.append(v)

# Stack the per-document vectors into a single 2-D array.
faq_embeddings = np.array(faq_embeddings)
print(faq_embeddings.shape)  # should be (number_of_docs, embedding_dimension)

100%|██████████| 449/449 [01:39<00:00,  4.50it/s]

(449, 768)





In [18]:
# Now let's use VectorSearch:
from minsearch import VectorSearch

# Vector index over the FAQ: row i of faq_embeddings was computed from
# de_dtc_faq[i], so both are passed in the same order.
faq_vindex = VectorSearch()
faq_vindex.fit(faq_embeddings, de_dtc_faq)



<minsearch.vector.VectorSearch at 0x7f2dc544c050>

In [19]:
query = 'Can I join the course now?'
# The query is embedded with the same model that encoded the documents.
q = embedding_model.encode(query)
results = faq_vindex.search(q)


In [20]:
# We first create an embedding for our query (q), then search for similar document embeddings.
# You can easily do the same with the Evidently docs (but only use the chunk field for embeddings):
# This requires evidently_chunks built with sliding_window (dicts with a
# 'chunk' key); chunks from the section-based cells use 'section' instead.
evidently_embeddings = []

for d in tqdm(evidently_chunks):
    v = embedding_model.encode(d['chunk'])
    evidently_embeddings.append(v)

evidently_embeddings = np.array(evidently_embeddings)

evidently_vindex = VectorSearch()
evidently_vindex.fit(evidently_embeddings, evidently_chunks)



100%|██████████| 575/575 [05:02<00:00,  1.90it/s]


<minsearch.vector.VectorSearch at 0x7f2dbf29f710>

In [21]:
# Hybrid search - combines both exact keyword matches and vector search.
query = 'Can I join the course now?'

text_results = faq_index.search(query, num_results=5)

q = embedding_model.encode(query)
vector_results = faq_vindex.search(q, num_results=5)

# Plain concatenation: text hits first, then vector hits. A document found
# by both searches appears twice in this list.
final_results = text_results + vector_results


In [12]:
# Putting this together. But before we can use it in our agent, we need to organize the code. Let's put all the code into different functions:
def text_search(query):
    """Return up to 5 hits for `query` from the module-level `faq_index`."""
    hits = faq_index.search(query, num_results=5)
    return hits

def vector_search(query):
    """Embed `query` with `embedding_model` and return up to 5 hits from `faq_vindex`."""
    return faq_vindex.search(embedding_model.encode(query), num_results=5)

def hybrid_search(query):
    """
    Run text and vector search for `query` and merge the hits without duplicates.

    Text hits come first, then vector hits. Two hits are considered the
    same document when their 'filename' values are equal; the first
    occurrence is kept.
    """
    first_hit_by_filename = {}

    # dicts preserve insertion order, and setdefault() keeps the first hit
    # seen for each filename.
    for hit in text_search(query) + vector_search(query):
        first_hit_by_filename.setdefault(hit['filename'], hit)

    return list(first_hit_by_filename.values())



In [17]:
from minsearch import Index, VectorSearch
from sentence_transformers import SentenceTransformer
import numpy as np

# Make sure de_dtc_faq is defined
# This cell rebuilds everything hybrid_search() depends on from scratch:
# it downloads the FAQ repository again and re-encodes every document.
dtc_faq = read_repo_data('DataTalksClub', 'faq')
de_dtc_faq = [d for d in dtc_faq if 'data-engineering' in d['filename']]

faq_index = Index(
	text_fields=["question", "content"],
	keyword_fields=[]
)
faq_index.fit(de_dtc_faq)

# Load the embedding model (this reloads it even if an earlier cell already did)
embedding_model = SentenceTransformer('multi-qa-distilbert-cos-v1')

# Create embeddings for FAQ documents
faq_embeddings = []
for d in de_dtc_faq:
	text = d['question'] + ' ' + d['content']
	v = embedding_model.encode(text)
	faq_embeddings.append(v)
faq_embeddings = np.array(faq_embeddings)

# Create VectorSearch index
faq_vindex = VectorSearch()
faq_vindex.fit(faq_embeddings, de_dtc_faq)

# Now we can use hybrid_search() to get results from both methods:
query = 'Can I join the course now?'
results = hybrid_search(query)
print(results)

[{'id': '3f1424af17', 'question': 'Course: Can I still join the course after the start date?', 'sort_order': 3, 'content': "Yes, even if you don't register, you're still eligible to submit the homework.\n\nBe aware, however, that there will be deadlines for turning in homeworks and the final projects. So don't leave everything for the last minute.", 'filename': 'faq-main/_questions/data-engineering-zoomcamp/general/003_3f1424af17_course-can-i-still-join-the-course-after-the-start.md'}, {'id': '9e508f2212', 'question': 'Course: When does the course start?', 'sort_order': 1, 'content': "The next cohort starts January 13th, 2025. More info at [DTC](https://datatalks.club/blog/guide-to-free-online-courses-at-datatalks-club.html).\n\n- Register before the course starts using this [link](https://airtable.com/shr6oVXeQvSI5HuWD).\n- Join the [course Telegram channel with announcements](https://t.me/dezoomcamp).\n- Don’t forget to register in DataTalks.Club's Slack and join the channel.", 'file

Day 4: Agents and Tools


In [9]:
# ...replace the existing OpenAI setup cell with this...

from dotenv import load_dotenv
import os
from openai import OpenAI

# Load variables from .env (if present) into environment
load_dotenv()

# Fail fast with a clear message when no key is configured, instead of
# failing later on the first API request.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError(
        "OPENAI_API_KEY not found. Create a .env or set the env var before running."
    )

# Create client explicitly with the key
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Verification that prints only the first 6 characters of the key.
# NOTE(review): the printed prefix is stored in the notebook output; clear
# outputs before sharing if that is a concern.
print("OPENAI_API_KEY present, prefix:", OPENAI_API_KEY[:6] + "..." )

OPENAI_API_KEY present, prefix: sk-pro...


In [10]:
import sys
print(sys.executable)   # path of the Python interpreter running this kernel; check it is the environment where the packages were installed

/home/codespace/.python/current/bin/python


In [11]:
import os
# Confirm the key is visible to this kernel without printing all of it:
# shows whether it is set and only its first 6 characters.
k = os.getenv("OPENAI_API_KEY")
print("Key loaded:", bool(k), "prefix:", (k[:6] + "...") if k else None)

Key loaded: True prefix: sk-pro...


In [12]:
# try asking a question without giving the LLM access to search:
from openai import OpenAI

# NOTE(review): this rebinds openai_client with a client built without an
# explicit api_key, replacing the one created in the setup cell; presumably
# the SDK then reads OPENAI_API_KEY from the environment (confirm).
openai_client = OpenAI()

user_prompt = "I just discovered the course, can I join now?"

chat_messages = [
    {"role": "user", "content": user_prompt}
]

# No tools are passed, so the model can only answer from its own knowledge.
response = openai_client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
)

print(response.output_text)

It depends on the course's policies and schedule. Many courses have specific enrollment periods or prerequisites. I recommend checking the course website or contacting the instructor or administration for more details on joining at this point.


In [13]:
# Function calling
# Describe the text_search function using the tool description format.
# The model's tool-call arguments are later unpacked with
# text_search(**arguments), so the property name "query" must match the
# Python function's parameter name.
text_search_tool = {
    "type": "function",
    "name": "text_search",
    "description": "Search the FAQ database",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query text to look up in the course FAQ."
            }
        },
        "required": ["query"],
        "additionalProperties": False
    }
} 

In [14]:
system_prompt = """
You are a helpful assistant for a course. 
"""

question = "I just discovered the course, can I join now?"

# Starts a fresh conversation; this replaces the chat_messages list built
# by the previous cell.
chat_messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": question}
]

response = openai_client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=[text_search_tool]
)

# Print the raw output items rather than output_text: in the recorded run
# the model answered with a text_search function call instead of text.
print(response.output)

[ResponseFunctionToolCall(arguments='{"query":"Can I join the course now?"}', call_id='call_z96IsqOjqpRI8Mr8RkzOyzUF', name='text_search', type='function_call', id='fc_68dac2a3f62081919b8cb721525d6ba90c1ee7902699ba09', status='completed')]


In [15]:
# Invoke the function(s) the model asked for and send the results back.
import json

if 'response' not in globals():
    raise NameError("Variable 'response' is not defined. Please run the cell that creates 'response' first.")

# Select the function-call items by type instead of assuming output[0] is
# one. This handles replies with several calls and makes the cell safe to
# re-run: once `response` holds the final text answer there are no calls
# left, so nothing is appended to chat_messages a second time.
tool_calls = [
    item for item in response.output
    if getattr(item, 'type', None) == 'function_call'
]

for call in tool_calls:
    # The arguments arrive as a JSON string, e.g. '{"query": "..."}'.
    arguments = json.loads(call.arguments)
    result = text_search(**arguments)

    call_output = {
        "type": "function_call_output",
        "call_id": call.call_id,  # links this output to the call it answers
        "output": json.dumps(result),
    }

    # Extend the conversation history with the call and its result.
    chat_messages.append(call)
    chat_messages.append(call_output)

if tool_calls:
    # Send the entire conversation history back to the LLM.
    response = openai_client.responses.create(
        model='gpt-4o-mini',
        input=chat_messages,
        tools=[text_search_tool]
    )

print(response.output_text)

NameError: name 'faq_index' is not defined

In [6]:
# System prompt
# The system prompt is very important: it influences how the agent behaves.
# Two variants are shown; the second assignment below overwrites the first,
# so only the second one is in effect after this cell runs.
system_prompt = """
You are a helpful assistant for a course. 

Use the search tool to find relevant information from the course materials before answering questions.

If you can find specific information through search, use it to provide accurate answers.
If the search doesn't return relevant results, let the user know and provide general guidance.
"""

# If we want the agent to make multiple search queries, we can modify the prompt:
system_prompt = """
You are a helpful assistant for a course. 

Always search for relevant information before answering. 
If the first search doesn't give you enough information, try different search terms.

Make multiple searches if needed to provide comprehensive answers.
"""


In [7]:
# Pydantic AI
# A library to create AI agents using Pydantic models
from typing import List, Any

# This typed redefinition replaces the earlier text_search and is the
# function handed to the agent as a tool.
# NOTE(review): agent frameworks commonly read a tool function's signature
# and docstring to describe it to the model; confirm before rewording the
# docstring below.
def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results returned by the FAQ index.
    """
    return faq_index.search(query, num_results=5)

    

In [18]:
# We can now define an agent with Pydantic AI and give it the text_search tool:
from pydantic_ai import Agent

# The agent gets the plain Python function as its tool and uses the
# system_prompt defined above as its instructions.
agent = Agent(
    name="faq_agent",
    instructions=system_prompt,
    tools=[text_search],
    model='gpt-4o-mini'
)


In [27]:
# Ensure faq_index is defined before running the agent
from minsearch import Index

# Rebuild the FAQ text index that text_search() reads at call time.
# Requires `dtc_faq` from an earlier cell; this cell does not download it.
de_dtc_faq = [d for d in dtc_faq if 'data-engineering' in d['filename']]

faq_index = Index(
	text_fields=["question", "content"],
	keyword_fields=[]
)
faq_index.fit(de_dtc_faq)

# Now run the agent
question = "I just discovered the course, can I join now?"

# agent.run is awaited directly at the top level of the cell.
result = await agent.run(user_prompt=question)



In [31]:
# In Jupyter, use 'await' directly for async functions.
# This runs the agent again with the same question (a second request) and
# prints the whole result object, not just its output text.
result = await agent.run(user_prompt=question)
print(result)


AgentRunResult(output="Yes, you can still join the course even after the start date. You are eligible to submit homework, but keep in mind that there will be deadlines for submitting assignments and final projects, so it's important not to procrastinate. \n\nFor future reference, the next cohort starts on January 13th, 2025. You can register before the course starts using the provided registration link. If you would like more details, feel free to ask!")
