In [1]:
import io
from typing import Iterable, Callable
import zipfile
import traceback
from dataclasses import dataclass

import requests


@dataclass
class RawRepositoryFile:
    filename: str
    content: str
class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                repo_owner: str,
                repo_name: str,
                allowed_extensions: Iterable[str] | None = None,
                filename_filter: Callable[[str], bool] | None = None
        ):
        """
        Initialize the GitHub repository data reader.
        
        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.
        
        Returns:
            List of RawRepositoryFile objects for each processed file
            
        Raises:
            Exception: If the repository download fails
        """
        resp = requests.get(self.url)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        zf = zipfile.ZipFile(io.BytesIO(resp.content))
        repository_data = self._extract_files(zf)
        zf.close()

        return repository_data
    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.
        
        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []
        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

        
    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.
        
        Args:
            filepath: The file path to check
            
        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False
    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.
        
        Args:
            filepath: The file path to extract extension from
            
        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'
        
        Args:
            filepath: The original filepath from the zip archive
            
        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]

In [15]:
def read_github_data():
    """
    Download the DataTalksClub website repository and return its podcast pages.

    Only markdown files whose path contains '_podcast' are kept.

    Returns:
        The list of files produced by GithubRepositoryDataReader.read()
    """
    podcast_reader = GithubRepositoryDataReader(
        repo_owner='DataTalksClub',
        repo_name='datatalksclub.github.io',
        allowed_extensions={"md"},
        filename_filter=lambda filename: '_podcast' in filename,
    )
    return podcast_reader.read()
   

In [16]:
# Download the repository archive and collect the podcast markdown files.
github_data = read_github_data()

In [39]:
# Show the raw text of one downloaded file to inspect its structure.
print(github_data[3].content)

---
title: "Processes in a Data Science Project"
short: "Processes in a Data Science Project"
guests: [alexeygrigorev]

image: images/podcast/s01e02-processes.jpg

season: 1
episode: 2

ids:
  youtube: SesVTDklFYQ
  anchor: Processes-in-a-Data-Science-Project---Alexey-Grigorev-encdlg

links:
  youtube: https://www.youtube.com/watch?v=SesVTDklFYQ
  anchor: https://anchor.fm/datatalksclub/episodes/Processes-in-a-Data-Science-Project---Alexey-Grigorev-encdlg
  spotify: TODO
  apple: TODO
---


In [21]:
# Number of files that passed the reader's filters.
print(len(github_data))

185


In [26]:
import frontmatter

def parse_data(data_raw):
    """
    Parse the front matter of each raw file into a dictionary.

    Args:
        data_raw: Iterable of objects exposing `filename` and `content` attributes.

    Returns:
        A list with one dict per successfully parsed file: the dict returned by
        the parsed post's `to_dict()`, plus a 'filename' key. Files that raise
        during parsing are reported on stdout and left out.
    """
    parsed_records = []
    for raw_file in data_raw:
        try:
            record = frontmatter.loads(raw_file.content).to_dict()
            record['filename'] = raw_file.filename
            parsed_records.append(record)
        except Exception as e:
            # Best effort: one malformed file must not abort the whole batch.
            print(f" Skipping {raw_file.filename} due to error: {e}")

    return parsed_records


In [27]:
# Parse every downloaded file; files that fail to parse are reported and skipped.
parsed_data = parse_data(github_data)

 Skipping _podcast/_template.md due to error: while constructing a mapping
  in "<unicode string>", line 6, column 8
found unhashable key
  in "<unicode string>", line 6, column 9


In [43]:
# Number of files that were parsed successfully.
print(len(parsed_data))

184


In [41]:
# Peek at the first three parsed records.
parsed_data[:3]

[{'episode': 8,
  'guests': ['jekaterinakokatjuhha'],
  'ids': {'anchor': 'The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
   'youtube': 'FRi0SUtxdMw'},
  'image': 'images/podcast/s12e08-journey-of-data-generalist-from-bioinformatics-to-freelancing.jpg',
  'links': {'anchor': 'https://anchor.fm/datatalksclub/episodes/The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
   'apple': 'https://podcasts.apple.com/us/podcast/the-journey-of-a-data-generalist-from/id1541710331?i=1000599125044',
   'spotify': 'https://open.spotify.com/episode/5fB185hGlGYQmdk0kbIsPv?si=YtnsaYNzTc-fl7emZ2IjEA',
   'youtube': 'https://www.youtube.com/watch?v=FRi0SUtxdMw'},
  'season': 12,
  'short': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
  'title': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
  'transcript': [{'line': "This week we'll talk about being a 

In [55]:
# Inspect the 'transcript' field of one parsed record.
parsed_data[78]['transcript']

[{'line': "Hi, everyone. This week, we'll talk about the work of data scientists and the expectations from them. We have a special guest today, Misra. Misra is a data scientist and content creator. After working as a data scientist for many, many different companies, she decided to create her own platform for teaching data scientists. Maybe you heard about this website – So You Want to Be a Data Scientist. Now we finally meet the person behind this website. Now I think you work as a developer advocate at AssemblyAI, right?",
  'sec': 67,
  'time': '1:07',
  'who': 'Alexey'},
 {'line': "Yes, that's correct. I still work on my platform and my YouTube channel, but I also create content for AssemblyAI.",
  'sec': 103,
  'time': '1:43',
  'who': 'Misra'},
 {'line': 'Okay, yeah. So, welcome.',
  'sec': 112,
  'time': '1:52',
  'who': 'Alexey'},
 {'line': "Thank you. It's great to be here.",
  'sec': 116,
  'time': '1:56',
  'who': 'Misra'},
 {'header': 'Misra’s background'},
 {'line': 'Befor

In [57]:
from typing import Any, Dict, Iterable, List


def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Create overlapping chunks from a sequence using a sliding window approach.

    Windows start at 0, step, 2*step, ... and generation stops as soon as a
    window reaches the end of the sequence, so no window is entirely contained
    in the previous one. The last window may be shorter than `size`.

    Args:
        seq: The input sequence to be chunked. It must support len() and
            slicing (e.g. a string or a list).
        size (int): The size of each chunk/window.
        step (int): The step size between consecutive windows.

    Returns:
        list: A list of dictionaries, each containing:
            - 'start': The starting position of the chunk in the original sequence
            - 'content': The chunk content

    Raises:
        ValueError: If size or step are not positive integers.

    Example:
        >>> sliding_window("hello world", size=5, step=3)
        [{'start': 0, 'content': 'hello'}, {'start': 3, 'content': 'lo wo'}, {'start': 6, 'content': 'world'}]
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        result.append({'start': i, 'content': seq[i:i + size]})
        # `>=` rather than `>`: a window ending exactly at the last element has
        # already covered the tail, so further windows would only repeat
        # fragments of it.
        if i + size >= n:
            break

    return result




In [58]:
def chunk_documents(
        documents: Iterable[Dict[str, Any]],
        size: int = 30,
        step: int = 15,
        content_field_name: str = 'content'
) -> List[Dict[str, Any]]:
    """
    Split a collection of documents into smaller chunks using sliding windows.

    Takes documents and breaks their content into overlapping chunks while preserving
    all other document metadata (filename, etc.) in each chunk. The input
    documents are not modified.

    Args:
        documents: An iterable of document dictionaries. Each document must have a content field.
        size (int, optional): The maximum size of each chunk, measured in elements of the
                              content (characters when the content is a string). Defaults to 30.
        step (int, optional): The step size between chunks. Defaults to 15.
        content_field_name (str, optional): The name of the field containing document content.
                                          Defaults to 'content'.

    Returns:
        list: A list of chunk dictionaries. Each chunk contains:
            - 'start': Starting position of the chunk in original content
            - 'content': The chunk content (always stored under 'content',
              whatever `content_field_name` is)
            - All original document fields except the content field. A document
              field named 'start' or 'content' never replaces the chunk's own value.

    Raises:
        KeyError: If a document has no `content_field_name` field.

    Example:
        >>> documents = [{'content': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, size=100, step=50)
        >>> # Or with custom content field:
        >>> documents = [{'text': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, content_field_name='text')
    """
    results = []

    for doc in documents:
        doc_content = doc[content_field_name]
        metadata = {
            key: value for key, value in doc.items()
            if key != content_field_name
        }
        for chunk in sliding_window(doc_content, size=size, step=step):
            # setdefault keeps the chunk's own 'start'/'content' when the
            # document happens to carry metadata under the same names.
            for key, value in metadata.items():
                chunk.setdefault(key, value)
            results.append(chunk)

    return results

In [59]:
# Uses chunk_documents' defaults (size=30, step=15), which window over each
# record's 'content' value; the sample chunk shown below is a 30-character slice.
# NOTE(review): confirm that such small windows are intended.
chunks = chunk_documents(parsed_data)

In [61]:
# Total number of chunks produced.
print(len(chunks))

3395


In [60]:
# Inspect a single chunk: its 'start'/'content' plus the copied document fields.
chunks[3]

{'start': 45,
 'content': '.linkedin.com/in/jekaterina-ko',
 'episode': 8,
 'guests': ['jekaterinakokatjuhha'],
 'ids': {'anchor': 'The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
  'youtube': 'FRi0SUtxdMw'},
 'image': 'images/podcast/s12e08-journey-of-data-generalist-from-bioinformatics-to-freelancing.jpg',
 'links': {'anchor': 'https://anchor.fm/datatalksclub/episodes/The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
  'apple': 'https://podcasts.apple.com/us/podcast/the-journey-of-a-data-generalist-from/id1541710331?i=1000599125044',
  'spotify': 'https://open.spotify.com/episode/5fB185hGlGYQmdk0kbIsPv?si=YtnsaYNzTc-fl7emZ2IjEA',
  'youtube': 'https://www.youtube.com/watch?v=FRi0SUtxdMw'},
 'season': 12,
 'short': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
 'title': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
 'transcript':

In [62]:
from minsearch import Index

In [73]:
# Build the text index over the chunk dictionaries.
# NOTE(review): in the record output shown earlier, 'header', 'line' and 'who'
# appear only inside the items of the 'transcript' list, not as top-level keys —
# confirm these text fields actually exist on the chunks being indexed.
index = Index(
    text_fields=["content", "filename", "header", "line","who"],
)

index.fit(chunks)

<minsearch.minsearch.Index at 0x72d2e128e000>

In [75]:
# Try the index directly with a sample query.
search_results = index.search('how do I make money with AI')

In [80]:
#print(search_results)

In [93]:
import json

def search(query):
    """Return the top three index matches for `query`."""
    top_hits = index.search(query=query, num_results=3)
    return top_hits

# System prompt that rag() passes to llm(); .strip() removes the leading and
# trailing newlines introduced by the triple-quoted literal.
instructions = """
Answer the QUESTION based on the CONTEXT from the subtitles of a podcast video.

Use only the facts from the CONTEXT when answering the QUESTION.

When answering the question, 
provide the citation in form of the episode and filename pointing where
this is discussed. If the question is discussed in multiple documents,
cite all of them.

Don't use markdown or any formatting in the output.
""".strip()


In [94]:
from openai import OpenAI
# Module-level client reused by llm().
# NOTE(review): presumably picks up credentials from the environment — confirm.
openai_client = OpenAI()

def llm(user_prompt, instructions=None, model ='gpt-4o-mini'):
    """
    Send a prompt to the model and return the text of its reply.

    Args:
        user_prompt: Content of the user message.
        instructions: Optional system message; omitted when falsy.
        model: Name of the model to call.

    Returns:
        The `output_text` of the response object.
    """
    system_part = (
        [{"role": "system", "content": instructions}] if instructions else []
    )
    conversation = system_part + [{"role": "user", "content": user_prompt}]
    reply = openai_client.responses.create(model=model, input=conversation)
    return reply.output_text
        

In [95]:
# User-message template: the question and the retrieved context are wrapped in
# tags so the model can tell them apart. .strip() drops the blank lines that
# surround the literal.
prompt_template = """

<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(question, search_results):
    """
    Render the user prompt for a question and its retrieved documents.

    Args:
        question: The user's question, inserted verbatim.
        search_results: Retrieved documents; serialized to JSON for the context.

    Returns:
        The filled-in prompt with surrounding whitespace stripped.
    """
    # default=str: values that JSON cannot represent natively (for example
    # date objects coming from parsed YAML front matter) are rendered as
    # strings instead of making json.dumps raise TypeError.
    context = json.dumps(search_results, default=str)
    return prompt_template.format(
        question=question,
        context=context,
    ).strip()

def rag(query):
    """
    Answer `query` by retrieving documents with search(), building the prompt
    with build_prompt() and sending it to llm() with the module-level
    `instructions`.
    """
    return llm(
        build_prompt(query, search(query)),
        instructions=instructions,
    )


In [96]:
# End-to-end check: run the full pipeline for a sample question and print the answer.
answer = rag('how do I make money with AI')
print(answer)

To make money with AI, consider engaging in volunteer projects, which often allow individuals to gain valuable experience and connections that can lead to paid opportunities. These projects can enhance your skills and visibility in the field. Participating in projects with organizations like Omdena or Fruit Punch AI can also provide you with a portfolio of work, which is beneficial for seeking paid roles in AI-related positions.

In addition, leveraging platforms that connect people for collaborative opportunities in AI can help you discover projects that pay. Networking in these communities is key to find lucrative work based on your skills and contributions. 

For more details on opportunities related to AI and making an impact through volunteer work, refer to episode 7, "Make an Impact Through Volunteering Open Source Work," of the Data Talks Club podcast.
