## Homework: LLM Orchestration and Ingestion

### Q1. Running Mage

In [7]:
# Query the mage-ai package version installed inside the Mage container.
# NOTE(review): eea4e16ee4d9 is a machine-specific container ID — replace it
# with the ID or name reported by `docker ps` on your machine before re-running.
! docker exec -it eea4e16ee4d9 sh -c "pip list | grep mage-ai"

mage-ai                                  0.9.72

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Q2. Reading the documents

In [None]:
import io

import requests
import docx


if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):
    """
    Download the course FAQ documents from Google Docs and parse them into records.

    Keyword Args:
        request_timeout: Seconds to wait for each document download
            (default 30).

    Returns:
        A list with one entry per course, each of the form
        ``{'course': <course name>, 'documents': [<record>, ...]}``. Every
        record is a dict with the keys ``text``, ``section`` and ``question``.

    Raises:
        requests.HTTPError: If a document export request returns an error status.
        requests.Timeout: If a download exceeds ``request_timeout``.
    """
    request_timeout = kwargs.get('request_timeout', 30)

    def clean_line(line):
        """Strip surrounding whitespace and any byte-order mark from a line."""
        line = line.strip()
        line = line.strip('\uFEFF')
        return line

    def read_faq(file_id):
        """Fetch one FAQ document and split it into question/answer records."""
        url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'

        # A timeout keeps the block from hanging forever on a stalled download.
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()

        with io.BytesIO(response.content) as f_in:
            doc = docx.Document(f_in)

        questions = []

        question_heading_style = 'heading 2'
        section_heading_style = 'heading 1'

        section_title = ''
        question_title = ''
        answer_lines = []

        def flush_pending():
            """Store the question collected so far (if complete) and reset the buffer."""
            answer_text = '\n'.join(answer_lines).strip()
            if answer_text != '' and section_title != '' and question_title != '':
                questions.append({
                    'text': answer_text,
                    'section': section_title,
                    'question': question_title,
                })
            # Always reset, so text that belongs to no question cannot leak
            # into the answer of the next one.
            answer_lines.clear()

        for p in doc.paragraphs:
            style = p.style.name.lower()
            p_text = clean_line(p.text)

            if len(p_text) == 0:
                continue

            if style == section_heading_style:
                # Close the pending question *before* switching sections;
                # otherwise the last question of a section would be stored
                # under the title of the following section.
                flush_pending()
                section_title = p_text
                # Text between a section heading and its first question has
                # no question to attach to and is therefore dropped.
                question_title = ''
                continue

            if style == question_heading_style:
                flush_pending()
                question_title = p_text
                continue

            answer_lines.append(p_text)

        # The document ends without a trailing heading, so flush the last question.
        flush_pending()

        return questions

    faq_documents = {
        'llm-zoomcamp': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',
    }

    documents = []

    for course, file_id in faq_documents.items():
        course_documents = read_faq(file_id)
        documents.append({'course': course, 'documents': course_documents})

    print(len(documents))

    return documents


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

1

### Q3. Chunking

In [None]:
import hashlib

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Attach the course name and a short, stable document ID to every FAQ record.

    Args:
        data: A dict with the keys ``course`` (course name) and ``documents``
            (list of record dicts, each providing at least ``question`` and
            ``text``).

    Returns:
        A new list of record dicts. Each one is a copy of the input record
        extended with ``course`` and ``document_id``; the input records are
        left untouched.
    """

    def generate_document_id(doc):
        """Derive an 8-character hex ID from the course, question and answer prefix."""
        combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
        hash_object = hashlib.md5(combined.encode())
        hash_hex = hash_object.hexdigest()
        document_id = hash_hex[:8]
        return document_id

    documents = []

    for source_doc in data['documents']:
        # Work on a copy so the caller's data is not modified as a side effect.
        doc = dict(source_doc)
        doc['course'] = data['course']
        # previously we used just "id" for document ID
        doc['document_id'] = generate_document_id(doc)
        documents.append(doc)

    # Only the count is reported; dumping every record floods the block output.
    print(len(documents))

    return documents


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

<class 'dict'>

86

### Q4. Export

In [None]:
from typing import Dict, List, Tuple, Union
from datetime import datetime
from mage_ai.data_preparation.variable_manager import set_global_variable


import numpy as np
from elasticsearch import Elasticsearch

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def elasticsearch(
    documents: List[Dict[str, Union[Dict, List[int], np.ndarray, str]]], *args, **kwargs,
):
    """
    Exports document data to an Elasticsearch database.

    A new index name is built from the ``index_name`` prefix plus a timestamp,
    stored as the ``index_name`` global variable of the
    ``awe_inspiring_vortex`` pipeline, created with the settings and mappings
    below if it does not exist yet, and then filled with ``documents``.

    Args:
        documents: Records to index; each one must contain ``document_id``.

    Keyword Args:
        connection_string: Elasticsearch URL (default ``http://localhost:9200``).
        index_name: Prefix of the index name (default ``documents``).
        number_of_shards: Shard count for a newly created index (default 1).
        number_of_replicas: Replica count for a newly created index (default 0).
        vector_column_name: Record key used to infer ``dimensions``
            (default ``embedding``).
        dimensions: Embedding size; inferred from the first record when omitted.
            It is only reported, the mapping defines no vector field.
    """

    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    index_name_prefix = kwargs.get('index_name', 'documents')
    # NOTE(review): the suffix contains minutes and seconds but no hour, so two
    # runs on the same day can produce the same index name — confirm this
    # format is intended before changing it (downstream defaults rely on it).
    current_time = datetime.now().strftime("%Y%m%d_%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index name:", index_name)
    set_global_variable('awe_inspiring_vortex', 'index_name', index_name)

    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)
    vector_column_name = kwargs.get('vector_column_name', 'embedding')

    dimensions = kwargs.get('dimensions')
    if dimensions is None and len(documents) > 0:
        first_document = documents[0]
        dimensions = len(first_document.get(vector_column_name) or [])

    es_client = Elasticsearch(connection_string)

    print(f'Connecting to Elasticsearch at {connection_string}')

    index_settings = {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "section": {"type": "text"},
                "question": {"type": "text"},
                "course": {"type": "keyword"},
                "document_id": {"type": "keyword"}
            }
        }
    }

    if not es_client.indices.exists(index=index_name):
        # Pass the settings and mappings along; without them the index is
        # created with server defaults and dynamically inferred field types.
        es_client.indices.create(index=index_name, body=index_settings)
        print('Index created with properties:', index_settings)
        print('Embedding dimensions:', dimensions)

    print(f'Indexing {len(documents)} documents to Elasticsearch index {index_name}')
    for document in documents:
        print(f'Indexing document {document["document_id"]}')

        es_client.index(index=index_name, document=document)

    # Show the last indexed document; skipped when there was nothing to index.
    if len(documents) > 0:
        print(documents[-1])

index name: documents_20240818_4321

Connecting to Elasticsearch at ...

...

'document_id': 'a976d6e7'}

### Q5. Testing the retrieval

In [None]:
from typing import Dict, List, Union

import numpy as np
from elasticsearch import Elasticsearch, exceptions

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

# Fallback query used by `search` when no question is passed to the block.
SAMPLE_QUESTION = "When is the next cohort?"


@data_loader
def search(*args, **kwargs) -> Union[str, List[Dict]]:
    """
    Run a keyword search against Elasticsearch and return the best match's ID.

    Args:
        *args: The first positional argument, when present and non-empty, is
            used as the question; otherwise ``SAMPLE_QUESTION`` is used.

    Keyword Args:
        connection_string: Elasticsearch URL (default ``http://localhost:9200``).
        index_name: Index to query (default ``documents_20240818_4321``).
        top_k: Maximum number of hits to request (default 5).

    Returns:
        The ``document_id`` of the top hit, or an empty list when nothing
        matched or the request failed.
    """
    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    index_name = kwargs.get('index_name', 'documents_20240818_4321')
    top_k = kwargs.get('top_k', 5)

    question = ''
    if len(args):
        question = args[0]
    if not question:
        question = SAMPLE_QUESTION

    # Matches in the question title are boosted to count three times as much
    # as matches in the answer text.
    query = {
        "bool": {
            "must": {
                "multi_match": {
                    "query": question,
                    "fields": ["question^3", "text"],
                    "type": "best_fields"
                }
            },
        }
    }

    es_client = Elasticsearch(connection_string)

    try:
        response = es_client.search(
            index=index_name,
            body={
                "size": top_k,
                "query": query,
            },
        )

        hits = response['hits']['hits']
        if not hits:
            # An empty result is a normal outcome, not an unexpected error.
            print(f"No documents matched the question in index {index_name}")
            return []

        top_match = hits[0]['_source']['document_id']
        print(top_match)
        return top_match

    except exceptions.BadRequestError as e:
        print(f"BadRequestError: {e.info}")
        return []
    except Exception as e:
        # Deliberately best-effort: report the problem and return no result.
        print(f"Unexpected error: {e}")
        return []

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

'bf024675'

### Q6. Reindexing

'b6fa77f3'