# Ingest

In [None]:
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):
    """
    Download the course FAQ Google Docs and parse them into Q&A records.

    Each FAQ document is exported as .docx and walked paragraph by paragraph:
    "heading 1" paragraphs start a section, "heading 2" paragraphs start a
    question, and every other non-empty paragraph is part of the current
    answer.

    Args:
        **kwargs: Optional ``request_timeout`` (seconds, default 30) applied
            to each document download.

    Returns:
        A list with one dict per course:
        ``{'course': <course name>, 'documents': [<record>, ...]}`` where each
        record has the keys ``text``, ``section`` and ``question``.

    Raises:
        requests.HTTPError: If a document export returns an error status.
        requests.Timeout: If a download exceeds ``request_timeout``.
    """
    import io

    import requests
    import docx

    # Without a timeout requests waits forever on a stalled connection.
    request_timeout = kwargs.get('request_timeout', 30)

    question_heading_style = 'heading 2'
    section_heading_style = 'heading 1'

    def clean_line(line):
        """Strip surrounding whitespace and byte-order marks from a line."""
        line = line.strip()
        line = line.strip('\uFEFF')
        return line

    def read_faq(file_id):
        """Fetch one Google Doc by id and return its parsed Q&A records."""
        url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'

        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()

        with io.BytesIO(response.content) as f_in:
            doc = docx.Document(f_in)

        questions = []

        section_title = ''
        question_title = ''
        answer_lines = []

        def flush_answer():
            """Store the pending answer (if complete) and reset the buffer."""
            answer_text = '\n'.join(answer_lines).strip()
            if answer_text and section_title and question_title:
                questions.append({
                    'text': answer_text,
                    'section': section_title,
                    'question': question_title,
                })
            # Always reset, so text that belongs to no question (e.g. an
            # intro before the first question) never leaks into the next one.
            answer_lines.clear()

        for p in doc.paragraphs:
            style = p.style.name.lower()
            p_text = clean_line(p.text)

            if not p_text:
                continue

            if style == section_heading_style:
                # Flush before switching sections; otherwise the last question
                # of a section would be stored under the next section's title.
                flush_answer()
                section_title = p_text
                question_title = ''
                continue

            if style == question_heading_style:
                flush_answer()
                question_title = p_text
                continue

            answer_lines.append(p_text)

        # The final question has no following heading to trigger a flush.
        flush_answer()

        return questions

    faq_documents = {
        'llm-zoomcamp': '1T3MdwUvqCL3jrh3d3VCXQ8xE0UqRzI3bfgpfBq3ZWG0',
    }

    documents = []

    for course, file_id in faq_documents.items():
        print(course)
        course_documents = read_faq(file_id)
        documents.append({'course': course, 'documents': course_documents})
    print("documents", len(documents))
    total_questions = sum(len(course['documents']) for course in documents)
    print(f"Total FAQ entries processed: {total_questions}")
    return documents


@test
def test_output(output, *args) -> None:
    """
    Sanity-check the block result: it must not be None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'

# Vector database

In [None]:
import json
from typing import Dict, List, Union
import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError
from datetime import datetime
import logging

from mage_ai.data_preparation.variable_manager import set_global_variable

# Configure the root logger so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the exporter below for progress messages.
logger = logging.getLogger(__name__)

@data_exporter
def elasticsearch(documents, *args, **kwargs):
    """
    Create a timestamped Elasticsearch index and index every document into it.

    The generated index name is also published as the ``index_name`` global
    variable of the ``ragic`` pipeline.

    Args:
        documents: Collection of records to index. The code calls ``len()``,
            slices it and calls ``.get()`` on its items, so a list of dicts
            is assumed (TODO confirm against the upstream block).
        **kwargs: Optional settings:
            ``connection_string`` (default ``http://elasticsearch:9200``),
            ``index_name`` (prefix, default ``documents``),
            ``number_of_shards`` (default 1),
            ``number_of_replicas`` (default 0),
            ``vector_column_name`` (default ``embedding``).

    Returns:
        A one-element list containing the ``vector_column_name`` values of the
        first ten documents (``None`` for documents without that key).

    Raises:
        ConnectionError: If the Elasticsearch cluster does not answer a ping.
    """
    connection_string = kwargs.get('connection_string', 'http://elasticsearch:9200')
    # NOTE(review): 'index_name' is read here as a prefix and written back
    # below as the full timestamped name. If that global variable is fed back
    # into kwargs on the next run, the prefix would carry the old timestamp.
    # Confirm.
    index_name_prefix = kwargs.get('index_name', 'documents')
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index_name", index_name)
    set_global_variable("ragic", 'index_name', index_name)

    logger.info(f"index name: {index_name}")
    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)
    vector_column_name = kwargs.get('vector_column_name', 'embedding')

    es_client = Elasticsearch([connection_string])
    logger.info(f'Connecting to Elasticsearch at {connection_string}')

    if not es_client.ping():
        raise ConnectionError("Failed to connect to Elasticsearch")

    index_settings = {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas,
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "section": {"type": "text"},
                "question": {"type": "text"},
                "course": {"type": "keyword"},
                "document_id": {"type": "keyword"},
            },
        },
    }

    # The name is timestamped, so a clash is unlikely; drop any leftover index
    # with the same name so creation cannot fail.
    if es_client.indices.exists(index=index_name):
        es_client.indices.delete(index=index_name)
        logger.info(f'Index {index_name} deleted')

    es_client.indices.create(index=index_name, body=index_settings)
    logger.info('Index created with properties:')
    logger.info(json.dumps(index_settings, indent=2))

    count = len(documents)
    logger.info(f'Indexing {count} documents to Elasticsearch index {index_name}')
    document = None
    for idx, document in enumerate(documents):
        # Progress line every 100 documents.
        if idx % 100 == 0:
            logger.info(f'{idx + 1}/{count}')

        es_client.index(index=index_name, document=document)

    # With an empty input the loop never runs and there is nothing to show.
    if count:
        print("last document", document)

    logger.info("Indexing completed")
    # Use the configured column name rather than a misspelled literal key.
    return [[d.get(vector_column_name) for d in documents[:10]]]

# Retrieval

from typing import Dict, List, Union

import numpy as np
from elasticsearch import Elasticsearch

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def search(*args, **kwargs) -> List[Dict]:
    """
    Run a keyword search against the FAQ index and return the matching documents.

    Args:
        *args: If present, the first positional argument is used as the query
            text; otherwise a built-in sample question is used.
        **kwargs: Optional settings:
            ``connection_string`` (default ``http://localhost:9200``),
            ``index_name`` (default ``documents_20240820_203833``),
            ``course`` (keyword filter, default ``llm-zoomcamp``),
            ``top_k`` (maximum number of hits, default 5),
            ``chunk_column`` (when given, return only that field of each hit
            instead of the whole document).

    Returns:
        The ``_source`` dict of each hit, or the ``chunk_column`` values when
        that option is set. An empty list is returned if the search fails.

    Raises:
        ValueError: If the query is empty.
    """
    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    index_name = kwargs.get('index_name', "documents_20240820_203833")
    course = kwargs.get('course', "llm-zoomcamp")
    top_k = kwargs.get('top_k', 5)
    chunk_column = kwargs.get('chunk_column')

    query = "When is the next cohort?"
    if len(args):
        query = args[0]
    if not query:
        raise ValueError("A query must be provided")

    es_client = Elasticsearch(connection_string)

    def elastic_search(query):
        """Send the multi-field query and return the hits' source documents."""
        search_query = {
            "size": top_k,
            "query": {
                "bool": {
                    "must": {
                        "multi_match": {
                            "query": query,
                            # Matches in the question weigh three times more.
                            "fields": ["question^3", "text", "section"],
                            "type": "best_fields",
                        }
                    },
                    "filter": {
                        "term": {
                            "course": course,
                        }
                    },
                }
            },
        }

        response = es_client.search(index=index_name, body=search_query)
        return [hit['_source'] for hit in response['hits']['hits']]

    try:
        print("Sending search query:", query)

        result_docs = elastic_search(query)
        print("Raw response from Elasticsearch:", result_docs)

        # elastic_search already unwrapped the hits; indexing the result with
        # ['hits']['hits'] again raised TypeError and always produced [].
        if chunk_column:
            return [doc[chunk_column] for doc in result_docs if chunk_column in doc]
        return result_docs
    except Exception as e:
        # Best-effort block: report the failure and hand back no results.
        print(f"Unexpected error: {e}")
        return []


@test
def test_output(output, *args) -> None:
    """
    Check that the search block returned something other than None.
    """
    has_output = output is not None
    assert has_output, 'The output is undefined'