Homework 5: LLM Orchestration and Ingestion

Q1. Running Mage

Start Mage with ./scripts/start.sh
Mage is now running at http://localhost:6789/

What's the version of Mage?
- v0.9.72
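
Besides reading it in the Mage UI, the installed version can be checked from Python inside the Mage container. This is a minimal sketch that assumes Mage is installed under the PyPI distribution name "mage-ai". It uses only the standard library's importlib.metadata and returns None when the package is not installed.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Assumption: Mage's PyPI distribution is named "mage-ai"
print("mage-ai:", installed_version("mage-ai"))
```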


Q2. Reading the documents

Now we can ingest the documents. Create a custom code ingestion block.

Let's read the documents. We will use the same code we used for parsing the FAQ: parse-faq-llm.ipynb

Use the following document_id: 1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E

This is the document ID of LLM FAQ version 1.

Copy the code to the editor. How many FAQ documents did we process?

New pipeline -> Retrieval augmented generation -> dauntless_cosmos

Data preparation -> Load -> Ingest -> Add block -> Custom code

In [None]:
# Mage ingest block (data loader)
# Requires python-docx, which is imported as docx
import io

import docx
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def clean_line(line):
    # Remove surrounding whitespace and any byte-order mark (U+FEFF)
    line = line.strip()
    line = line.strip('\uFEFF')
    return line


def read_faq(file_id):
    # Download the Google Doc exported as .docx
    url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'

    response = requests.get(url)
    response.raise_for_status()

    with io.BytesIO(response.content) as f_in:
        doc = docx.Document(f_in)

    questions = []

    # Heading 1 starts a section, Heading 2 starts a question;
    # all other paragraphs are appended to the current answer
    question_heading_style = 'heading 2'
    section_heading_style = 'heading 1'

    section_title = ''
    question_title = ''
    answer_text_so_far = ''

    for p in doc.paragraphs:
        style = p.style.name.lower()
        p_text = clean_line(p.text)

        if len(p_text) == 0:
            continue

        if style == section_heading_style:
            section_title = p_text
            continue

        if style == question_heading_style:
            # A new question closes the previous one: store its answer
            answer_text_so_far = answer_text_so_far.strip()
            if answer_text_so_far != '' and section_title != '' and question_title != '':
                questions.append({
                    'text': answer_text_so_far,
                    'section': section_title,
                    'question': question_title,
                })
                answer_text_so_far = ''

            question_title = p_text
            continue

        answer_text_so_far += '\n' + p_text

    # Store the answer of the last question in the document
    answer_text_so_far = answer_text_so_far.strip()
    if answer_text_so_far != '' and section_title != '' and question_title != '':
        questions.append({
            'text': answer_text_so_far,
            'section': section_title,
            'question': question_title,
        })

    return questions


@data_loader
def load_data(*args, **kwargs):
    # course name -> Google Docs file ID of its FAQ (LLM FAQ version 1)
    faq_documents = {
        'llm-zoomcamp': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',
    }

    documents = []
    for course, file_id in faq_documents.items():
        print(course)
        course_documents = read_faq(file_id)
        documents.append({'course': course, 'documents': course_documents})
        # Number of FAQ documents (courses) processed so far
        print(len(documents))
    return documents

Mage output:
llm-zoomcamp
1
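
The heading-driven grouping inside read_faq can be tried without downloading anything. Below is a simplified sketch that takes (style name, text) pairs instead of python-docx paragraphs; the pairs in the sample list are made-up data. One deliberate difference: it also flushes the pending answer when a new Heading 1 starts, so each record keeps the section its question appeared under (read_faq only flushes at a Heading 2 and at the end of the document).

```python
from typing import Dict, Iterable, List, Tuple

SECTION_STYLE = "heading 1"   # section titles
QUESTION_STYLE = "heading 2"  # question titles


def group_paragraphs(paragraphs: Iterable[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Group (style, text) paragraphs into {text, section, question} records."""
    records: List[Dict[str, str]] = []
    section, question, answer = "", "", ""

    def flush() -> None:
        # Emit the pending answer only if section, question and text are all known
        text = answer.strip()
        if text and section and question:
            records.append({"text": text, "section": section, "question": question})

    for style, raw_text in paragraphs:
        text = raw_text.strip().strip("\ufeff")
        if not text:
            continue
        style = style.lower()
        if style == SECTION_STYLE:
            flush()
            section, question, answer = text, "", ""
        elif style == QUESTION_STYLE:
            flush()
            question, answer = text, ""
        else:
            answer += "\n" + text
    flush()
    return records


sample = [
    ("Heading 1", "Section A"),
    ("Heading 2", "Q1"),
    ("Normal", "A1"),
    ("Heading 1", "Section B"),
    ("Heading 2", "Q2"),
    ("Normal", "A2 line 1"),
    ("Normal", "A2 line 2"),
]
print(len(group_paragraphs(sample)))  # 2
```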

Q3. Chunking
Data preparation -> Transform -> Chunking -> Add block -> Custom code

How many documents (chunks) do we have in the output?
- 86

In [None]:
# Mage chunking block (transformer)
# Python code for version 1
import hashlib

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


def generate_document_id(doc):
    # Stable 8-character ID from course, question and the first 10 characters of the answer
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    hash_object = hashlib.md5(combined.encode())
    hash_hex = hash_object.hexdigest()
    document_id = hash_hex[:8]
    return document_id


@transformer
def transform(data, *args, **kwargs):
    documents = []

    for course_dict in data:
        for doc in course_dict['documents']:
            doc['course'] = course_dict['course']
            # previously we used just "id" for the document ID
            doc['document_id'] = generate_document_id(doc)
            documents.append(doc)

    # Return only after every course has been processed
    print(len(documents))
    print(type(data))
    return documents

Mage output for version 1:
86
<class 'list'>

Mage output for version 2:
86
<class 'dict'>
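
A standalone sketch of the chunking step that can run outside Mage. It flattens every course's documents into one list and returns only after the outer loop finishes, so adding a second entry to faq_documents would not drop documents. Unlike the Mage block, it copies each record instead of mutating the loader output. The course names and texts in the sample data are placeholders.

```python
import hashlib
from typing import Any, Dict, List


def generate_document_id(doc: Dict[str, Any]) -> str:
    # Same recipe as the Mage block: md5 of course, question and first 10 chars of text
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    return hashlib.md5(combined.encode()).hexdigest()[:8]


def flatten(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    documents = []
    for course_dict in data:
        for doc in course_dict["documents"]:
            # Copy the record so the loader's output is left untouched
            record = {**doc, "course": course_dict["course"]}
            record["document_id"] = generate_document_id(record)
            documents.append(record)
    return documents  # after the loop, so every course is included


data = [
    {"course": "course-a", "documents": [{"text": "t1", "section": "s", "question": "q1"},
                                         {"text": "t2", "section": "s", "question": "q2"}]},
    {"course": "course-b", "documents": [{"text": "t3", "section": "s", "question": "q3"}]},
]
print(len(flatten(data)))  # 3
```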

Q4. Export
Data preparation -> Export -> Vector databases -> Elasticsearch

What's the last document ID?

Also note the index name.

In [None]:
# Mage exporter: index the documents into Elasticsearch
from datetime import datetime
from typing import Dict, List, Union

import numpy as np
from elasticsearch import Elasticsearch

from mage_ai.data_preparation.variable_manager import set_global_variable


if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def elasticsearch(
    documents: List[Dict[str, Union[Dict, List[int], np.ndarray, str]]], *args, **kwargs,
):
    """
    Exports document data to an Elasticsearch database.
    """

    connection_string = kwargs.get('connection_string', 'http://localhost:9200')

    # Timestamped index name; %M%S is minutes and seconds, e.g. documents_20240820_5919
    index_name_prefix = kwargs.get('index_name', 'documents')
    current_time = datetime.now().strftime("%Y%m%d_%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index name:", index_name)

    # Make the index name available to other blocks of the dauntless_cosmos pipeline
    set_global_variable('dauntless_cosmos', 'index_name', index_name)

    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)
    vector_column_name = kwargs.get('vector_column_name', 'embedding')

    # Resolves to 0 here, since the documents carry no embedding field
    dimensions = kwargs.get('dimensions')
    if dimensions is None and len(documents) > 0:
        document = documents[0]
        dimensions = len(document.get(vector_column_name) or [])

    es_client = Elasticsearch(connection_string)

    print(f'Connecting to Elasticsearch at {connection_string}')

    index_settings = {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas,
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "section": {"type": "text"},
                "question": {"type": "text"},
                "course": {"type": "keyword"},
                "document_id": {"type": "keyword"},
            }
        },
    }

    if not es_client.indices.exists(index=index_name):
        # Pass the settings and mappings; without body= the index gets default dynamic mappings
        es_client.indices.create(index=index_name, body=index_settings)
        print('Index created with properties:', index_settings)
        print('Embedding dimensions:', dimensions)

    print(f'Indexing {len(documents)} documents to Elasticsearch index {index_name}')
    for document in documents:
        print(f'Indexing document {document["document_id"]}')
        es_client.index(index=index_name, document=document)
        print(document)


Mage output version 1:
First Document ID: 97872393
Last Document ID: fa136280
Index name: documents_20240820_5919

Mage output version 2:
First Document ID: 97872393
Last Document ID: fa136280
Index name: documents_20240820_5726
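
The index names above come from the exporter's strftime pattern "%Y%m%d_%M%S", which encodes the date plus minutes and seconds but not the hour. A small sketch to check that; the time 15:59:19 used here is an arbitrary example, not taken from the runs above. Because the hour is dropped, two runs on the same day at the same minute and second of different hours produce the same name, and the exporter then writes into the existing index instead of creating a new one.

```python
from datetime import datetime
from typing import Optional


def make_index_name(prefix: str = "documents", now: Optional[datetime] = None) -> str:
    # Same pattern as the exporter: date, then minutes and seconds (no hour)
    now = now or datetime.now()
    return f"{prefix}_{now.strftime('%Y%m%d_%M%S')}"


print(make_index_name(now=datetime(2024, 8, 20, 15, 59, 19)))  # documents_20240820_5919
```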

Q5. Testing the retrieval
Inference -> Retrieval -> Iterative retrieval -> Elasticsearch

Now let's test the retrieval. Use Mage or a Jupyter notebook to test it.

Let's use the following query: "When is the next cohort?"

What's the ID of the top matching result?

In [None]:
# Mage retrieval block: run the query against Elasticsearch
from elasticsearch import Elasticsearch

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

query = 'When is the next cohort?'
# Index written by the export block for version 1. After re-running the
# pipeline on a new FAQ document, set this to the newly created index name.
index_name = 'documents_20240820_5919'

@data_loader
def search(*args, **kwargs):
    connection_string = kwargs.get('connection_string', 'http://elasticsearch:9200')
    es_client = Elasticsearch(connection_string)
    search_query = {
        "size": 1,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        "fields": ["question^3", "text", "section"],
                        "type": "best_fields"
                    }
                },
            }
        }
    }

    response = es_client.search(index=index_name, body=search_query)
    
    result_docs = []
    
    for hit in response['hits']['hits']:
        result_docs.append(hit['_source'])
    
    return result_docs

Mage output version 1: 
{
  "text": "Summer 2025 (via Alexey).",
  "section": "General course-related questions",
  "question": "When will the course be offered next?",
  "course": "llm-zoomcamp",
  "document_id": "bf024675"
}
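
To answer "What's the ID of the top matching result?" directly, the document_id of the first hit can be read from the search response. A minimal sketch: it assumes the default relevance ordering (no explicit sort), in which Elasticsearch returns hits by descending _score, and it uses the same item access on the response as the retrieval block. The response in the usage lines is a made-up stand-in, not real output.

```python
from typing import Any, Mapping, Optional


def top_document_id(response: Mapping[str, Any]) -> Optional[str]:
    """Return the document_id of the best-scoring hit, or None if nothing matched."""
    hits = response["hits"]["hits"]
    if not hits:
        return None
    # Without an explicit sort, hits come back ordered by descending _score
    return hits[0]["_source"].get("document_id")


fake_response = {
    "hits": {"hits": [
        {"_score": 7.1, "_source": {"document_id": "aaaa1111"}},
        {"_score": 3.4, "_source": {"document_id": "bbbb2222"}},
    ]}
}
print(top_document_id(fake_response))  # aaaa1111
```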

Q6. Reindexing

Our FAQ document has changed. The ID of the new version (version 2) is 1T3MdwUvqCL3jrh3d3VCXQ8xE0UqRzI3bfgpfBq3ZWG0.

Let's re-execute the entire pipeline with the updated data.

For the same query, "When is the next cohort?", what's the ID of the top matching result?

Mage output version 2: 
{
  "text": "Summer 2026.",
  "section": "General course-related questions",
  "question": "When is the next cohort?",
  "course": "llm-zoomcamp",
  "document_id": "b6fa77f3"
}
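
Because document_id is an md5 hash of the course, the question and the first 10 characters of the answer, any FAQ entry whose question or answer opening changed between versions gets a new ID, while untouched entries keep theirs. A small sketch for comparing the document lists from two pipeline runs (for example, the chunking block outputs for version 1 and version 2); the IDs in the usage lines are placeholders.

```python
from typing import Dict, Iterable, Set, Tuple


def diff_document_ids(
    old_docs: Iterable[Dict[str, str]], new_docs: Iterable[Dict[str, str]]
) -> Tuple[Set[str], Set[str]]:
    """Return (added, removed) document IDs between two runs."""
    old_ids = {doc["document_id"] for doc in old_docs}
    new_ids = {doc["document_id"] for doc in new_docs}
    return new_ids - old_ids, old_ids - new_ids


v1 = [{"document_id": "id-1"}, {"document_id": "id-2"}]
v2 = [{"document_id": "id-2"}, {"document_id": "id-3"}]
added, removed = diff_document_ids(v1, v2)
print(sorted(added), sorted(removed))  # ['id-3'] ['id-1']
```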