In [None]:
from minsearch import Index

In [15]:
import io
from typing import Iterable, Callable
import zipfile
import traceback
from dataclasses import dataclass

import requests


@dataclass
class RawRepositoryFile:
    filename: str
    content: str

class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                repo_owner: str,
                repo_name: str,
                allowed_extensions: Iterable[str] | None = None,
                filename_filter: Callable[[str], bool] | None = None
        ):
        """
        Initialize the GitHub repository data reader.
        
        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.
        
        Returns:
            List of RawRepositoryFile objects for each processed file
            
        Raises:
            Exception: If the repository download fails
        """
        resp = requests.get(self.url)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        zf = zipfile.ZipFile(io.BytesIO(resp.content))
        repository_data = self._extract_files(zf)
        zf.close()

        return repository_data

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.
        
        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.
        
        Args:
            filepath: The file path to check
            
        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.
        
        Args:
            filepath: The file path to extract extension from
            
        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'
        
        Args:
            filepath: The original filepath from the zip archive
            
        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]


In [32]:
def read_github_data():
    allowed_extensions = {"md", "mdx"}

    repo_owner = 'DataTalksClub'
    repo_name = 'datatalksclub.github.io'

    reader = GithubRepositoryDataReader(
        repo_owner,
        repo_name,
        allowed_extensions=allowed_extensions,
        filename_filter=lambda path: path.startswith("_podcast")
    )
    
    return reader.read()

In [33]:
github_data = read_github_data()

In [78]:
github_data[2]

RawRepositoryFile(filename='_podcast/s01e01-roles.md', content='---\ntitle: "Data Team Roles Explained"\nshort: "Roles in a Data Team"\nguests: [alexeygrigorev]\n\nimage: images/podcast/s01e01-roles.jpg\n\nkeywords: "data team roles, data scientist, data engineer, machine learning engineer, data analyst, MLOps engineer, product manager, data team structure, data science roles, ML engineer vs data engineer, data team responsibilities, data science career"\n\nseason: 1\nepisode: 1\n\nids:\n  youtube: UukjwSIAnpw\n  anchor: Roles-in-a-data-team---Alexey-Grigorev-emqcft\n\nlinks:\n  youtube: https://www.youtube.com/watch?v=UukjwSIAnpw\n  anchor: https://anchor.fm/datatalksclub/episodes/Roles-in-a-data-team---Alexey-Grigorev-emqcft\n  spotify: TODO\n  apple: TODO\n---\n\nThe topic today is the roles in data teams. We want to understand what kind of people work in the data team, what responsibilities they have, what they do, and what they need to know.\n\n**Q: Before we dive into the differe

In [35]:
len(github_data)

185

In [40]:
print(github_data[40].content)

---
title: "Making Sense of Data Engineering Acronyms and Buzzwords"
short: "Making Sense of Data Engineering Acronyms and Buzzwords"
guests: [nataliekwong]

image: images/podcast/s05e02-data-engineering-acronyms.jpg

season: 5
episode: 2

ids:
  youtube: t9Z1S3OYnJU
  anchor: Making-Sense-of-Data-Engineering-Acronyms-and-Buzzwords---Natalie-Kwong-e177303

links:
  youtube: https://www.youtube.com/watch?v=t9Z1S3OYnJU
  anchor: https://anchor.fm/datatalksclub/episodes/Making-Sense-of-Data-Engineering-Acronyms-and-Buzzwords---Natalie-Kwong-e177303
  spotify: https://open.spotify.com/episode/1AvtwdcAXGGjdJ7fl0Hsuw
  apple: https://podcasts.apple.com/us/podcast/making-sense-of-data-engineering-acronyms-and/id1541710331?i=1000534990760

transcript:
- line: This week we'll try to make sense of common engineering acronyms and buzzwords
    with the help of our special guest today, Natalie. Natalie works at Airbyte, focusing
    on building user experience and overseeing analytics. Your expert

In [42]:
import frontmatter
import yaml

def parse_data(data_raw):
    data_parsed = []
    for f in data_raw:
        try:
            # Try standard frontmatter parsing first
            post = frontmatter.loads(f.content)
            data = post.to_dict()
            data['filename'] = f.filename
            data_parsed.append(data)
        except yaml.constructor.ConstructorError as e:
            print(f"YAML parsing error in {f.filename}: {e}")
            # Fallback: extract content without frontmatter parsing
            content_parts = f.content.split('---', 2)
            if len(content_parts) >= 3:
                # Has frontmatter, extract just the content
                content = content_parts[2].strip()
            else:
                # No frontmatter, use entire content
                content = f.content
            
            data = {
                'content': content,
                'metadata': {},  # Empty metadata due to parsing error
                'filename': f.filename,
                'parse_error': str(e)
            }
            data_parsed.append(data)
        except Exception as e:
            print(f"Unexpected error parsing {f.filename}: {e}")
            # Create minimal data structure
            data = {
                'content': f.content,
                'metadata': {},
                'filename': f.filename,
                'parse_error': str(e)
            }
            data_parsed.append(data)
    
    return data_parsed

parsed_data = parse_data(github_data)

YAML parsing error in _podcast/_template.md: while constructing a mapping
  in "<unicode string>", line 6, column 8
found unhashable key
  in "<unicode string>", line 6, column 9


In [62]:
parsed_data[5]

{'title': 'Standing out as a Data Scientist',
 'short': 'Standing out as a Data Scientist',
 'guests': ['lukewhipps'],
 'image': 'images/podcast/s01e04-standing-out-as-a-data-scientist.jpg',
 'season': 1,
 'episode': 4,
 'ids': {'youtube': 'Sb4CJlonB3c',
  'anchor': 'Standing-out-as-a-Data-Scientist---Luke-Whipps-envr7e'},
 'links': {'youtube': 'https://www.youtube.com/watch?v=Sb4CJlonB3c',
  'anchor': 'https://anchor.fm/datatalksclub/episodes/Standing-out-as-a-Data-Scientist---Luke-Whipps-envr7e',
  'spotify': 'https://open.spotify.com/episode/2Yxay9HJmd6dvk34MHJ0K2',
  'apple': 'https://podcasts.apple.com/us/podcast/standing-out-as-a-data-scientist-luke-whipps/id1541710331?i=1000502844994'},
 'transcript': [{'line': "Last week, we talked about building data science teams, and recruiting data scientists. Today the topic is slightly different. We'll talk about the recruitment process, but from the candidate’s point of view. We have a special guest today, Luke. You probably know Luke as

In [82]:
"""
Document chunking utilities for splitting large documents into smaller, overlapping pieces.

This module provides functionality to break down documents into chunks using a sliding
window approach, which is useful for processing large texts in smaller, manageable pieces
while maintaining context through overlapping content.
"""

from typing import Any, Dict, Iterable, List


def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Create overlapping chunks from a sequence using a sliding window approach.

    Args:
        seq: The input sequence (string or list) to be chunked.
        size (int): The size of each chunk/window.
        step (int): The step size between consecutive windows.

    Returns:
        list: A list of dictionaries, each containing:
            - 'start': The starting position of the chunk in the original sequence
            - 'content': The chunk content

    Raises:
        ValueError: If size or step are not positive integers.

    Example:
        >>> sliding_window("hello world", size=5, step=3)
        [{'start': 0, 'content': 'hello'}, {'start': 3, 'content': 'lo wo'}]
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        batch = seq[i:i+size]
        result.append({'start': i, 'content': batch})
        if i + size > n:
            break

    return result

def prepare_documents_for_chunking(parsed_data):
    """
    Extract text from transcript field and prepare documents for chunking.
    
    Args:
        parsed_data: List of documents with transcript field containing dialogue
        
    Returns:
        List of documents with extracted text in 'content' field
    """
    prepared_docs = []
    
    for doc in parsed_data:
        # Create a copy of the document
        new_doc = doc.copy()
        
        # Extract text from transcript
        transcript_text = ""
        if 'transcript' in doc and doc['transcript']:
            lines = []
            for item in doc['transcript']:
                if 'line' in item:  # Only get actual dialogue lines, skip headers
                    lines.append(item['line'])
            transcript_text = " ".join(lines)
        
        # Add the extracted text as 'content' field
        new_doc['content'] = transcript_text
        prepared_docs.append(new_doc)
    
    return prepared_docs

def chunk_documents(
        documents: Iterable[Dict[str, str]],
        size: int = 30,
        step: int = 15,
        content_field_name: str = 'content'
) -> List[Dict[str, str]]:
    """
    Split a collection of documents into smaller chunks using sliding windows.

    Takes documents and breaks their content into overlapping chunks while preserving
    all other document metadata (filename, etc.) in each chunk.

    Args:
        documents: An iterable of document dictionaries. Each document must have a content field.
        size (int, optional): The maximum size of each chunk. Defaults to 2000.
        step (int, optional): The step size between chunks. Defaults to 1000.
        content_field_name (str, optional): The name of the field containing document content.
                                          Defaults to 'content'.

    Returns:
        list: A list of chunk dictionaries. Each chunk contains:
            - All original document fields except the content field
            - 'start': Starting position of the chunk in original content
            - 'content': The chunk content

    Example:
        >>> documents = [{'content': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, size=100, step=50)
        >>> # Or with custom content field:
        >>> documents = [{'text': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, content_field_name='text')
    """
    results = []

    for doc in documents:
        doc_copy = doc.copy()
        doc_content = doc_copy.pop(content_field_name)
        chunks = sliding_window(doc_content, size=size, step=step)
        for chunk in chunks:
            chunk.update(doc_copy)
        results.extend(chunks)

    return results

In [83]:
# Prepare the documents
prepared_data = prepare_documents_for_chunking(parsed_data)

# Now chunk the documents using the extracted content
chunks = chunk_documents(prepared_data, size=30, step=15, content_field_name='content')

In [88]:
len(chunks)

516607