## Использование данных, загруженных в БД. RAG

In [8]:
from pathlib import Path
import yaml

import typing as t


def load_yaml_from_file(path: t.Union[str, Path], loader=yaml.Loader) -> t.Any:
    path = Path(path) if type(path) is str else path
    with path.open(encoding='utf-8') as file:
        return yaml.load(file, loader)


In [57]:
from pydantic import BaseModel
from datetime import datetime


class Context(BaseModel):
    uid: int
    text: str
    title: str
    tags: list[str]
    n_visits: int
    dt: datetime
    href: str
    source: str

## Эмбеддер

In [9]:
from pydantic import BaseModel


class EmbedderSettings(BaseModel):
    batch_size: int = 16
    model_name: str
    model_type: str
    dimension: int
    prefix_query: str
    prefix_document: str


In [10]:
import abc
from typing import List

import more_itertools
import numpy as np
import torch
import torch.nn.functional as F
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer, XLMRobertaModel, XLMRobertaTokenizer


class IEmbedder(abc.ABC):
    def __init__(self):
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
        else:
            self.device = torch.device("cpu")

    @abc.abstractmethod
    def encode(self, sentences: List[str], doc_type: str) -> np.ndarray:
        """Calculate sentences embedding(s)"""


class Embedder(IEmbedder):
    def __init__(self, settings: EmbedderSettings):
        super().__init__()
        self._settings = settings
        self.batch_size = self._settings.batch_size
        self.model_type = self._settings.model_type
        self.prefix_query = self._settings.prefix_query
        self.prefix_document = self._settings.prefix_document

        if self.model_type == 'e5':
            self.model = XLMRobertaModel.from_pretrained(self._settings.model_name).to(self.device)
            self.tokenizer = XLMRobertaTokenizer.from_pretrained(self._settings.model_name)
        else:
            self.model = AutoModel.from_pretrained(self._settings.model_name).to(self.device)
            self.tokenizer = AutoTokenizer.from_pretrained(self._settings.model_name)

    @staticmethod
    def average_pool(last_hidden_states: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

    def encode(self, sentences: List[str], doc_type: str) -> np.ndarray:
        sentences = self.preprocess_sentences(sentences, doc_type)
        embeddings = torch.tensor([]).to(self.device)

        for batch in tqdm(more_itertools.chunked(sentences, self.batch_size)):
            tokenized_batch = self.tokenizer(batch, max_length=512, padding=True,
                                             truncation=True, return_tensors='pt').to(self.device)

            with torch.no_grad():
                outputs = self.model(**tokenized_batch).last_hidden_state
                embed = self.average_pool(outputs, tokenized_batch['attention_mask'])

            torch.cuda.empty_cache()

            for tensor in embed:
                embeddings = torch.cat((embeddings, tensor.unsqueeze(0)), 0)

        return np.array([torch.Tensor.cpu(emb) for emb in F.normalize(embeddings, dim=-1)])

    def preprocess_sentences(self, sentences: List[str], doc_type: str) -> List[str]:
        if doc_type == 'query':
            return [self.prefix_query.format(sentence) for sentence in sentences]
        elif doc_type == 'document':
            return [self.prefix_document.format(sentence) for sentence in sentences]
        return sentences


In [11]:
config = load_yaml_from_file('/Users/d.smakov/PycharmProjects/faplRAG/interface/config.yml')

settings = EmbedderSettings(**config['embedding_model'])
embedder = Embedder(settings)

## Запрос к векторной БД

In [47]:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = "postgresql://airflow:airflow@localhost:5432/airflow"

engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


  Base = declarative_base()


In [48]:
db = SessionLocal()

In [52]:
from pgvector.sqlalchemy import Vector
from sqlalchemy import Column, Date, Integer, String, Text, ARRAY


class Post(Base):
    __tablename__ = "posts"

    uid = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    dt = Column(Date, nullable=False)
    text_content = Column(Text)
    tags = Column(ARRAY(String, as_tuple=True))
    n_visits = Column(Integer)
    vector = Column(Vector(768))
    author = Column(String)

In [80]:
question = 'Руководство "Манчестер Юнайтед" созрело, чтобы уволить Эрика Тен Хага'

query_vector = embedder.encode([question], doc_type="query")[0].tolist()

1it [00:00, 12.18it/s]


In [81]:
k = config["retrieval"]["top_k_vector"]
similarity_threshold = config["retrieval"]["similarity_threshold"]

In [82]:
# Query the database for the most similar contexts based on cosine similarity
results = (
    db.query(
        Post,
        Post.vector.cosine_distance(query_vector).label("distance"),
    )
    .filter(
        Post.vector.cosine_distance(query_vector) < similarity_threshold
    )
    .order_by("distance")
    .limit(k)
    .all()
)

In [83]:
def build_context_from_vectordb_response(doc: Post) -> Context:
    return Context(uid=doc.uid,
                  text=doc.text_content,
                  title=doc.title,
                  tags=list(doc.tags),
                  n_visits=int(doc.n_visits),
                  dt=datetime.combine(doc.dt, datetime.min.time()),
                  href=f"http://fapl.ru/posts/{doc.uid}/",
                  source="vector")

In [99]:
top_chunks = []

top_chunks.extend([build_context_from_vectordb_response(res.Post) for res in results])
len(top_chunks)

3

## Полнотекстовый поиск

In [13]:
from opensearchpy import OpenSearch

In [16]:
os_client = OpenSearch([{"host": "localhost", "port": 9200}])

In [85]:
top_k = config["retrieval"]["top_k_fulltext"]

# Constructing the query for multiple fields
query = {
    "query": {
        "bool": {
            "should": [
                {"multi_match": {
                    "query": question,
                    "fields": ["title^2", "text_content"],  # Boosting title field
                    "type": "best_fields"
                }},
                {"term": {"tags": question.lower()}}  # Assuming tags are stored in lowercase
            ]
        }
    },
    "size": top_k
}

# Executing the search query
response = os_client.search(index="chunks", body=query)

In [97]:
def build_context_from_elastic_response(doc: dict) -> Context:
    return Context(uid=doc['uid'],
                  text=doc['text_content'],
                  title=doc['title'],
                  tags=list(doc['tags']),
                  n_visits=int(doc['n_visits']),
                  dt=datetime.fromisoformat(doc['dt']).replace(tzinfo=None),
                  href=f"http://fapl.ru/posts/{doc['uid']}/",
                  source="fulltext")

In [100]:
top_chunks.extend(build_context_from_elastic_response(hit['_source']) for hit in response['hits']['hits'])

len(top_chunks)

6

## Смешивание

In [101]:
def deduplicate_and_sort(contexts: List[Context]) -> List[Context]:
    deduplicated = {context.uid: context for context in contexts}.values()
    sorted_contexts = sorted(deduplicated, key=lambda c: c.dt)
    
    return sorted_contexts

result = deduplicate_and_sort(top_chunks)

In [103]:
len(result)

4