# Week 1 — Q4–Q6 (Podcasts ingestion + chunking + search)

This notebook fetches DataTalks.Club podcasts, chunks paragraphs, indexes with **minsearch**, and answers a query.

In [None]:
# If `python-frontmatter` isn't installed yet, uncomment to add it to your env:
# !uv add python-frontmatter

In [None]:
import os, io, json, requests, traceback
from typing import Iterable, Callable, Dict, Any, List
from dataclasses import dataclass

import frontmatter
from minsearch import Index

# GitHub "contents" API URL for the `_podcast` directory on the `main` ref;
# list_podcast_files() requests it to enumerate the podcast files.
GITHUB_API_DIR = "https://api.github.com/repos/DataTalksClub/datatalksclub.github.io/contents/_podcast?ref=main"

# Base URL for raw file downloads; used as a fallback when a listing entry
# carries no `download_url`.
RAW_REPO_URL = "https://raw.githubusercontent.com/DataTalksClub/datatalksclub.github.io/main/_podcast"

# One shared HTTP session for every request made by this notebook, sent with
# an explicit User-Agent header.
SESSION = requests.Session()
SESSION.headers["User-Agent"] = "ai-bootcamp-notebook"

In [None]:
def list_podcast_files() -> list[dict]:
    """Return the Markdown entries from the podcast directory listing.

    Requests ``GITHUB_API_DIR`` through the shared ``SESSION`` (30 s timeout)
    and keeps only the entries whose ``type`` is ``"file"`` and whose ``name``
    ends in ``.md`` or ``.mdx`` (compared case-insensitively).

    Returns:
        The matching entries, unmodified and in listing order.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error response.
    """
    response = SESSION.get(GITHUB_API_DIR, timeout=30)
    response.raise_for_status()

    markdown_entries = []
    for entry in response.json():
        # Sub-directories and other non-file entries are skipped.
        if entry.get("type") != "file":
            continue
        filename = entry.get("name", "")
        if filename.lower().endswith((".md", ".mdx")):
            markdown_entries.append(entry)
    return markdown_entries

@dataclass
class RawFile:
    """A downloaded podcast file: its name together with its raw text."""

    # File name, e.g. the "name" value of a directory-listing entry.
    filename: str
    # Full text of the file; fetch_podcast_markdowns() stores the response
    # text with surrounding whitespace stripped.
    content: str

def fetch_podcast_markdowns(files: list[dict]) -> list[RawFile]:
    """Download the text of every listed podcast file.

    Args:
        files: Directory-listing entries. Each must have a ``name`` key; a
            truthy ``download_url`` is used when present, otherwise the URL is
            built from ``RAW_REPO_URL`` and the name.

    Returns:
        One ``RawFile`` per successful download, in input order. A file whose
        download fails is reported on stdout (message plus traceback) and
        skipped, so the result may be shorter than ``files``.
    """
    downloaded: list[RawFile] = []
    for entry in files:
        name = entry["name"]
        url = entry.get("download_url") or f"{RAW_REPO_URL}/{name}"
        try:
            response = SESSION.get(url, timeout=60)
            response.raise_for_status()
            downloaded.append(RawFile(filename=name, content=response.text.strip()))
        except Exception as exc:
            # Best effort: one broken file must not abort the whole download.
            print(f"Failed to fetch {name}: {exc}")
            traceback.print_exc()
    return downloaded

## Q4 — Download the podcast data and count records

In [None]:
# Q4: list the podcast files, download them, and count what was retrieved.
files = list_podcast_files()
raw_docs = fetch_podcast_markdowns(files)
num_records = len(raw_docs)
print(f"Q4: number of podcast records: {num_records}")

# fetch_podcast_markdowns() skips files that fail to download, so make an
# incomplete fetch explicit instead of silently reporting a smaller count.
if num_records != len(files):
    print(f"WARNING: only {num_records} of {len(files)} listed files were downloaded; the count above is incomplete.")

# Show the first few file names as a sanity check.
for rf in raw_docs[:3]:
    print(" -", rf.filename)

In [None]:
def parse_frontmatter(raw_docs: list[RawFile]) -> list[dict]:
    """Parse each raw file's front matter into a flat dict.

    Args:
        raw_docs: Downloaded files whose ``content`` is parsed with
            ``frontmatter.loads``.

    Returns:
        One dict per input file, in input order: the result of the parsed
        post's ``to_dict()`` with a ``filename`` key added (overwriting any
        existing one) and a ``content`` key guaranteed to be present.
    """
    records = []
    for raw in raw_docs:
        post = frontmatter.loads(raw.content)
        record = post.to_dict()
        record["filename"] = raw.filename
        # Fallback only: fill in the body text if to_dict() did not supply it.
        if "content" not in record:
            body = post.content
            record["content"] = body if isinstance(body, str) else str(body)
        records.append(record)
    return records

# Parse every downloaded file, then list the union of keys seen across all
# parsed documents to get an overview of the available fields.
docs = parse_frontmatter(raw_docs)
print("Sample keys:", sorted({key for doc in docs for key in doc.keys()}))

## Q5 — Chunk by paragraphs (size=30, overlap=15) and count chunks

In [None]:
from typing import Any

def split_paragraphs(text: str) -> list[str]:
    """Split text into paragraphs separated by blank lines.

    A blank line is any line that is empty or contains only whitespace, so
    ``"a\\n  \\nb"`` yields two paragraphs. Windows (``\\r\\n``) and lone
    ``\\r`` line endings are normalised to ``\\n`` first.

    Args:
        text: The text to split.

    Returns:
        The paragraphs in order, each stripped of surrounding whitespace;
        empty paragraphs are dropped, so empty or whitespace-only input
        gives ``[]``.
    """
    import re  # local import so this cell does not depend on the imports cell

    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    # "\n\s*\n" matches one or more consecutive blank or whitespace-only lines.
    stripped = (part.strip() for part in re.split(r"\n\s*\n", normalized))
    return [part for part in stripped if part]

def sliding_window(seq: list[Any], size: int, step: int) -> list[dict]:
    """Cut ``seq`` into windows of up to ``size`` items, advancing by ``step``.

    Args:
        seq: The sequence to slice.
        size: Maximum number of items per window; must be positive.
        step: Distance between consecutive window starts; must be positive.

    Returns:
        A list of ``{"start": index, "content": seq[index:index + size]}``
        dicts. The last window may be shorter than ``size``. An empty ``seq``
        yields ``[]``.

    Raises:
        ValueError: If ``size`` or ``step`` is not positive.
    """
    if step <= 0 or size <= 0:
        raise ValueError("size and step must be positive")

    total = len(seq)
    windows = []
    for start in range(0, total, step):
        windows.append({"start": start, "content": seq[start:start + size]})
        # Once a window reaches the end of seq, stop: no further window is
        # started after it.
        if start + size >= total:
            break
    return windows

def chunk_paragraphs(documents: list[dict], size: int = 30, overlap: int = 15, content_field: str = "content") -> list[dict]:
    """Split each document into overlapping chunks of paragraphs.

    Args:
        documents: Dicts holding the text to chunk under ``content_field``.
        size: Number of paragraphs per chunk.
        overlap: Number of paragraphs shared by consecutive chunks; the
            window advances by ``size - overlap`` paragraphs, but never by
            less than one.
        content_field: Key to read the text from; a missing or falsy value
            is treated as an empty string.

    Returns:
        A flat list of chunks. Each chunk is a shallow copy of its source
        document with ``content`` set to the window's paragraphs joined by a
        blank line and ``para_start`` set to the index of the window's first
        paragraph. The chunk text is always stored under ``content``, whatever
        ``content_field`` is. Documents without any paragraphs contribute no
        chunks.
    """
    stride = max(1, size - overlap)
    chunked = []
    for doc in documents:
        paragraphs = split_paragraphs(doc.get(content_field, "") or "")
        for window in sliding_window(paragraphs, size=size, step=stride):
            piece = doc.copy()
            piece.update(
                content="\n\n".join(window["content"]),
                para_start=window["start"],
            )
            chunked.append(piece)
    return chunked

# Q5: chunk every parsed document into windows of 30 paragraphs, with
# consecutive windows overlapping by 15 paragraphs.
chunks = chunk_paragraphs(docs, size=30, overlap=15, content_field="content")
print(f"Q5: number of chunks: {len(chunks)}")

# Preview the first chunk. Guarded so that an empty result (e.g. every
# download failed) produces a message instead of an IndexError.
if chunks:
    print(chunks[0]["filename"], "paras-start:", chunks[0]["para_start"])
    print(chunks[0]["content"][:300].replace("\n"," ") + " ...")
else:
    print("No chunks were produced; nothing to preview.")

## Q6 — Index with minsearch and query

In [None]:
# Build a minsearch text index over the chunks using these text fields.
index = Index(text_fields=["content", "title", "filename", "description"])
index.fit(chunks)

# Run the Q6 query and keep the ten best-matching chunks.
query = "how do I make money with AI?"
results = index.search(query=query, num_results=10)

print("Top 3 results:")
for rank, hit in enumerate(results[:3], start=1):
    print(f"{rank}. {hit.get('title')!r} — {hit.get('filename')}")

# The Q6 answer is the episode of the top-ranked chunk; both values stay None
# when the search returned nothing.
first_title = None
first_filename = None
if results:
    first_title = results[0].get("title")
    first_filename = results[0].get("filename")
print("\nQ6: first episode in results:", first_title or "<no title>", "(", first_filename, ")")