<a href="https://colab.research.google.com/github/marimsw/RAG_sistem_Gl3_BM25/blob/main/Gl_3_1_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

TF-IDF: фундаментальная модель     
Подход Term Frequency – Inverse Document Frequency остаётся краеугольным
камнем информационного поиска. Метод основан на простой идее: важность
термина в документе пропорциональна частоте его встречаемости в этом
документе и обратно пропорциональна частоте встречаемости во всей коллекции.

In [5]:
import numpy as np
import math
from collections import Counter, defaultdict
from typing import List, Dict, Set
class TFIDFRetriever:
  def __init__(self):
      """Инициализация TF-IDF ретривера"""
      self.vocabulary: Set[str] = set()
      self.documents: List[List[str]] = []
      self.document_frequencies: Dict[str, int] = defaultdict(int)
      self.tfidf_matrix: np.ndarray = None
      self.idf_values: Dict[str, float] = {}
  def _tokenize(self, text: str) -> List[str]:
      """Простая токенизация текста"""
      # В реальных системах используйте более продвинутые методы
      return text.lower().split()
  def _calculate_tf(self, term_count: int, total_terms: int) -> float:
      """Вычисление Term Frequency"""
      if total_terms == 0:
        return 0.0
      return term_count / total_terms
  def _calculate_idf(self, term: str) -> float:
      """Вычисление Inverse Document Frequency"""
      if term not in self.document_frequencies:
        return 0.0
      # Добавляем 1 к знаменателю для избежания деления на ноль
      idf = math.log(len(self.documents) / (self.document_frequencies[term] + 1))
      return idf
  def fit(self, documents: List[str]):
      """Обучение модели на коллекции документов"""
      print("Токенизация документов...")
      self.documents = [self._tokenize(doc) for doc in documents]
      # Построение словаря и подсчёт частот документов
      print("Построение словаря...")
      for doc_tokens in self.documents:
          unique_terms = set(doc_tokens)
          self.vocabulary.update(unique_terms)
          for term in unique_terms:
              self.document_frequencies[term] += 1
              print(f"Размер словаря: {len(self.vocabulary)}")
            # Вычисление IDF для всех терминов
          print("Вычисление IDF...")
          for term in self.vocabulary:
            self.idf_values[term] = self._calculate_idf(term)
          # Построение TF-IDF матрицы
          print("Построение TF-IDF матрицы...")
          self._build_tfidf_matrix()
  def _build_tfidf_matrix(self):
      """Построение матрицы TF-IDF для всех документов"""
      vocab_list = list(self.vocabulary)
      self.vocab_to_index = {term: i for i, term in enumerate(vocab_list)}
      # Инициализация матрицы
      num_docs = len(self.documents)
      vocab_size = len(self.vocabulary)
      self.tfidf_matrix = np.zeros((num_docs, vocab_size))
      for doc_idx, doc_tokens in enumerate(self.documents):
       # Подсчёт частот терминов в документе
        term_counts = Counter(doc_tokens)
        total_terms = len(doc_tokens)
        for term, count in term_counts.items():
         if term in self.vocab_to_index:
            term_idx = self.vocab_to_index[term]
            tf = self._calculate_tf(count, total_terms)
            idf = self.idf_values[term]
            self.tfidf_matrix[doc_idx, term_idx] = tf * idf
  def _vectorize_query(self, query: str) -> np.ndarray:
      """Преобразование запроса в TF-IDF вектор"""
      query_tokens = self._tokenize(query)
      query_vector = np.zeros(len(self.vocabulary))
      if not query_tokens:
        return query_vector
      term_counts = Counter(query_tokens)
      total_terms = len(query_tokens)
      for term, count in term_counts.items():
        if term in self.vocab_to_index:
          term_idx = self.vocab_to_index[term]
          tf = self._calculate_tf(count, total_terms)
          idf = self.idf_values.get(term, 0.0)
          query_vector[term_idx] = tf * idf
      return query_vector
  def search(self, query: str, top_k: int = 5) -> List[tuple]:
    """Поиск наиболее релевантных документов"""
    if self.tfidf_matrix is None:
      raise ValueError("Модель не обучена. Вызовите fit() перед поиском.")


    # Векторизация запроса
    query_vector = self._vectorize_query(query)
    # Вычисление косинусного сходства
    # Нормализация векторов
    query_norm = np.linalg.norm(query_vector)
    doc_norms = np.linalg.norm(self.tfidf_matrix, axis=1)
    if query_norm == 0:
      return []
    # Косинусное сходство
    similarities = np.dot(self.tfidf_matrix, query_vector) / (doc_norms * query_norm +
    1e-8)
    # Получение топ-K результатов
    top_indices = np.argsort(similarities)[-top_k:][::-1]
    results = []
    for idx in top_indices:
      if similarities[idx] > 0: # Только положительные сходства
        results.append((idx, similarities[idx]))
    return results
  def explain_scoring(self, query: str, doc_idx: int) -> Dict:
    """Детальное объяснение подсчёта релевантности"""
    query_tokens = self._tokenize(query)
    doc_tokens = self.documents[doc_idx]
    explanation = {
    'query_tokens': query_tokens,
    'document_tokens': doc_tokens,
    'term_scores': {}
    }
    doc_term_counts = Counter(doc_tokens)
    query_term_counts= Counter(query_tokens)
    total_doc_terms = len(doc_tokens)
    total_query_terms = len(query_tokens)


    for term in set(query_tokens):
      if term in self.vocab_to_index:
          # TF для документа и запроса
          doc_tf = self._calculate_tf(doc_term_counts[term], total_doc_terms)
          query_tf = self._calculate_tf(query_term_counts[term], total_query_terms)
          # IDF
          idf = self.idf_values[term]
          # TF-IDF для документа и запроса
          doc_tfidf = doc_tf * idf
          query_tfidf = query_tf * idf
          explanation['term_scores'][term] = {

          'document_tf': doc_tf,
          'query_tf': query_tf,
          'idf': idf,
          'document_tfidf': doc_tfidf,
          'query_tfidf': query_tfidf,
          'contribution': doc_tfidf * query_tfidf
          }
    return explanation

Продвинутая реализация ВМ25

BM25: вероятностная эволюция     
Алгоритм BM25 представляет собой вероятностную модель поиска, превосходящую TF-IDF за счёт учёта насыщения частоты терминов и нормализации
длины документов. Алгоритм основан на модели вероятностного ранжирования Робертсона–Спарка–Джонса.

In [5]:
import numpy as np
import math
from collections import Counter, defaultdict
from typing import List, Dict, Tuple
class BM25Retriever:
  def __init__(self, k1: float = 1.2, b: float = 0.75):
      """
      Инициализация BM25-ретривера
      Args:
        k1: параметр насыщения частоты терминов (типично 1.2–2.0)
        b: параметр нормализации длины документа (типично 0.75)
      """
      self.k1 = k1
      self.b = b
      self.documents: List[List[str]] = []
      self.document_lengths: List[int] = []
      self.average_document_length: float = 0.0
      self.vocabulary: Dict[str, int] = {}
      self.document_frequencies: Dict[str, int] = defaultdict(int)
      self.idf_cache: Dict[str, float] = {}
      self.term_document_matrix: Dict[str, Dict[int, int]] = defaultdict(dict)
  def _tokenize(self, text: str) -> List[str]:
      """Токенизация с основной предобработкой"""
      # В продакшн-системах используйте более сложную предобработку
      tokens = text.lower().split()
      # Удаление пунктуации и коротких токенов
      cleaned_tokens = [token.strip('.,!?";:()[]{}') for token in tokens]
      return [token for token in cleaned_tokens if len(token) > 1]
  def _calculate_idf(self, term: str) -> float:
      """Вычисление IDF с BM25-сглаживанием"""
      if term not in self.document_frequencies:
        return 0.0
      # BM25 IDF с добавлением 0.5 для сглаживания
      N = len(self.documents)
      df = self.document_frequencies[term]
      # Избегаем отрицательных IDF
      numerator = N - df + 0.5
      denominator = df + 0.5
      if numerator <= 0:
        return 0.0
      idf = math.log(numerator / denominator)
      return idf
  def fit(self, documents: List[str]):
      """Построение индекса BM25"""
      print("Подготовка корпуса документов...")
      # Токенизация всех документов
      self.documents = [self._tokenize(doc) for doc in documents]
      self.document_lengths = [len(doc_tokens) for doc_tokens in self.documents]
      if not self.document_lengths:
        raise ValueError("Корпус документов пуст")
      self.average_document_length = sum(self.document_lengths) / len(self.document_lengths)
      # Построение словаря и подсчёт статистик
      print("Построение словаря и статистик...")
      term_id = 0
      for doc_idx, doc_tokens in enumerate(self.documents):
        term_counts = Counter(doc_tokens)
        doc_unique_terms = set(doc_tokens)
        # Обновление частот документов
        for term in doc_unique_terms:
          self.document_frequencies[term] += 1
      # Присвоение ID новым терминам
          if term not in self.vocabulary:
            self.vocabulary[term] = term_id
            term_id += 1
      # Сохранение частот терминов для каждого документа
      for term, count in term_counts.items():
        self.term_document_matrix[term][doc_idx] = count
      print(f"Словарь содержит {len(self.vocabulary)} уникальных терминов")
      print(f"Средняя длина документа: {self.average_document_length:.2f}")
      # Предвычисление IDF для всех терминов
      print("Вычисление IDF...")
      for term in self.vocabulary:
        self.idf_cache[term] = self._calculate_idf(term)
  def _calculate_bm25_score(self, query_terms: List[str], doc_idx: int) -> float:
      """Вычисление BM25-скора для документа"""
      if doc_idx >= len(self.documents):
        return 0.0
      doc_length = self.document_lengths[doc_idx]
      score = 0.0
      # Нормализация длины документа
      length_normalization = 1 - self.b + self.b * (doc_length / self.average_document_length)
      query_term_counts = Counter(query_terms)
      for term, query_tf in query_term_counts.items():
        if term not in self.vocabulary:
          continue
        # IDF-компонент
        idf = self.idf_cache.get(term, 0.0)
        # Частота термина в документе
        doc_tf = self.term_document_matrix[term].get(doc_idx, 0)
        # BM25-формула для термина
        numerator = doc_tf * (self.k1 + 1)
        denominator = doc_tf + self.k1 * length_normalization
        tf_component = numerator / denominator if denominator > 0 else 0
        # Учитываем частоту термина в запросе (обычно 1)

        term_score = idf * tf_component * query_tf
        score += term_score
      return score
  def search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
      """Поиск с BM25-скорингом"""
      if not self.documents:
        raise ValueError("Индекс не построен. Вызовите fit() перед поиском.")
      query_terms = self._tokenize(query)
      if not query_terms:
        return []
      print(f"Поиск по запросу: {query_terms}")
      # Вычисление скоров для всех документов
      scores = []
      for doc_idx in range(len(self.documents)):
        score = self._calculate_bm25_score(query_terms, doc_idx)
        if score > 0: # Только положительные скоры
          scores.append((doc_idx, score))
      # Сортировка по убыванию скора
      scores.sort(key=lambda x: x[1], reverse=True)
      return scores[:top_k]
  def explain_scoring(self, query: str, doc_idx: int) -> Dict:
      """Подробное объяснение BM25-скоринга"""
      query_terms = self._tokenize(query)
      if doc_idx >= len(self.documents):
        return {'error': 'Недопустимый индекс документа'}
      doc_tokens = self.documents[doc_idx]
      doc_length = self.document_lengths[doc_idx]
      length_norm = 1 - self.b + self.b * (doc_length / self.average_document_length)
      explanation = {
      'query_terms': query_terms,
      'document_length': doc_length,
      'average_document_length': self.average_document_length,
      'length_normalization': length_norm,
      'parameters': {'k1': self.k1, 'b': self.b},
      'term_contributions': {},
      'total_score': 0.0
      }
      query_term_counts = Counter(query_terms)
      total_score = 0.0
      for term, query_tf in query_term_counts.items():
        if term in self.vocabulary:
          idf = self.idf_cache[term]
          doc_tf = self.term_document_matrix[term].get(doc_idx, 0)
          df = self.document_frequencies[term]
          # Компоненты BM25
          tf_numerator = doc_tf * (self.k1 + 1)
          tf_denominator = doc_tf + self.k1 * length_norm
          tf_component = tf_numerator / tf_denominator if tf_denominator > 0 else 0
          term_score = idf * tf_component * query_tf
          total_score += term_score
          explanation['term_contributions'][term] = {
          'document_frequency': df,
          'document_tf': doc_tf,
          'query_tf': query_tf,
          'idf': idf,
          'tf_component': tf_component,
          'term_score': term_score
          }
      explanation['total_score'] = total_score
      return explanation
  def get_statistics(self) -> Dict:
      """Статистика индекса"""
      return {
      'num_documents': len(self.documents),
      'vocabulary_size': len(self.vocabulary),
      'average_document_length': self.average_document_length,
      'total_terms': sum(self.document_lengths),
      'parameters': {'k1': self.k1, 'b': self.b}
      }



Оптимизированная реализация для больших корпусов

In [25]:
import numpy as np
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
class OptimizedBM25:
  """Оптимизированная версия BM25 для больших корпусов"""
  def __init__(self, k1=1.2, b=0.75):
      self.k1 = k1
      self.b = b
      self.vocabulary_ = {}
      self.idf_ = None
      self.doc_len = None
      self.avgdl = 0.0
      self.doc_freqs = None
      self.tf_matrix = None
  def fit(self, corpus):
      """Быстрое построение индекса с scipy.sparse"""
      # Токенизация и построение словаря
      tokenized_corpus = [doc.lower().split() for doc in corpus]
      vocabulary = set()
      for doc in tokenized_corpus:
        vocabulary.update(doc)
      self.vocabulary_ = {term: i for i, term in enumerate(vocabulary)}
      # Построение разреженной TF-матрицы
      self._build_tf_matrix(tokenized_corpus)
      # Вычисление статистик
      self.doc_len = np.array([len(doc) for doc in tokenized_corpus])
      self.avgdl = self.doc_len.mean()
      # Вычисление IDF
      self._calculate_idf()
      return self
  def _build_tf_matrix(self, tokenized_corpus):
      """Построение разреженной матрицы частот терминов"""
      rows, cols, data = [], [], []
      for doc_idx, doc in enumerate(tokenized_corpus):
        term_counts = Counter(doc)
        for term, count in term_counts.items():
          if term in self.vocabulary_:
            rows.append(doc_idx)
            cols.append(self.vocabulary_[term])
            data.append(count)
            self.tf_matrix = sparse.csr_matrix(
            (data, (rows, cols)),
            shape=(len(tokenized_corpus), len(self.vocabulary_))
            )
  def _calculate_idf(self):
      """Векторизованное вычисление IDF"""
      N = self.tf_matrix.shape[0]
      # Количество документов, содержащих каждый термин
      df = np.array((self.tf_matrix > 0).sum(axis=0)).flatten()
      # BM25 IDF формула
      self.idf_ = np.log((N - df + 0.5) / (df + 0.5))
      # Обнуление отрицательных значений
      self.idf_ = np.maximum(self.idf_, 0)
  def transform(self, query):
      """Быстрый поиск с векторизованными операциями"""
      if isinstance(query, str):
        query_terms = query.lower().split()
      else:
        query_terms = query
      # Построение вектора запроса
      query_vec = np.zeros(len(self.vocabulary_))
      for term in query_terms:
        if term in self.vocabulary_:
          query_vec[self.vocabulary_[term]] += 1
      # Векторизованное вычисление BM25
      tf_matrix_dense = self.tf_matrix.toarray()
      # Нормализация длины документов
      doc_lens = self.doc_len.reshape(-1, 1)
      length_norm = 1 - self.b + self.b * (doc_lens / self.avgdl)
      # BM25-формула в векторизованном виде
      numerator = tf_matrix_dense * (self.k1 + 1)
      denominator = tf_matrix_dense + self.k1 * length_norm
      tf_component = numerator / denominator
      # Применение IDF и запроса
      scores = np.sum(tf_component * self.idf_ * query_vec, axis=1)
      return scores
  # Пример использования оптимизированной версии
def demo_optimized_bm25():
      """Демонстрация оптимизированного BM25"""
      documents = [
      "Python – мощный язык программирования для анализа данных",
      "Машинное обучение использует алгоритмы для анализа больших данных",
      "RAG-системы объединяют поиск информации с генерацией ответов",
      "Векторные базы данных оптимизированы для семантического поиска",
      "BM25-алгоритм превосходит TF-IDF в задачах информационного поиска"
      ]
      # Инициализация и обучение
      # bm25 = OptimizedBM25(k1=1.5, b=0.75)
      bm25 = OptimizedBM25(k1=1.5, b=0.75)
      bm25.fit(documents)
      # Поиск
      query = "анализ данных Python"
      scores = bm25.transform(query)
      # Вывод результатов
      ranked_docs = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)
      print(f"Результаты поиска для запроса: '{query}'")
      print("-" * 50)
      for doc_idx, score in ranked_docs:
        if score > 0:
            print(f"Документ {doc_idx} (скорость: {score:.4f}):")

            print(f" {documents[doc_idx]}")
            print()
      return bm25, documents
if __name__ == "__main__":
    demo_optimized_bm25()


Результаты поиска для запроса: 'анализ данных Python'
--------------------------------------------------
Документ 0 (скорость: 1.0599):
 Python – мощный язык программирования для анализа данных

