# Закон Ципфа

In [None]:
import sys
from pathlib import Path

sys.path.append(str(Path().resolve() / "src"))

import matplotlib.pyplot as plt

from collections import Counter
from typing import List
from Stemmer import Stemmer
from pymongo import MongoClient

from log import log

In [None]:
# Stemmer instances for Russian and English, created once at module level and
# shared by tokenize_and_stem for every token it processes.
RU_STEMMER = Stemmer("russian")
EN_STEMMER = Stemmer("english")


def load_texts_from_mongodb(
    uri="mongodb://root:example@localhost:27017", limit=5000
) -> list[str]:
    """Load the non-empty ``value`` fields from the ``scraper.scraps`` collection.

    Args:
        uri: MongoDB connection string.
        limit: Number of scanned documents after which reading stops.
            Documents with a missing or empty ``value`` still count towards
            it, so fewer than ``limit`` texts may be returned. The check runs
            after each document, so at least one document is examined when
            the collection is not empty.

    Returns:
        The truthy ``value`` fields of the scanned documents, in cursor order.
    """
    # NOTE(review): the default URI embeds credentials; consider reading the
    # connection string from configuration or the environment instead.
    client = MongoClient(uri)
    try:
        collection = client["scraper"]["scraps"]

        texts = []
        total_docs = collection.count_documents({})

        log.info(f"Найдено документов: {total_docs}")

        batch_size = 10000
        processed = 0

        # Project only the "value" field to keep the transferred payload small.
        for doc in collection.find({}, {"value": 1}).batch_size(batch_size):
            value = doc.get("value")
            if value:
                texts.append(value)

            processed += 1
            if processed % 1000 == 0:
                log.info(f"Обработано {processed}/{total_docs} документов")
            if processed >= limit:
                break
    finally:
        # Release the connection even if counting or iterating raised.
        client.close()

    log.info(f"Загружено {len(texts)} текстов")

    return texts


def is_cyrillic(char: str) -> bool:
    """Return True if *char* lies in the Unicode Cyrillic block U+0400..U+04FF.

    Intended for single characters. The empty string, ASCII characters and any
    other code point outside the range yield False. No UTF-8 encoding is
    performed, so unencodable input such as a lone surrogate also yields
    False instead of raising ``UnicodeEncodeError``.
    """
    # The range comparison alone decides the result: everything shorter than
    # two UTF-8 bytes ("" or ASCII) already compares below U+0400.
    return "\u0400" <= char <= "\u04ff"


def normalize_text_utf8(text: str) -> str:
    """Lower-case *text* and replace every non-word character with a space.

    ASCII letters and digits and Cyrillic characters (as decided by
    ``is_cyrillic``) are kept in lower case. Every other character becomes
    a single space, so the result has the same number of characters as the
    input. A falsy *text* yields the empty string.
    """
    if not text:
        return ""

    def _is_word_char(symbol: str) -> bool:
        # ASCII is judged by isalnum(); anything else must be Cyrillic.
        if ord(symbol) < 128:
            return symbol.isalnum()
        return is_cyrillic(symbol)

    return "".join(
        symbol.lower() if _is_word_char(symbol) else " " for symbol in text
    )


def tokenize_and_stem(text: str) -> List[str]:
    """Split *text* into normalized tokens and return their stems.

    The text is normalized with ``normalize_text_utf8`` and split on spaces.
    Tokens containing at least one Cyrillic character are stemmed with
    ``RU_STEMMER``, all others with ``EN_STEMMER``. Stems of two characters
    or fewer are dropped. A falsy *text* yields an empty list.
    """
    if not text:
        return []

    # The normalized text contains only word characters and spaces, so
    # splitting on " " and discarding empty pieces gives the tokens.
    words = [piece for piece in normalize_text_utf8(text).split(" ") if piece]

    stems = []
    for word in words:
        stemmer = RU_STEMMER if any(map(is_cyrillic, word)) else EN_STEMMER
        stem = stemmer.stemWord(word)
        if len(stem) > 2:
            stems.append(stem)

    return stems


def build_zipf_law(tokens: List[str]):
    """Plot the rank-frequency distribution of *tokens* and print statistics.

    The empirical frequencies are drawn on log-log axes. When there is more
    than one distinct token, a reference curve ``f(r) = f(1) / r`` is added.
    Afterwards the token count, the number of distinct tokens, the mean token
    length and the ten most frequent tokens are printed.

    Args:
        tokens: Tokens to analyse. For an empty list a short message is
            printed and nothing is plotted, because the mean length and the
            percentages are undefined without tokens.
    """
    if not tokens:
        # Without this guard the mean-length computation divides by zero.
        print("Нет токенов для анализа")
        return

    total_tokens = len(tokens)
    sorted_counts = Counter(tokens).most_common()

    ranks = list(range(1, len(sorted_counts) + 1))
    frequencies = [count for _, count in sorted_counts]

    plt.figure(figsize=(10, 8))

    plt.loglog(ranks, frequencies, "ro-", linewidth=2, alpha=0.7, markersize=4)
    plt.xlabel("Ранг слова (log)", fontsize=12)
    plt.ylabel("Частота (log)", fontsize=12)
    plt.title("Закон Ципфа (логарифмический масштаб)", fontsize=14)
    plt.grid(True, alpha=0.3, which="both")

    if len(ranks) > 1:
        # Reference curve anchored at the frequency of the top-ranked token.
        top_frequency = frequencies[0]
        zipf_freq = [top_frequency / r for r in ranks]
        plt.loglog(
            ranks,
            zipf_freq,
            "g--",
            linewidth=1,
            alpha=0.5,
            label="Теоретическая кривая Ципфа",
        )
        plt.legend()

    plt.suptitle(
        f"Закон Ципфа\nВсего уникальных слов: {len(sorted_counts)} | Всего токенов: {total_tokens}",
        fontsize=16,
    )

    plt.tight_layout()
    plt.show()

    print(f"Всего токенов: {total_tokens}")
    print(f"Уникальных токенов: {len(sorted_counts)}")
    print(f"Средняя длина токена: {sum(map(len, tokens)) / total_tokens}")

    print("\nТоп-10 самых частых слов:")
    for i, (word, count) in enumerate(sorted_counts[:10], 1):
        percentage = (count / total_tokens) * 100
        print(f"{i:2d}. {word:20s} - {count:6d} ({percentage:.2f}%)")

In [None]:
# Load the raw texts from MongoDB, scanning at most 50000 documents.
texts = load_texts_from_mongodb(limit=50000)

In [None]:
# Join all texts with a space (the separator tokenize_and_stem splits on, so
# tokens never span two texts) and tokenize/stem the corpus in one call.
tokens = tokenize_and_stem(" ".join(texts))

In [None]:
# Plot the rank-frequency curve and print the corpus statistics.
build_zipf_law(tokens)