In [None]:
import re
import math
import xml.etree.ElementTree as ET
from collections import defaultdict, Counter

# Dictionary parsing

In [None]:
dict_path = r'/content/dict.opcorpora.xml'

In [None]:
def parse_dict(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    return root

In [None]:
dict_root = parse_dict(dict_path)

# Parsing lemmata from dictionary

In [None]:
def parse_lemmata(root):
    lemmata_dict = defaultdict(list)
    lemmata_section = root.find('lemmata')

    for lemma_elem in lemmata_section.findall('lemma'):
        l_elem = lemma_elem.find('l')
        lemma_text = l_elem.get('t', '')

        first_g_elem = l_elem.find('g')
        main_grammem = first_g_elem.get('v', '') if first_g_elem is not None else ''

        for f_elem in lemma_elem.findall('f'):
            form_text = f_elem.get('t', '')
            if form_text:
                new_entry = (lemma_text, main_grammem)

                if new_entry not in lemmata_dict[form_text]:
                    lemmata_dict[form_text].append(new_entry)

    return dict(lemmata_dict)

In [None]:
lemmata_dict = parse_lemmata(dict_root)

# Parse universal dictionatru and get statistic

In [None]:
def parse_conllu(file_path):
    sentences = []
    current = []
    with open(file_path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                if current:
                    sentences.append(current)
                    current = []
                continue
            if line.startswith("#"):  # skip comment lines
                continue
            parts = line.split('\t')
            if len(parts) != 10:
                continue
            idx, form, lemma, upos, xpos, feats, head, deprel, deps, misc = parts
            feats_dict = {}
            if feats != "_":
                for feat in feats.split("|"):
                    k, v = feat.split("=")
                    feats_dict[k] = v
            current.append({
                "form": form,
                "lemma": lemma,
                "upos": upos,
                "feats": feats_dict
            })
    return sentences


In [None]:
def build_bayes_stats(sentences, window=1):
    emission_counts = defaultdict(Counter)  # word -> POS -> count
    emission_totals = Counter()
    context_counts = defaultdict(Counter)  # context_POS -> POS -> count
    context_totals = Counter()

    for sent in sentences:
        upos_seq = [normalize_tag(tok["upos"]) for tok in sent]
        forms = [tok["form"].lower() for tok in sent]

        for i, (w, g_raw) in enumerate(zip(forms, upos_seq)):
            g = g_raw
            emission_counts[w][g] += 1
            emission_totals[w] += 1

            for j in range(max(0, i - window), min(len(sent), i + window + 1)):
                if j == i:
                    continue
                ctx = upos_seq[j]
                context_counts[ctx][g] += 1
                context_totals[ctx] += 1

    return emission_counts, emission_totals, context_counts, context_totals


In [None]:
sentences = parse_conllu("/content/ru_gsd-ud-train.conllu")

In [None]:
# add synthetic sentences to balance
synthetic = [
    {"form": "Я", "lemma": "я", "upos": "PRON"},
    {"form": "люблю", "lemma": "любить", "upos": "VERB"},
    {"form": "печь", "lemma": "печь", "upos": "VERB"},
    {"form": "пироги", "lemma": "пирог", "upos": "NOUN"}
]
sentences.append(synthetic)


In [None]:
emission_counts, emission_totals, context_counts, context_totals = build_bayes_stats(sentences)
all_tags = list({g for s in sentences for g in [tok["upos"] for tok in s]})


In [None]:
len(emission_counts)

24526

In [None]:
TAG_MAP = {
    "CCONJ": "CONJ", "CONJ": "CONJ",
    "ADJF": "ADJ", "ADJ": "ADJ",
    "ADVB": "ADV", "ADV": "ADV",
    "INFN": "VERB", "VERB": "VERB",
    "NOUN": "NOUN", "PROPN": "NOUN",
    "PRCL": "PART", "PART": "PART",
    "NPRO": "PRON", "PRON": "PRON",
    "NUMR": "NUM", "NUM": "NUM",
    "ADP": "ADP", "PREP": "ADP",
    "INTJ": "INTJ",
    "PUNCT": "PUNCT",
    "SYM": "SYM",
}

def normalize_tag(tag):
    return TAG_MAP.get(tag, tag)

In [None]:
# Compute global prior P(g)
def compute_global_prior(emission_counts, emission_totals, all_tags):
    gcount = Counter()
    for w, c in emission_counts.items():
        for tag, cnt in c.items():
            gcount[normalize_tag(tag)] += cnt
    total = sum(gcount.values())
    if total == 0:
        return {t: 1.0/len(all_tags) for t in all_tags}
    return {t: gcount.get(t, 0)/total for t in all_tags}

EPS = 1e-12

# P(g|word)
def P_g_given_w(word, g, emission_counts, emission_totals, all_tags, smooth=1.0):
    word = word.lower()
    g = normalize_tag(g)
    if word not in emission_counts:
        return 0.0
    denom = emission_totals.get(word, 0) + smooth * len(all_tags)
    return (emission_counts[word].get(g, 0) + smooth) / denom

# P(g|context_tag)
def P_g_given_context(g, context_g, context_counts, context_totals, all_tags, smooth=1.0):
    g = normalize_tag(g)
    ctx = normalize_tag(context_g)
    if ctx not in context_counts:
        return 1.0 / len(all_tags)
    denom = context_totals.get(ctx, 0) + smooth * len(all_tags)
    return (context_counts[ctx].get(g, 0) + smooth) / denom


In [None]:
def disambiguate_bayes(word, candidates, left_ctx, right_ctx,
                       emission_counts, emission_totals,
                       context_counts, context_totals, all_tags,
                       alpha=0.6, beta_left=1.2, beta_right=2.8, smoothing=1e-3,
                       global_prior=None, debug=False):
    word_l = word.lower()
    if global_prior is None:
        gp = compute_global_prior(emission_counts, emission_totals, all_tags)
    else:
        gp = global_prior

    best = None
    best_score = -1e18
    scores = []

    for lemma, pos in candidates:
        pos_norm = normalize_tag(pos)

        # P(g|w)
        p_em = P_g_given_w(word_l, pos_norm, emission_counts, emission_totals, all_tags, smooth=smoothing)

        # backoff
        p_comb = alpha * p_em + (1.0 - alpha) * gp.get(pos_norm, 1.0/len(all_tags))
        # safety
        score = math.log(p_comb + EPS)

        if left_ctx:
            p_left = P_g_given_context(pos_norm, left_ctx, context_counts, context_totals, all_tags, smooth=smoothing)
            score += beta_left * math.log(p_left + EPS)

        if right_ctx:
            p_right = P_g_given_context(pos_norm, right_ctx, context_counts, context_totals, all_tags, smooth=smoothing)
            score += beta_right * math.log(p_right + EPS)

        scores.append(((lemma, pos_norm), score))
        if score > best_score:
            best_score = score
            best = (lemma, pos_norm)

    if debug:
        print("Bayes scores for", word, "left:", left_ctx, "right:", right_ctx)
        for (lem, p), sc in scores:
            print(f"  {lem}/{p}: {sc:.4f}")
        print("=> chosen:", best, "score", best_score)

    return best


In [None]:
def guess_unknown_word(word, is_sentence_start=False, prev_pos=None):
    word_lower = word.lower()
    if word[0].isupper() and not is_sentence_start:
        return word_lower, 'NOUN'
    if word_lower.endswith(('ость', 'ство', 'ация', 'изм')):
        return word_lower, normalize_tag('NOUN')
    elif word_lower.endswith(('ый', 'ий', 'ой', 'ая', 'яя', 'ое')):
        return word_lower, normalize_tag('ADJF')
    elif word_lower.endswith(('ть', 'ти', 'чь')):
        return word_lower, normalize_tag('INFN')
    elif word_lower.endswith(('о', 'е', 'и')):
        return word_lower, normalize_tag('ADVB')
    elif any(char.isdigit() for char in word):
        return word_lower, normalize_tag('NUMR')
    return word_lower, 'NOUN'


# Tokenize and lemmatize text

In [None]:
def tokenize(text):
    tokens = re.findall(r'\w+', text)
    return [token for token in tokens if token]

def normalize(word):
    return word.lower()

In [None]:
def process_text_with_lemmatization(input_text, lemmata_dict,
                                    emission_counts, emission_totals,
                                    context_counts, context_totals,
                                    all_tags,
                                    alpha=0.6, beta_left=1.2, beta_right=2.8, smoothing=1e-3,
                                    debug=False):
    lines = input_text.strip().split('\n')
    results = []

    for line in lines:
        if not line.strip():
          continue

        tokens = tokenize(line)
        normalized_tokens = [normalize(token) for token in tokens]

        word_infos = []
        for i, (token, normalized) in enumerate(zip(tokens, normalized_tokens)):
            is_sentence_start = (i == 0)
            prev_pos = word_infos[i-1]['pos'] if i > 0 else None

            if normalized in lemmata_dict:
                possible_lemmas_raw = lemmata_dict[normalized]
                possible_lemmas = [(lem, normalize_tag(pos)) for (lem,pos) in possible_lemmas_raw]
                has_ambiguity = len(possible_lemmas) > 1
                lemma, pos = possible_lemmas[0]
                word_infos.append({
                    'token': token,
                    'normalized': normalized,
                    'lemma': lemma,
                    'pos': pos,
                    'has_ambiguity': has_ambiguity,
                    'possible_lemmas': possible_lemmas
                })
            else:
                lemma, pos = guess_unknown_word(token, is_sentence_start, prev_pos)
                pos = normalize_tag(pos)
                word_infos.append({
                    'token': token,
                    'normalized': normalized,
                    'lemma': lemma,
                    'pos': pos,
                    'has_ambiguity': False,
                    'possible_lemmas': [(lemma, pos)]
                })

        if not word_infos:  # empty line
          continue
        # right context
        for i in range(len(word_infos)-1):
            word_infos[i]['right_pos'] = word_infos[i+1]['pos']
        word_infos[-1]['right_pos'] = None

        for i, info in enumerate(word_infos):
            if not info['has_ambiguity']:
                continue
            left_ctx = word_infos[i-1]['pos'] if i > 0 else None
            right_ctx = info.get('right_pos', None)

            lemma, pos = disambiguate_bayes(
                info['normalized'],
                info['possible_lemmas'],
                left_ctx,
                right_ctx,
                emission_counts,
                emission_totals,
                context_counts,
                context_totals,
                all_tags,
                alpha=alpha, beta_left=beta_left, beta_right=beta_right, smoothing=smoothing,
                global_prior=None, debug=debug
            )
            info['lemma'] = lemma
            info['pos'] = pos

        processed_tokens = [
            f"{info['token']}{{{info['lemma']}={info['pos']}}}"
            for info in word_infos
        ]
        results.append(' '.join(processed_tokens))

    return '\n'.join(results)


In [None]:
print(process_text_with_lemmatization("Русская печь. я люблю печь пироги. Ваза из стекла. Вода стекла по оконной раме",
    lemmata_dict, emission_counts, emission_totals, context_counts, context_totals, all_tags,
    alpha=0.7, beta_left=1.5, beta_right=2.0, smoothing=1e-4, debug=True))


Bayes scores for печь left: ADJ right: PRON
  печь/VERB: -7.7508
  печь/NOUN: -6.3176
=> chosen: ('печь', 'NOUN') score -6.317599555666266
Bayes scores for печь left: VERB right: NOUN
  печь/VERB: -10.4340
  печь/NOUN: -5.5396
=> chosen: ('печь', 'NOUN') score -5.539568543123753
Bayes scores for пироги left: NOUN right: NOUN
  пирог/NOUN: -5.4718
  пирога/NOUN: -5.4718
=> chosen: ('пирог', 'NOUN') score -5.471807897897936
Bayes scores for ваза left: NOUN right: ADP
  ваза/NOUN: -5.9691
  ваз/NOUN: -5.9691
=> chosen: ('ваза', 'NOUN') score -5.969063522741976
Bayes scores for из left: NOUN right: NOUN
  из/ADP: -6.9845
  иза/NOUN: -7.5308
=> chosen: ('из', 'ADP') score -6.984512065803482
Bayes scores for стекла left: ADP right: NOUN
  стекло/NOUN: -4.3005
  стёк/VERB: -11.7619
=> chosen: ('стекло', 'NOUN') score -4.300531267460037
Bayes scores for вода left: NOUN right: NOUN
  вод/NOUN: -5.4708
  вода/NOUN: -5.4708
=> chosen: ('вод', 'NOUN') score -5.470827404495894
Bayes scores for стек

#Accuracy

In [None]:
with open("/content/сrime_and_punishment.txt", encoding="utf-8") as f:
    big_text = f.read()

paragraphs = [p.strip() for p in big_text.split('\n') if p.strip()]
print(f"Количество абзацев: {len(paragraphs)}")


Количество абзацев: 4879


In [None]:
! pip install pymorphy3

Collecting pymorphy3
  Downloading pymorphy3-2.0.6-py3-none-any.whl.metadata (2.4 kB)
Collecting dawg2-python>=0.8.0 (from pymorphy3)
  Downloading dawg2_python-0.9.0-py3-none-any.whl.metadata (7.5 kB)
Collecting pymorphy3-dicts-ru (from pymorphy3)
  Downloading pymorphy3_dicts_ru-2.4.417150.4580142-py2.py3-none-any.whl.metadata (2.0 kB)
Downloading pymorphy3-2.0.6-py3-none-any.whl (53 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.9/53.9 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dawg2_python-0.9.0-py3-none-any.whl (9.3 kB)
Downloading pymorphy3_dicts_ru-2.4.417150.4580142-py2.py3-none-any.whl (8.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m82.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymorphy3-dicts-ru, dawg2-python, pymorphy3
Successfully installed dawg2-python-0.9.0 pymorphy3-2.0.6 pymorphy3-dicts-ru-2.4.417150.4580142


In [None]:
import pymorphy3
morph = pymorphy3.MorphAnalyzer()

def analyze_with_pymorphy(text):
    tokens = tokenize(text)
    res = []
    for t in tokens:
        parse = morph.parse(t)[0]
        res.append((t, parse.normal_form, parse.tag.POS))
    return res


In [None]:
P2U_TAGS = {
    "ADJF": "ADJ",
    "ADJS": "ADJ",
    "COMP": "ADJ",
    "VERB": "VERB",
    "INFN": "VERB",
    "PRTF": "VERB",
    "PRTS": "VERB",
    "GRND": "VERB",
    "NOUN": "NOUN",
    "NPRO": "PRON",
    "NUMR": "NUM",
    "ADVB": "ADV",
    "PREP": "ADP",
    "CONJ": "CONJ",
    "PRCL": "PART",
    "INTJ": "INTJ",
    "PRED": "ADV",
}

def normalize_pymorphy_tag(tag):
    return P2U_TAGS.get(tag, tag)


In [None]:
def evaluate_accuracy(our_tags, pymorphy_tags):
    mapping = {
        "ADJF": "ADJF", "ADJS": "ADJF", "COMP": "ADJF",
        "INFN": "VERB", "VERB": "VERB", "PRTF": "VERB", "PRTS": "VERB", "GRND": "VERB",
        "NOUN": "NOUN", "PROPN": "NOUN", "NPRO": "PRON",
        "ADVB": "ADVB", "PREP": "ADP", "CONJ": "CONJ", "PRCL": "PART",
        "INTJ": "INTJ", "NUMR": "NUM", "PRED": "ADJF"
    }

    def normalize_tag(tag):
        return mapping.get(tag, tag)

    total = min(len(our_tags), len(pymorphy_tags))
    correct = 0

    for ours, pym in zip(our_tags[:total], pymorphy_tags[:total]):
        w1, t1 = ours[0], ours[1]
        w2, t2 = pym[0], pym[1]

        if w1.lower() != w2.lower():
            continue
        if t1 == normalize_tag(t2):
            correct += 1

    return correct / total if total else 0.0


In [None]:
sample_text = " ".join(paragraphs)
our = process_text_with_lemmatization(sample_text, lemmata_dict,
                                      emission_counts, emission_totals,
                                      context_counts, context_totals,
                                      all_tags)


our_tags = re.findall(r'(\S+){(\S+)=(\S+)}', our)
pymorphy_tags = analyze_with_pymorphy(sample_text)

acc = evaluate_accuracy(our_tags, pymorphy_tags)
print(f"Совпадение по частям речи: {acc:.3f}")


Совпадение по частям речи: 0.739
