## Imports

In [1]:
import os
import json
import logging

# import numpy as np
from tqdm.notebook import tqdm
from sentence_transformers import SentenceTransformer
from langchain_text_splitters import (
    TextSplitter,
    # RecursiveCharacterTextSplitter,
    SentenceTransformersTokenTextSplitter,
)

# import torch
# from collections import Counter
# from ranx import Qrels, Run, evaluate

  from tqdm.autonotebook import tqdm, trange


## Types

In [3]:
from typing import Literal
from pydantic import BaseModel, Field


###########
# SOURCES #
###########
class Source(BaseModel):
    """Base description of a document that can be indexed and searched.

    Instances hash by ``id`` alone, which lets them be stored in sets and used
    as dictionary keys.
    """

    # identifier of the source; the loaders in this notebook key sources by it
    id: str
    # address of the source
    url: str
    # name of the source; the loaders look for its text in "<name>.txt"
    name: str
    # free-form description
    desc: str
    # discriminator naming the kind of source; plain sources default to "file"
    type: Literal["moodle", "file", "web", "tg"] = Field(default="file")

    def __hash__(self) -> int:
        # Only the identifier takes part in the hash.
        return hash(self.id)


class MoodleSource(Source):
    """Source that belongs to a Moodle course; ``type`` is fixed to ``"moodle"``."""

    # identifier, address and display name of the owning course
    course_id: str
    course_url: str
    course_name: str
    type: Literal["moodle"] = Field(default="moodle")


class FileSource(Source):
    """Source whose ``type`` is fixed to ``"file"``."""

    type: Literal["file"] = Field(default="file")


class WebSource(Source):
    """Source whose ``type`` is fixed to ``"web"``."""

    type: Literal["web"] = Field(default="web")


class TelegramSource(Source):
    """Source whose ``type`` is fixed to ``"tg"``."""

    type: Literal["tg"] = Field(default="tg")


#########
# UTILS #
#########
class Chunk(BaseModel):
    """One piece of a source's text produced by a text splitter."""

    # non-negative position of the chunk (required, validated with ge=0)
    index: int = Field(..., ge=0)
    # ``Source.id`` of the source the text was cut from
    source_id: str
    # the chunk's text
    text: str


##########
# SEARCH #
##########
class SearchQuery(BaseModel):
    """A search request consisting of a single text field."""

    # the text to search for
    text: str


class SearchResult(BaseModel):
    """A source paired with a numeric distance.

    Results hash by the identifier of the wrapped source only; the distance
    does not take part in the hash.
    """

    # the source this result refers to
    source: Source
    # numeric distance attached to the source
    distance: float

    def __hash__(self) -> int:
        return hash(self.source.id)

## Paths

In [4]:
from pathlib import Path

# All locations are resolved relative to the parent of the working directory.
_PARENT_DIR = Path("..")

# source metadata
DATA_PATH = _PARENT_DIR / "data"
META_FILE_PATH = DATA_PATH / "meta.json"

# text data
TEXTS_PATH = _PARENT_DIR / "texts"
PREPROCESSED_PATH = _PARENT_DIR / "preprocessed"

# everything related to validation
VALIDATION_PATH = _PARENT_DIR / "validation"

## Preprocess

In [5]:
import regex
import unicodedata


def preprocess(text: str) -> str:
    """Clean raw extracted text before it is chunked and indexed.

    Steps, in order:
      1. NFKC-normalize the text.
      2. Strip HTML-like tags (``<...>`` on a single line).
      3. Drop every character that is not a Unicode letter, a Unicode digit,
         whitespace, or one of ``. ! ? :``.
      4. Drop any single character other than ``a``/``A`` that has whitespace
         (or a string boundary) on both sides. This removes stray symbols, but
         also one-letter words, lone digits and whitespace characters that are
         themselves surrounded by whitespace.
      5. Collapse runs of spaces and newlines into a single space.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text.
    """
    # Normalize FIRST. A decomposed letter such as "и" + U+0306 (for "й") must be
    # composed before step 3, because the combining mark is not \p{L} and would
    # otherwise be stripped, leaving the bare base letter behind (e.g.
    # "деиствующему" instead of "действующему").
    text = unicodedata.normalize("NFKC", text)

    # remove html tags
    text = regex.sub("<.*?>", "", text)

    # keep only letters, digits, whitespace and basic sentence punctuation
    text = regex.sub(r"[^\p{L}\p{N}\n\s\.!?:]", "", text, flags=regex.UNICODE)

    # remove lone characters surrounded by whitespace (keep a and A as articles)
    text = regex.sub(r"(?<!\S)[^aA](?!\S)", "", text)

    # collapse multiple spaces/newlines into one space
    text = regex.sub(r"[ \n]+", " ", text)

    return text


# Manual spot checks for preprocess(); the uncommented call below is the one
# whose result is displayed as the cell output.
# preprocess("<h1><b>Sequences. Limits of sequences</b></h1>")
# preprocess("1. Find the formula of a general term of a sequence <i>𝑥</i> 1 = 0")
# preprocess("of a sequence\n 𝑥 1  0  𝑥 2  1 ")
preprocess("АНО ВО Университет Иннополис")

'АНО ВО Университет Иннополис'

In [6]:
# Smoke test on a single file: module-78346.pdf.txt
with open(TEXTS_PATH / "module-78346.pdf.txt", "r", encoding="utf-8") as file:
    raw_text = file.read()

text = preprocess(raw_text)

# Create the output directory if needed. mkdir(parents=True, exist_ok=True)
# avoids the check-then-create race of exists() + os.mkdir() and also works
# when intermediate directories are missing.
PREPROCESSED_PATH.mkdir(parents=True, exist_ok=True)

with open(PREPROCESSED_PATH / "module-78346.pdf.txt", "w", encoding="utf-8") as file:
    file.write(text)

## Chunk

In [7]:
def load_sources_info(meta_file_path: Path) -> dict[str, Source]:
    """Read the metadata file and return its sources keyed by ``Source.id``.

    Args:
        meta_file_path: Path of a UTF-8 JSON file whose top-level value is
            iterated entry by entry; each entry is validated strictly as a
            ``Source``.

    Returns:
        Mapping from source id to ``Source``. When several entries share an id,
        the last one wins.
    """
    with open(meta_file_path, "r", encoding="utf-8") as meta_file:
        raw_entries = json.load(meta_file)

    # Each entry is serialized back to JSON so it goes through pydantic's strict JSON validation.
    validated = (Source.model_validate_json(json.dumps(entry), strict=True) for entry in raw_entries)
    return {src.id: src for src in validated}

In [8]:
def load_chunks_info(meta_file_path: Path, texts_path: Path, text_splitter: TextSplitter) -> dict[int, Chunk]:
    """Split the text of every source listed in the metadata file into chunks.

    For each source the file ``texts_path / "<source.name>.txt"`` is read and
    passed to ``text_splitter.split_text``. Sources without such a file are
    skipped and their id is logged at INFO level.

    Args:
        meta_file_path: Path of the UTF-8 JSON metadata file.
        texts_path: Directory holding the ``<name>.txt`` files.
        text_splitter: Splitter used to cut each text into chunks.

    Returns:
        Mapping from a running chunk index (starting at 0, shared across all
        sources) to the ``Chunk`` carrying that index.
    """
    # Ids of sources without a text file go to "missing.log".
    # NOTE(review): this configures the root logger on every call — confirm that is intended.
    logging.basicConfig(filename="missing.log", filemode="w", level=logging.INFO, encoding="utf-8")

    with open(meta_file_path, "r", encoding="utf-8") as meta_file:
        raw_entries = json.load(meta_file)

    # validate every metadata entry up front
    sources: list[Source] = [
        Source.model_validate_json(json.dumps(entry), strict=True) for entry in raw_entries
    ]

    chunks_info: dict[int, Chunk] = {}
    progress = tqdm(sources, total=len(sources), desc="Split sources on chunks", unit="source")
    for src in progress:
        text_file_path = texts_path / (src.name + ".txt")

        if os.path.exists(text_file_path):
            with open(text_file_path, "r", encoding="utf-8") as text_file:
                content = text_file.read()

            for piece in text_splitter.split_text(content):
                # keys are handed out sequentially, so the next index equals the current size
                position = len(chunks_info)
                chunks_info[position] = Chunk(index=position, source_id=src.id, text=piece)
        else:
            # remember which sources had no text file
            logging.info(src.id)

    return chunks_info

In [9]:
def get_source_by_chunk(chunk_index: int, chunks_info: dict[int, Chunk], sources_info: dict[str, Source]) -> Source:
    """Return the source that the chunk with the given index was cut from.

    Args:
        chunk_index: Key of the chunk in ``chunks_info``.
        chunks_info: Mapping from chunk index to ``Chunk``.
        sources_info: Mapping from source id to ``Source``.

    Returns:
        The ``Source`` whose id equals the chunk's ``source_id``.

    Raises:
        ValueError: If the chunk index is unknown, or the chunk refers to a
            source id that is absent from ``sources_info``.
    """
    # Compare against None instead of relying on truthiness, so that "missing"
    # is never confused with a falsy value, and each dict is looked up once.
    chunk = chunks_info.get(chunk_index)
    if chunk is None:
        raise ValueError(f"Chunk {chunk_index} not found")

    source = sources_info.get(chunk.source_id)
    if source is None:
        raise ValueError(f"Source {chunk.source_id} not found")

    return source

## Utils

In [10]:
def print_text_file(texts_path: Path, source_name: str):
    """Print the content of ``texts_path / "<source_name>.txt"``.

    If the file does not exist, a "not found" message is printed instead.

    Args:
        texts_path: Directory containing the text files.
        source_name: File name without the ``.txt`` suffix.
    """
    source_path = texts_path / (source_name + ".txt")

    if source_path.exists():
        print(source_path.read_text(encoding="utf-8"))
    else:
        print(f"File {source_path} not found")

## Code

In [11]:
# Preprocess the text of every known source into PREPROCESSED_PATH.
sources_info = load_sources_info(META_FILE_PATH)  # type: ignore

# mkdir(parents=True, exist_ok=True) avoids the check-then-create race of
# exists() + os.mkdir() and also works when intermediate directories are missing.
PREPROCESSED_PATH.mkdir(parents=True, exist_ok=True)

for source in sources_info.values():
    source_filename = source.name + ".txt"
    source_path = TEXTS_PATH / source_filename

    # sources without an extracted text file are silently skipped
    if os.path.exists(source_path):
        with open(source_path, "r", encoding="utf-8") as file:
            text = file.read()

        with open(PREPROCESSED_PATH / source_filename, "w", encoding="utf-8") as file:
            file.write(preprocess(text))

In [12]:
# Name of the sentence-transformers model; also passed to the token splitter.
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
model = SentenceTransformer(MODEL_NAME)

# (re)load the source metadata
sources_info = load_sources_info(META_FILE_PATH)  # type: ignore

In [13]:
# Split the preprocessed texts into token-based chunks that overlap by 25 tokens.
text_splitter = SentenceTransformersTokenTextSplitter(
    model_name=MODEL_NAME,
    chunk_overlap=25,
)
chunks_info = load_chunks_info(
    meta_file_path=META_FILE_PATH,
    texts_path=PREPROCESSED_PATH,
    text_splitter=text_splitter,
)

Split sources on chunks:   0%|          | 0/949 [00:00<?, ?source/s]

In [17]:
from retriv import SparseRetriever, DenseRetriever

In [14]:
# Request UTF-8 standard streams through the environment.
# NOTE(review): Python reads these variables at interpreter start-up, so setting
# them here should only affect child processes — confirm that is the intent.
os.environ.update(
    {
        "PYTHONIOENCODING": "utf-8",
        "PYTHONLEGACYWINDOWSSTDIO": "utf-8",
    }
)
# os.environ["PYTHONUTF8"]

## Sparse Retriever

In [15]:
# Form a JSON-lines index file holding the full preprocessed text of the first 10 sources.
with open(DATA_PATH / "index-sources.jsonl", "w", encoding="utf-8") as index_file:
    for source in list(sources_info.values())[:10]:
        preprocessed_file_path = PREPROCESSED_PATH / (source.name + ".txt")

        # A preprocessed file only exists for sources whose raw text was found,
        # so skip the others instead of failing with FileNotFoundError.
        if not preprocessed_file_path.exists():
            continue

        with open(preprocessed_file_path, "r", encoding="utf-8") as text_file:
            text = text_file.read()

        info = {"id": source.id, "text": text}
        index_file.write(json.dumps(info, ensure_ascii=False) + "\n")

In [37]:
# BM25 retriever settings, collected in one place before construction.
sparse_retriever_options = dict(
    index_name="InNoHassle-Search",
    model="bm25",
    min_df=1,
    tokenizer="whitespace",
    stemmer=None,  # "snowball",
    stopwords=["english", "russian"],
    do_lowercasing=False,
    do_ampersand_normalization=True,
    do_special_chars_normalization=True,
    do_acronyms_normalization=True,
    do_punctuation_removal=True,
)
sr = SparseRetriever(**sparse_retriever_options)

In [38]:
def _sparse_doc_from_chunk(chunk: dict) -> dict:
    """Rename a chunk record's "index" key to "id"; "text" and "source_id" are kept."""
    return {
        "id": chunk["index"],
        "text": chunk["text"],
        "source_id": chunk["source_id"],
    }


# NOTE(review): "index-chunks.jsonl" is written by a cell in the Dense Retriever
# section further down, so on a fresh kernel that cell has to run before this one.
sr = sr.index_file(
    path=DATA_PATH / "index-chunks.jsonl",
    callback=_sparse_doc_from_chunk,  # Callback defaults to None.
)


Building TDF matrix:   0%|          | 0/10000 [00:00<?, ?it/s]

In [20]:
sparse_query = "перечень документов"  # What to search for

sr.search(
    query=sparse_query,
    return_docs=True,  # return the text of the documents
    cutoff=10,  # number of results to return
)

[{'id': 'https://hotel.innopolis.university/upload/docs-hotel/437999%20Правила.pdf',
  'text': 'УТВЕРЖДЕНЫ приказом АНО ВО Университет Иннополис от 09.08.2023 Директор К.В. Семенихин Правила размещения проживания жилом комплексе АНО ВО Университет Иннополис г. Иннополис 2023 1. Общие положения 1.1. Настоящие Правила размещения проживания жилом комплексе АНО ВО Университет Иннополис далее Правила определяют порядок размещения проживания жилом комплексе АНО ВО Университет Иннополис расположенном по адресу: Республика Татарстан Верхнеуслонский муниципальный район г. Иннополис ул. Университетская д. корпуса далее Комплекс обязательны для исполнения всеми категориями проживающих далее по тексту Проживающие. 1.2. Размещение Комплексе производится на основании договора предоставлении услуг по размещению или на основании договора присоединения предоставлении услуг по размещению путем подписания согласия на присоединение регистрационной карты далее Договор. 1.3. целью реализации уставной деятел

## Dense Retriever

In [22]:
# Dump the first 10000 chunks as JSON lines, one chunk per line.
with open(DATA_PATH / "index-chunks.jsonl", "w", encoding="utf-8") as index_file:
    selected_chunks = list(chunks_info.values())[:10000]
    for chunk in selected_chunks:
        record = json.dumps(chunk.model_dump(), ensure_ascii=False)
        index_file.write(record + "\n")

In [29]:
# Dense retriever encoding with the same model the token splitter was built
# with. MODEL_NAME is reused instead of repeating the literal, so the chunk
# tokenization and the encoder cannot silently drift apart.
dr = DenseRetriever(
    index_name="new-index",
    model=MODEL_NAME,
    normalize=False,
    max_length=256,
    use_ann=False,
)

In [30]:
import retriv.dense_retriever.encoder


def count_lines(path: str) -> int:
    """Count the number of lines in a UTF-8 text file.

    Args:
        path: Location of the file (anything ``open`` accepts).

    Returns:
        The number of lines; 0 for an empty file.
    """
    # The context manager closes the handle deterministically instead of
    # leaving it to the garbage collector.
    with open(path, encoding="utf-8") as lines:
        return sum(1 for _ in lines)


# Monkeypatch: make retriv's dense encoder use the UTF-8 count_lines defined above.
# NOTE(review): presumably the library's own version opens the file with the
# platform default encoding — confirm against the installed retriv version.
retriv.dense_retriever.encoder.count_lines = count_lines

In [35]:
def _dense_doc_from_chunk(chunk: dict) -> dict:
    """Keep only "id" (taken from the record's "index") and "text" of a chunk record."""
    return {
        "id": chunk["index"],
        "text": chunk["text"],
        # "source_id": chunk["source_id"],
    }


# Embed and index the chunk file written above, using the GPU.
dr = dr.index_file(
    path=DATA_PATH / "index-chunks.jsonl",
    callback=_dense_doc_from_chunk,  # Callback defaults to None.
    use_gpu=True,
)

  attn_output = torch.nn.functional.scaled_dot_product_attention(
Embedding documents: 100%|██████████| 10000/10000 [00:46<00:00, 213.75it/s]


Loading embeddings...


In [36]:
dense_query = "документы для размещения в Жилом комплексе"  # What to search for

dr.search(
    query=dense_query,
    return_docs=True,  # return the text of the documents
    cutoff=100,  # number of results to return
)

[{'id': 274,
  'text': 'праве доступа комнату для велохранения!',
  'score': 7.002532},
 {'id': 211,
  'text': 'жилого помещения мебели оборудования жилом помещении правилами проживания инструкциеи по соблюдению мер пожарнои безопасности жилом комплексе ано во университет иннополис стоимостью услуг размещения ознакомлена согласена применением мер ответственности за курение жилом комплексе иили местах не обозначе',
  'score': 4.1740074},
 {'id': 89,
  'text': 'стоимости проживания комплексе согласно деиствующему приказу стоимости услуг размещения комплексе.',
  'score': 4.1336346},
 {'id': 233,
  'text': '##ассмотрения заявки принимает решение. 3. 4 после получения одобрения со стороны руководителя отдела организации размещении администратор жилого комплекса начинает процедуру подготовки свободного места номера. 3. 5 если проживающии сообщает номере которыи он предпочитает переселиться после получения сог',
  'score': 3.797642},
 {'id': 207,
  'text': '##щения за одни сутки согласно выб

In [None]:
# import json

# entries = []
# with open("../data/meta.json", 'r', encoding='utf-8') as json_file:
#     entries = json.load(json_file)
#     # for line in json_file:
#     #     entries.append(json.loads(line))
#     #     break

# with open('../data/meta.jsonl', 'w', encoding='utf-8') as outfile:
#     for entry in entries[:1]:
#         json.dump(entry, outfile, ensure_ascii=False)
#         outfile.write('\n')