Our FAQ documents change with time: students add more records and edit existing ones. We need to keep our index in sync.

There are two ways of doing it:

* Incremental: you only update records that were changed, created or deleted
* Full update: you recreate the entire index from scratch

In this homework, we'll look at full update. We will run our indexing pipeline daily and re-create the index from scratch each time we run.
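
To make the difference concrete, here is a minimal sketch in plain Python. It assumes each snapshot of the FAQ is a dictionary keyed by document ID; the function names and the sample records are hypothetical and are not part of the homework pipeline.

```python
def plan_incremental_update(indexed, incoming):
    """Compare two {document_id: document} snapshots and list what changed."""
    created = [doc_id for doc_id in incoming if doc_id not in indexed]
    deleted = [doc_id for doc_id in indexed if doc_id not in incoming]
    changed = [
        doc_id for doc_id in incoming
        if doc_id in indexed and incoming[doc_id] != indexed[doc_id]
    ]
    return {"created": created, "changed": changed, "deleted": deleted}


def plan_full_update(incoming):
    """A full update ignores the old index and re-indexes every record."""
    return list(incoming)


# Hypothetical snapshots: what is indexed now vs. what the FAQ contains today
old = {"a1": {"text": "old answer"}, "b2": {"text": "same"}, "c3": {"text": "gone"}}
new = {"a1": {"text": "new answer"}, "b2": {"text": "same"}, "d4": {"text": "added"}}

plan = plan_incremental_update(old, new)
full = plan_full_update(new)
print(plan)
print(full)
```

The incremental plan needs the previous state to compute the difference, while the full update only needs the new data. That is why the full update is simpler to run on a schedule.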

For that, we created two FAQ documents for LLM Zoomcamp:

* [version 1](https://docs.google.com/document/d/1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E/edit)
* [version 2](https://docs.google.com/document/d/1T3MdwUvqCL3jrh3d3VCXQ8xE0UqRzI3bfgpfBq3ZWG0/edit)

First, we will run our ingestion pipeline with version 1 and then with version 2.

### Q1. Running Mage
Clone the same repo we used in the module and run mage:

```git clone https://github.com/mage-ai/rag-project```
Add the following libraries to the requirements document:
```python
python-docx
elasticsearch
```
Make sure you use the latest version of mage:
```bash
docker pull mageai/mageai:llm
```
Start it:
```
./scripts/start.sh
```
Now mage is running on [http://localhost:6789/](http://localhost:6789/)

What's the version of mage?

### Creating a RAG pipeline
Create a RAG pipeline

### Q2. Reading the documents
Now we can ingest the documents. Create a custom code ingestion block

Let's read the documents. We will use the same code we used for parsing FAQ: [parse-faq-llm.ipynb](https://github.com/DataTalksClub/llm-zoomcamp/blob/main/cohorts/2024/05-orchestration/parse-faq-llm.ipynb)

Use the following document_id: ```1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E```, which is the document ID of [LLM FAQ version 1](https://docs.google.com/document/d/1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E/edit).

Copy the code to the editor. How many FAQ documents did we process?

* 1
* 2
* 3
* 4

In [None]:
# Custom code for Mage Ingest
import io
import requests
import docx

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data(*args, **kwargs):
    def clean_line(line):
        line = line.strip()
        line = line.strip('\uFEFF')
        return line

    def read_faq(file_id):
        url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'
        
        response = requests.get(url)
        response.raise_for_status()
        
        with io.BytesIO(response.content) as f_in:
            doc = docx.Document(f_in)

        questions = []

        question_heading_style = 'heading 2'
        section_heading_style = 'heading 1'
        
        heading_id = ''
        section_title = ''
        question_title = ''
        answer_text_so_far = ''
        
        for p in doc.paragraphs:
            style = p.style.name.lower()
            p_text = clean_line(p.text)
        
            if len(p_text) == 0:
                continue
        
            if style == section_heading_style:
                section_title = p_text
                continue
        
            if style == question_heading_style:
                answer_text_so_far = answer_text_so_far.strip()
                if answer_text_so_far != '' and section_title != '' and question_title != '':
                    questions.append({
                        'text': answer_text_so_far,
                        'section': section_title,
                        'question': question_title,
                    })
                    answer_text_so_far = ''
        
                question_title = p_text
                continue
            
            answer_text_so_far += '\n' + p_text
        
        answer_text_so_far = answer_text_so_far.strip()
        if answer_text_so_far != '' and section_title != '' and question_title != '':
            questions.append({
                'text': answer_text_so_far,
                'section': section_title,
                'question': question_title,
            })

        return questions

    faq_documents = {
        #'llm-zoomcamp': '1m2KexowAXTmexfC5rVTCSnaShvdUQ8Ag2IEiwBDHxN0',
        # Version 1
        #'llm-zoomcamp': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',
        # Version 2
        'llm-zoomcamp': '1T3MdwUvqCL3jrh3d3VCXQ8xE0UqRzI3bfgpfBq3ZWG0',
    }

    documents = []

    for course, file_id in faq_documents.items():
        print(course)
        course_documents = read_faq(file_id)
        documents.append({'course': course, 'documents': course_documents})
    
    return documents
    
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

### Q3. Chunking
We don't really need to do any chunking because our documents already have well-specified boundaries. So we just need to return the documents without any changes.

So let's go to the transformation part and add a custom code chunking block:
```python
documents = []

for doc in data['documents']:
    doc['course'] = data['course']
    # previously we used just "id" for document ID
    doc['document_id'] = generate_document_id(doc)
    documents.append(doc)

print(len(documents))

return documents
```
Here, data is the input parameter to the transformer.

The generate_document_id function is defined in the same way as in module 4:
```python
import hashlib

def generate_document_id(doc):
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    hash_object = hashlib.md5(combined.encode())
    hash_hex = hash_object.hexdigest()
    document_id = hash_hex[:8]
    return document_id
```
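
The ID depends only on the course, the question and the first 10 characters of the answer. The sketch below illustrates what that means for edits. The function is the one shown above, and the base record is the one returned in Q5; the two edited records are hypothetical.

```python
import hashlib


def generate_document_id(doc):
    # Same logic as above: course + question + first 10 characters of the text
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    return hashlib.md5(combined.encode()).hexdigest()[:8]


doc = {
    'course': 'llm-zoomcamp',
    'question': 'When will the course be offered next?',
    'text': 'Summer 2025 (via Alexey).',
}
# Hypothetical edit: only the tail of the answer changes
edited_tail = dict(doc, text='Summer 2025 (via Alexey), dates to be confirmed.')
# Hypothetical edit: the first characters of the answer change
edited_head = dict(doc, text='Autumn 2025 (via Alexey).')

id_original = generate_document_id(doc)
id_tail = generate_document_id(edited_tail)
id_head = generate_document_id(edited_head)

print(id_original == id_tail)  # the first 10 characters are unchanged
print(id_original == id_head)  # the first 10 characters differ
```

So an answer that is only extended or corrected near its end keeps its ID across FAQ versions, while a rewritten opening produces a new one.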
Note: if instead of a single dictionary you get a list, add a for loop:
```
for course_dict in data:
    ...
```
You can check the type of data with this code:

```print(type(data))```
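
If you are not sure which shape you will receive, one option is to normalize the input first. The sketch below is an assumption about how that could look, not part of the Mage template; the helper names iter_course_dicts and flatten_documents are hypothetical.

```python
def iter_course_dicts(data):
    """Yield course dictionaries whether `data` is one dict or a list of them."""
    if isinstance(data, dict):
        yield data
    else:
        yield from data


def flatten_documents(data):
    """Return one flat list of documents, each tagged with its course."""
    documents = []
    for course_dict in iter_course_dicts(data):
        for doc in course_dict['documents']:
            # Copy the record so the upstream block's output is not mutated
            documents.append({**doc, 'course': course_dict['course']})
    return documents


# Hypothetical sample in the shape produced by the ingestion block
sample = {'course': 'llm-zoomcamp', 'documents': [{'question': 'Q?', 'text': 'A.'}]}

from_dict = flatten_documents(sample)
from_list = flatten_documents([sample])
print(from_dict == from_list)
```

Both calls give the same flat list, so the rest of the transformer does not need to care about the input type.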

How many documents (chunks) do we have in the output?

* 66
* 76
* 86
* 96

In [None]:
# Custom code for Mage Transform --> Chunking
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here
    import hashlib
    print(type(data))
    
    def generate_document_id(doc):
        combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
        hash_object = hashlib.md5(combined.encode())
        hash_hex = hash_object.hexdigest()
        document_id = hash_hex[:8]
        return document_id
        
    documents = []

    for doc in data['documents']:
        doc['course'] = data['course']
        # previously we used just "id" for document ID
        doc['document_id'] = generate_document_id(doc)
        documents.append(doc)

    print(len(documents))

    return documents

    #return data


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

### Tokenization and embeddings
We don't need any tokenization, so we skip it.

Because currently it's required in mage, we can create a dummy code block:

* Create a custom code block
* Don't change it

Because we will use text search, we also don't need embeddings, so skip it too.

If you want to use sentence transformers (the ones from module 3), you don't need tokenization, but you do need embeddings. You don't need them for this homework.

### Q4. Export
Now we're ready to index the data with elasticsearch. For that, we use the Export part of the pipeline:

* Go to the Export part
* Select vector databases -> Elasticsearch
* Open the code for editing

Because we will use regular text search rather than vector search, we need to adjust the code.

First, let's change the line where we read the index name:

```index_name = kwargs.get('index_name', 'documents')```

Rename it to ```index_name_prefix```, and build the final index name from the prefix plus the date and time of the pipeline run:
```python
from datetime import datetime

index_name_prefix = kwargs.get('index_name', 'documents')
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
index_name = f"{index_name_prefix}_{current_time}"
print("index name:", index_name)
```
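
The naming scheme is easier to check when the timestamp is passed in explicitly. The sketch below assumes the hour (%H) is part of the format, as in the exporter block used later in this document; the helper name build_index_name is hypothetical.

```python
from datetime import datetime


def build_index_name(prefix, now=None):
    """Return '<prefix>_<YYYYmmdd_HHMMSS>' for the given (or current) time."""
    now = now or datetime.now()
    return f"{prefix}_{now.strftime('%Y%m%d_%H%M%S')}"


run_time = datetime(2024, 8, 12, 20, 17, 38)

with_hour = build_index_name('documents', run_time)
without_hour = f"documents_{run_time.strftime('%Y%m%d_%M%S')}"

print(with_hour)     # documents_20240812_201738
print(without_hour)  # documents_20240812_1738
```

Without the hour, two runs on the same day at the same minute and second of different hours would produce the same index name, so including %H keeps names unique and sortable.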
We will need to save the name in a global variable, so it can be accessed in other code blocks:
```python
from mage_ai.data_preparation.variable_manager import set_global_variable

set_global_variable('YOUR_PIPELINE_NAME', 'index_name', index_name)
```
Here, YOUR_PIPELINE_NAME is the name of your pipeline, e.g. transcendent_nexus (replace spaces with underscores).

Replace index settings with the settings we used previously:
```python
index_settings = {
    "settings": {
        "number_of_shards": number_of_shards,
        "number_of_replicas": number_of_replicas
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"},
            "document_id": {"type": "keyword"}
        }
    }
}
```
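
Before indexing, it can help to check that the documents only carry fields declared in the mapping. This is an optional sanity check, not a step from the homework: by default Elasticsearch maps unknown fields dynamically, so extra fields would not cause an error. The helper name unmapped_fields and the sample documents are hypothetical, and the shard and replica counts are filled in with the defaults from the exporter block.

```python
index_settings = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 0},
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"},
            "document_id": {"type": "keyword"},
        }
    },
}


def unmapped_fields(document, settings):
    """Return the document fields that are not declared in the mapping."""
    mapped = set(settings["mappings"]["properties"])
    return sorted(set(document) - mapped)


# Hypothetical documents: one matching the mapping, one with a leftover field
clean_doc = {"text": "A.", "section": "General", "question": "Q?",
             "course": "llm-zoomcamp", "document_id": "abcd1234"}
extra_doc = dict(clean_doc, embedding=[0.1, 0.2])

print(unmapped_fields(clean_doc, index_settings))
print(unmapped_fields(extra_doc, index_settings))
```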
Remove the lines that convert the embeddings:
```python
if isinstance(document[vector_column_name], np.ndarray):
    document[vector_column_name] = document[vector_column_name].tolist()
```
At the end (outside of the indexing for loop), print the last document:
```python
print(document)
```
Now execute the block.

What's the last document id?

Also note the index name.

In [None]:
# Custom code for Mage Export --> Vector Database --> Elasticsearch
from typing import Dict, List, Tuple, Union

import numpy as np
from elasticsearch import Elasticsearch

from datetime import datetime
from mage_ai.data_preparation.variable_manager import set_global_variable

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def elasticsearch(
    documents: List[Dict[str, Union[Dict, List[int], np.ndarray, str]]], *args, **kwargs,
):
    """
    Exports document data to an Elasticsearch database.
    """

    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    #index_name = kwargs.get('index_name', 'documents')

    index_name_prefix = kwargs.get('index_name', 'documents')
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index name:", index_name)

    set_global_variable('Cosmic_aurora', 'index_name', index_name)

    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)
    vector_column_name = kwargs.get('vector_column_name', 'embedding')

    dimensions = kwargs.get('dimensions')
    if dimensions is None and len(documents) > 0:
        document = documents[0]
        dimensions = len(document.get(vector_column_name) or [])

    es_client = Elasticsearch(connection_string)

    print(f'Connecting to Elasticsearch at {connection_string}')

    #index_settings = dict(
    #    settings=dict(
    #        number_of_shards=number_of_shards,
    #        number_of_replicas=number_of_replicas,
    #    ),
    #    mappings=dict(
    #        properties=dict(
    #            chunk=dict(type='text'),
    #            document_id=dict(type='text'),
    #            embedding=dict(type='dense_vector', dims=dimensions),
    #        ),
    #    ),
    #)

    index_settings = {
    "settings": {
        "number_of_shards": number_of_shards,
        "number_of_replicas": number_of_replicas
        },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"},
            "document_id": {"type": "keyword"}
            }
        }
    }

    if not es_client.indices.exists(index=index_name):
        # Pass the settings and mappings defined above when creating the index
        es_client.indices.create(index=index_name, body=index_settings)
        print('Index created with properties:', index_settings)
        print('Embedding dimensions:', dimensions)

    print(f'Indexing {len(documents)} documents to Elasticsearch index {index_name}')
    for document in documents:
        # print(f'Indexing document {document["document_id"]}')

        # if isinstance(document[vector_column_name], np.ndarray):
        #     document[vector_column_name] = document[vector_column_name].tolist()

        es_client.index(index=index_name, document=document)
    
    print(document)

### Q1 - Q4 all executed in Mage 

### Q5 - Q6 executed in a Jupyter notebook, as no text search query function was found in Mage

### Q5. Testing the retrieval
Now let's test the retrieval. Use mage or jupyter notebook to test it.

Let's use the following query: "When is the next cohort?"

What's the ID of the top matching result?

In [1]:
from elasticsearch import Elasticsearch

In [2]:
es_client = Elasticsearch('http://localhost:9200') 

In [3]:
def elastic_search(query, index_name):
    search_query = {
        "size": 1,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        "fields": ["question^3", "text", "section"],
                        "type": "best_fields"
                    }
                },
                # "filter": {
                #     "term": {
                #         "course": course
                #     }
                # }
            }
        }
    }

    response = es_client.search(index=index_name, body=search_query)
    
    result_docs = []
    
    for hit in response['hits']['hits']:
        result_docs.append(hit['_source'])
    
    return result_docs
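
The function above has the course filter commented out because the index holds a single course. As a sketch of how the filter could be made optional, the query body can be built by a separate function that needs no Elasticsearch connection. The name build_search_query and its parameters are hypothetical; the fields and boosts are the ones used above.

```python
def build_search_query(query, course=None, size=1):
    """Build the search body; add a course filter only when one is given."""
    bool_query = {
        "must": {
            "multi_match": {
                "query": query,
                "fields": ["question^3", "text", "section"],
                "type": "best_fields",
            }
        }
    }
    if course is not None:
        # Exact match on the keyword field restricts hits to one course
        bool_query["filter"] = {"term": {"course": course}}
    return {"size": size, "query": {"bool": bool_query}}


unfiltered = build_search_query("When is the next cohort?")
filtered = build_search_query("When is the next cohort?", course="llm-zoomcamp")

print("filter" in unfiltered["query"]["bool"])
print(filtered["query"]["bool"]["filter"])
```

The resulting dictionary can be passed to the search call in the same way as search_query above.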

* Index name for version 1 document: ```documents_20240812_3707```
* Index name for version 2 document: ```documents_20240812_201738```

In [4]:
results = elastic_search(
    query="When is the next cohort?",
    index_name="documents_20240812_3707"
)
results

[{'text': 'Summer 2025 (via Alexey).',
  'section': 'General course-related questions',
  'question': 'When will the course be offered next?',
  'course': 'llm-zoomcamp',
  'document_id': 'bf024675'}]

### Q6. Reindexing
Our FAQ document changes: every day course participants add new records or improve existing ones.

Imagine some time passed and the document changed. For that we have another version of the FAQ document: version 2.

The ID of this document is ```1T3MdwUvqCL3jrh3d3VCXQ8xE0UqRzI3bfgpfBq3ZWG0```.

Let's re-execute the entire pipeline with the updated data.

For the same query, "When is the next cohort?", what's the ID of the top matching result?

In [5]:
results = elastic_search(
    query="When is the next cohort?",
    index_name="documents_20240812_201738"
)
results

[{'text': 'Summer 2025 (via Alexey).',
  'section': 'General course-related questions',
  'question': 'When will the course be offered next?',
  'course': 'llm-zoomcamp',
  'document_id': 'bf024675'}]