In [1]:
import nltk
import random
from collections import Counter

# Завантажуємо необхідні ресурси NLTK
nltk.download('treebank')
nltk.download('universal_tagset')

# 1. Завантаження корпусу Treebank з універсальним набором тегів
# Використання 'universal_tagset' спрощує кількість тегів (напр., 'NNP', 'NNS' стають 'NOUN')
tagged_sents = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

print(f"Загальна кількість речень у корпусі: {len(tagged_sents)}")
print("Приклад речення:", tagged_sents[0])

# 2. Перемішування та розділення на навчальну і тестову вибірки
random.seed(42) # для відтворюваності результатів
random.shuffle(tagged_sents)

split_point = int(len(tagged_sents) * 0.8)
train_sents = tagged_sents[:split_point]
test_sents = tagged_sents[split_point:]

print(f"Кількість речень у навчальній вибірці: {len(train_sents)}")
print(f"Кількість речень у тестовій вибірці: {len(test_sents)}")

# 3. Функція для запису даних у файли
def write_tagged_sentences(sentences, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        for sentence in sentences:
            # Ігноруємо порожні речення, які можуть виникнути
            if not sentence:
                continue
            for word, tag in sentence:
                f.write(f"{word}\t{tag}\n")
            f.write("\n") # Порожній рядок як роздільник між реченнями

# Записуємо навчальну та тестову вибірки
write_tagged_sentences(train_sents, "treebank_training.pos")
write_tagged_sentences(test_sents, "treebank_test.pos")

# 4. Створення та збереження словника
word_counter = Counter(word for sentence in train_sents for word, tag in sentence)

# Зберігаємо слова, які з'являються принаймні двічі
vocab = {word for word, count in word_counter.items() if count >= 2}

with open("treebank_vocab.txt", 'w', encoding='utf-8') as f:
    for word in sorted(list(vocab)):
        f.write(f"{word}\n")

print(f"Розмір словника (слова, що зустрічаються >= 2 рази): {len(vocab)}")

# 5. Створення файлу з тестовими словами для подальшого використання
with open("treebank_test_words.txt", 'w', encoding='utf-8') as f:
    for sentence in test_sents:
        if not sentence:
            continue
        for word, _ in sentence:
            f.write(f"{word}\n")
        f.write("\n")

print("Підготовка даних завершена. Файли збережено.")

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


Загальна кількість речень у корпусі: 3914
Приклад речення: [('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')]
Кількість речень у навчальній вибірці: 3131
Кількість речень у тестовій вибірці: 783
Розмір словника (слова, що зустрічаються >= 2 рази): 5119
Підготовка даних завершена. Файли збережено.


In [2]:
from collections import defaultdict

def create_dictionaries(training_corpus_path, vocab):
    """
    Створює словники для частот емісій, переходів та тегів.

    Args:
        training_corpus_path (str): Шлях до файлу з навчальним корпусом.
        vocab (set): Словник відомих слів.

    Returns:
        tuple: (emission_counts, transition_counts, tag_counts)
    """
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    # Попередній тег ініціалізуємо як тег початку речення
    prev_tag = '--s--'

    with open(training_corpus_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()

            # Якщо рядок порожній, це кінець речення
            if not line:
                # Перехід від останнього тега речення до стану "початок наступного"
                transition_counts[(prev_tag, '--s--')] += 1
                prev_tag = '--s--'
                continue

            word, tag = line.split('\t')

            # Обробка невідомих слів
            if word not in vocab:
                # В майбутньому ми замінимо це на більш складну логіку,
                # а поки що використовуємо єдиний токен для невідомих слів.
                word = '--unk--'

            # Рахуємо переходи: (попередній_тег, поточний_тег)
            transition_counts[(prev_tag, tag)] += 1

            # Рахуємо емісії: (тег, слово)
            emission_counts[(tag, word)] += 1

            # Рахуємо загальну кількість кожного тега
            tag_counts[tag] += 1

            # Оновлюємо попередній тег
            prev_tag = tag

    return emission_counts, transition_counts, tag_counts

# Створюємо словники на основі навчальних даних
emission_counts, transition_counts, tag_counts = create_dictionaries("treebank_training.pos", vocab)

# Додаємо тег початку речення до лічильника тегів
tag_counts['--s--'] = len(train_sents)

print("Приклад частоти емісії:", list(emission_counts.items())[0])
print("Приклад частоти переходу:", list(transition_counts.items())[0])
print("Приклад частоти тега:", list(tag_counts.items())[0])

Приклад частоти емісії: (('DET', 'The'), 584)
Приклад частоти переходу: (('--s--', 'DET'), 736)
Приклад частоти тега: ('DET', 7026)


In [3]:
import numpy as np

def create_transition_matrix(alpha, tag_counts, transition_counts):
    """
    Створює матрицю переходів A з використанням згладжування.

    Args:
        alpha (float): Параметр згладжування.
        tag_counts (dict): Словник з частотами тегів.
        transition_counts (dict): Словник з частотами переходів.

    Returns:
        (np.array, dict, dict): Матриця A, словники для відображення тегів в індекси і навпаки.
    """
    all_tags = sorted(tag_counts.keys())
    tag_to_idx = {tag: i for i, tag in enumerate(all_tags)}
    idx_to_tag = {i: tag for i, tag in enumerate(all_tags)}

    num_tags = len(all_tags)
    A = np.zeros((num_tags, num_tags))

    for i in range(num_tags):
        prev_tag = idx_to_tag[i]

        # Загальна кількість разів, коли зустрічався попередній тег
        count_prev_tag = tag_counts[prev_tag]

        for j in range(num_tags):
            tag = idx_to_tag[j]

            # Кількість переходів від prev_tag до tag
            count_transition = transition_counts.get((prev_tag, tag), 0)

            # Застосування згладжування
            A[i, j] = (count_transition + alpha) / (count_prev_tag + alpha * num_tags)

    return A, tag_to_idx, idx_to_tag

# Створюємо матрицю переходів
# alpha - це гіперпараметр, зазвичай обирають маленьке значення
alpha = 0.001
A, tag_to_idx, idx_to_tag = create_transition_matrix(alpha, tag_counts, transition_counts)

print("Розмір матриці переходів A:", A.shape)
# Ймовірність переходу від 'NOUN' до 'VERB'
print(f"P('VERB' | 'NOUN') = {A[tag_to_idx['NOUN'], tag_to_idx['VERB']]:.6f}")
# Ймовірність переходу від початку речення до іменника
print(f"P('NOUN' | '--s--') = {A[tag_to_idx['--s--'], tag_to_idx['NOUN']]:.6f}")

Розмір матриці переходів A: (13, 13)
P('VERB' | 'NOUN') = 0.145418
P('NOUN' | '--s--') = 0.290322


In [4]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    """
    Створює матрицю емісій B з використанням згладжування.

    Args:
        alpha (float): Параметр згладжування.
        tag_counts (dict): Словник з частотами тегів.
        emission_counts (dict): Словник з частотами емісій.
        vocab (list): Список унікальних слів.

    Returns:
        (np.array, dict, dict): Матриця B, словники для відображення слів в індекси і навпаки.
    """
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)

    # Додаємо токен для невідомих слів до словника
    all_words = sorted(list(vocab)) + ['--unk--']
    word_to_idx = {word: i for i, word in enumerate(all_words)}
    idx_to_word = {i: word for i, word in enumerate(all_words)}

    num_words = len(all_words)

    B = np.zeros((num_tags, num_words))

    for i in range(num_tags):
        tag = all_tags[i]
        count_tag = tag_counts[tag]

        for j in range(num_words):
            word = idx_to_word[j]
            count_emission = emission_counts.get((tag, word), 0)

            # Згладжування
            B[i, j] = (count_emission + alpha) / (count_tag + alpha * num_words)

    return B, word_to_idx, idx_to_word

# Створюємо словник слів
vocab_list = sorted(list(vocab))
B, word_to_idx, idx_to_word = create_emission_matrix(alpha, tag_counts, emission_counts, vocab_list)


print("Розмір матриці емісій B:", B.shape)
# Ймовірність, що тег 'NOUN' згенерує слово 'time'
if 'time' in word_to_idx:
    print(f"P('time' | 'NOUN') = {B[tag_to_idx['NOUN'], word_to_idx['time']]:.6f}")
else:
    print("'time' не є достатньо частим словом у навчальній вибірці.")

# Ймовірність, що тег 'VERB' згенерує слово 'is'
if 'is' in word_to_idx:
    print(f"P('is' | 'VERB') = {B[tag_to_idx['VERB'], word_to_idx['is']]:.6f}")
else:
    print("'is' не є достатньо частим словом у навчальній вибірці.")

Розмір матриці емісій B: (13, 5120)
P('time' | 'NOUN') = 0.001982
P('is' | 'VERB') = 0.050307


In [5]:
import math

def viterbi(sentence, tag_to_idx, word_to_idx, A, B):
    """
    Знаходить найкращу послідовність тегів для речення за допомогою алгоритму Вітербі.

    Args:
        sentence (list): Список слів у реченні.
        tag_to_idx (dict): Словник для відображення тегів в індекси.
        word_to_idx (dict): Словник для відображення слів в індекси.
        A (np.array): Матриця переходів.
        B (np.array): Матриця емісій.

    Returns:
        list: Найімовірніша послідовність тегів.
    """
    num_tags = len(tag_to_idx)
    T = len(sentence) # Довжина речення

    # Ініціалізуємо матриці
    viterbi_matrix = np.full((num_tags, T), -np.inf) # Матриця для зберігання ймовірностей
    backpointer = np.zeros((num_tags, T), dtype=int) # Матриця для зберігання шляху

    # 1. Ініціалізація (для першого слова)
    start_tag_idx = tag_to_idx['--s--']
    first_word_idx = word_to_idx.get(sentence[0], word_to_idx['--unk--'])

    for tag_idx in range(num_tags):
        # Перевіряємо, чи є перехід/емісія можливими (ймовірність > 0)
        if A[start_tag_idx, tag_idx] > 0 and B[tag_idx, first_word_idx] > 0:
            viterbi_matrix[tag_idx, 0] = math.log(A[start_tag_idx, tag_idx]) + math.log(B[tag_idx, first_word_idx])

    # 2. Прямий хід (для решти слів)
    for t in range(1, T):
        word_idx = word_to_idx.get(sentence[t], word_to_idx['--unk--'])
        for j in range(num_tags): # Поточний тег
            max_prob = -np.inf
            best_prev_tag_idx = -1

            for i in range(num_tags): # Попередній тег
                # Ймовірність переходу від попереднього тега до поточного
                if A[i, j] > 0 and B[j, word_idx] > 0:
                    prob = viterbi_matrix[i, t-1] + math.log(A[i, j]) + math.log(B[j, word_idx])
                    if prob > max_prob:
                        max_prob = prob
                        best_prev_tag_idx = i

            viterbi_matrix[j, t] = max_prob
            backpointer[j, t] = best_prev_tag_idx

    # 3. Зворотний хід
    best_path = []

    # Знаходимо останній тег
    last_tag_idx = np.argmax(viterbi_matrix[:, T-1])
    best_path.append(idx_to_tag[last_tag_idx])

    # Йдемо назад по вказівниках
    for t in range(T-1, 0, -1):
        last_tag_idx = backpointer[last_tag_idx, t]
        best_path.insert(0, idx_to_tag[last_tag_idx])

    return best_path

# Приклад використання
test_sentence = ["This", "is", "a", "test", "."]
predicted_tags = viterbi(test_sentence, tag_to_idx, word_to_idx, A, B)

print(f"Речення: {test_sentence}")
print(f"Передбачені теги: {predicted_tags}")

Речення: ['This', 'is', 'a', 'test', '.']
Передбачені теги: ['DET', 'VERB', 'DET', 'NOUN', '.']


In [6]:
from tqdm import tqdm # Для відображення прогресу

def compute_accuracy(test_sents, tag_to_idx, word_to_idx, A, B):
    """
    Обчислює точність моделі на тестових даних.
    """
    num_correct = 0
    total = 0

    for sentence_tags in tqdm(test_sents, desc="Оцінка точності"):
        if not sentence_tags:
            continue

        sentence = [word for word, tag in sentence_tags]
        gold_tags = [tag for word, tag in sentence_tags]

        # Передбачаємо теги для речення
        predicted_tags = viterbi(sentence, tag_to_idx, word_to_idx, A, B)

        for pred_tag, gold_tag in zip(predicted_tags, gold_tags):
            if pred_tag == gold_tag:
                num_correct += 1
            total += 1

    return num_correct / total

# Обчислюємо точність
accuracy = compute_accuracy(test_sents, tag_to_idx, word_to_idx, A, B)
print(f"\nТочність реалізованої моделі на тестовій вибірці: {accuracy:.4f}")

Оцінка точності: 100%|██████████| 783/783 [00:04<00:00, 158.31it/s]


Точність реалізованої моделі на тестовій вибірці: 0.9297





In [7]:
from nltk.tag import DefaultTagger, UnigramTagger, BigramTagger

# 1. Визначаємо найчастіший тег для DefaultTagger
most_common_tag = Counter(tag for sent in train_sents for word, tag in sent).most_common(1)[0][0]

# 2. Створюємо та навчаємо тегери NLTK
default_tagger = DefaultTagger(most_common_tag)
unigram_tagger = UnigramTagger(train_sents, backoff=default_tagger)
bigram_tagger = BigramTagger(train_sents, backoff=unigram_tagger)

# 3. Оцінюємо точність тегера NLTK на тій самій тестовій вибірці
nltk_accuracy = bigram_tagger.accuracy(test_sents)

print(f"Точність NLTK BigramTagger: {nltk_accuracy:.4f}")
print(f"Точність нашої реалізації: {accuracy:.4f}")
print(f"Різниця: {abs(accuracy - nltk_accuracy):.4f}")

# Висновок
if accuracy > nltk_accuracy:
    print("\nНаша реалізація виявилася точнішою за стандартний BigramTagger з NLTK!")
else:
    print("\nСтандартний BigramTagger з NLTK виявився точнішим.")

Точність NLTK BigramTagger: 0.9352
Точність нашої реалізації: 0.9297
Різниця: 0.0055

Стандартний BigramTagger з NLTK виявився точнішим.
