In [None]:
import logging as l
from io import BytesIO
from zipfile import ZipFile

import requests
from elasticsearch import Elasticsearch
from fastprogress import progress_bar

from tg_rag.config import Config
from tg_rag.utils import init_logger


In [None]:
# Module-level logger named after the current module (`l` is the alias the
# imports above give to the standard `logging` package).
log = l.getLogger(__name__)

In [None]:
def download_book(url):
    """Download a zipped book and return its text.

    The archive at *url* is fetched over HTTP, and the first member of the
    zip file is read and decoded as ``windows-1251``.

    Args:
        url: Location of the zip archive that contains the book.

    Returns:
        The decoded text of the first file in the archive.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer in time.
        zipfile.BadZipFile: If the payload is not a valid zip archive.
        ValueError: If the archive contains no files.
    """
    log.info(f"Downloading the book from {url}")
    # A timeout keeps the call from hanging forever on a stalled connection.
    response = requests.get(url, timeout=60)
    # Fail early on 4xx/5xx instead of trying to unzip an error page.
    response.raise_for_status()
    # The context manager closes the archive once the member has been read.
    with ZipFile(BytesIO(response.content)) as archive:
        names = archive.namelist()
        if not names:
            raise ValueError(f"The archive downloaded from {url} is empty")
        return archive.read(names[0]).decode("windows-1251")

In [None]:
def count_words(text):
    """Return how many whitespace-separated tokens *text* contains."""
    tokens = text.split()
    return len(tokens)

def is_dialog(text):
    """Tell whether *text* begins with a hyphen, an en dash or an em dash."""
    dash_markers = ("-", "–", "—")
    return text.startswith(dash_markers)

def merge_dialogs(paragraphs, max_words=500):
    """Glue consecutive dialogue lines into larger chunks.

    Paragraphs for which ``is_dialog`` is true are collected into a buffer.
    The buffer is emitted as one space-joined string when a non-dialogue
    paragraph is met, or when the buffer already holds at least *max_words*
    words and another dialogue line arrives. Non-dialogue paragraphs are
    passed through unchanged.

    Args:
        paragraphs: Iterable of paragraph strings, in reading order.
        max_words: Word count at which a dialogue buffer stops growing.

    Returns:
        A list of strings in the original order, with dialogue runs merged.
    """
    merged_chunk = []
    # Running word total of merged_chunk. Joining with a single space never
    # fuses or splits words, so this equals count_words(" ".join(merged_chunk))
    # without re-joining the buffer on every iteration.
    chunk_words = 0
    result = []
    for p in paragraphs:
        if is_dialog(p):
            # The buffer is full: emit it and start a new one with this line.
            if merged_chunk and chunk_words >= max_words:
                result.append(" ".join(merged_chunk))
                merged_chunk, chunk_words = [], 0
            merged_chunk.append(p)
            chunk_words += count_words(p)
            continue

        # A narrative paragraph ends the current dialogue run.
        if merged_chunk:
            result.append(" ".join(merged_chunk))
            merged_chunk, chunk_words = [], 0
        result.append(p)

    # Flush the dialogue run that is still buffered when the input ends;
    # without this the trailing dialogue would be dropped.
    if merged_chunk:
        result.append(" ".join(merged_chunk))
    return result

def merge_short_paragraphs(paragraphs, min_words=50, max_words=500):
    """Attach short paragraphs to the paragraph that follows them.

    Paragraphs with fewer than *min_words* words are buffered. The buffer is
    emitted together with the next paragraph that has at least *min_words*
    words, or on its own once it has grown to *max_words* words or more.

    Args:
        paragraphs: Iterable of paragraph strings, in reading order.
        min_words: Paragraphs shorter than this are treated as "short".
        max_words: Word count at which a buffer of short paragraphs is
            emitted without waiting for a long paragraph.

    Returns:
        A list of space-joined chunks in the original order.
    """
    merged_chunk = []
    # Running word total of merged_chunk, which equals
    # count_words(" ".join(merged_chunk)) without re-joining each time.
    chunk_words = 0
    result = []
    for p in paragraphs:
        # Checking that the buffer is non-empty avoids emitting an empty
        # string when max_words <= 0.
        if merged_chunk and chunk_words >= max_words:
            result.append(" ".join(merged_chunk))
            merged_chunk, chunk_words = [], 0

        p_words = count_words(p)
        if p_words < min_words:
            merged_chunk.append(p)
            chunk_words += p_words
        else:
            result.append(" ".join(merged_chunk + [p]))
            merged_chunk, chunk_words = [], 0

    # Flush the short paragraphs still buffered when the input ends;
    # without this the tail of the text would be dropped.
    if merged_chunk:
        result.append(" ".join(merged_chunk))
    return result

In [None]:
def parse_book(book):
    """Split the raw book text into paragraph chunks.

    Everything before the first occurrence of the marker "ЧАСТЬ ПЕРВАЯ"
    (Russian for "PART ONE") is discarded. The rest is split on ``\\r\\n``,
    empty lines are dropped, dialogue runs are merged with ``merge_dialogs``
    and short paragraphs with ``merge_short_paragraphs``.

    Args:
        book: Full text of the book as a single string.

    Returns:
        A list of paragraph chunks.
    """
    start_marker = "ЧАСТЬ ПЕРВАЯ"
    start = book.find(start_marker)
    if start == -1:
        # find() returns -1 when the marker is missing; slicing with -1
        # would keep only the last character, so use the whole text instead.
        log.warning(f"Marker {start_marker!r} not found; parsing the whole text")
        start = 0
    book = book[start:]
    paragraphs = [t for t in book.split("\r\n") if t != '']
    paragraphs = merge_dialogs(paragraphs, 100)
    paragraphs = merge_short_paragraphs(paragraphs, 80, 150)
    # The original fell off the end of the function and returned None.
    return paragraphs

In [None]:
# Notebook cell executed at module level: build the configuration, download
# the book from cfg.book_url (a network call) and pass the text to parse_book.
cfg = Config()
b = download_book(cfg.book_url)
paragraphs = parse_book(b)

In [None]:
def create_index(es_client, index_name):
    """Create the index *index_name* with a single ``text`` field.

    The field is mapped as type ``text`` with ``term_vector`` set to
    ``"yes"``. A log message is written before and after the create call.

    Args:
        es_client: Client object exposing ``indices.create``.
        index_name: Name of the index to create.
    """
    log.info(f"Creating index {index_name}")
    text_field = {"type": "text", "term_vector": "yes"}
    index_mappings = {"properties": {"text": text_field}}
    es_client.indices.create(index=index_name, mappings=index_mappings)
    log.info(f"Index {index_name} created.")


def index_paragraphs(es, index_name, paragraphs):
    """Store every paragraph as its own document in *index_name*.

    Each document has the shape ``{'text': <paragraph>}`` and uses the
    paragraph's position in *paragraphs* as its id. The index is refreshed
    once after all documents have been written.

    Args:
        es: Client object exposing ``index`` and ``indices.refresh``.
        index_name: Name of the target index.
        paragraphs: Sequence of paragraph strings to index.
    """
    log.info("Indexing paragraphs...")
    numbered = enumerate(progress_bar(paragraphs))
    for doc_id, body_text in numbered:
        es.index(index=index_name, id=doc_id, document={'text': body_text})
    es.indices.refresh(index=index_name)
    log.info("Indexing completed")


def main():
    """Download the book, split it into paragraphs and index them.

    Steps: set up logging, load the configuration, download and parse the
    book, connect to Elasticsearch, create the index and write one document
    per paragraph.

    Raises:
        RuntimeError: If parsing produced no paragraphs to index.
    """
    init_logger(log.name)
    # The level constant lives on the logging module (`l`), not on the Logger
    # instance `log`; `log.ERROR` raised AttributeError.
    init_logger(name="elastic_transport.transport", level=l.ERROR)
    cfg = Config()

    # The downloaded text must be split into paragraphs first; iterating the
    # raw string would index one document per character.
    paragraphs = parse_book(download_book(cfg.book_url))
    if not paragraphs:
        raise RuntimeError("No paragraphs were produced from the book; nothing to index")

    # Only the URL is logged so that credentials never end up in log output.
    log.info(f"Connecting to {cfg.es_url}")
    # NOTE(review): verify_certs=False disables TLS certificate verification;
    # confirm this is intended (e.g. a local cluster with a self-signed cert).
    es = Elasticsearch([cfg.es_url], basic_auth=cfg.es_creds, verify_certs=False)

    index_name = "my_index"
    create_index(es, index_name)
    index_paragraphs(es, index_name, paragraphs)


# Entry point: run the pipeline only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
