In [None]:
!pip install datasets
!pip install conllu

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting datasets
  Downloading datasets-2.8.0-py3-none-any.whl (452 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m452.9/452.9 KB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0.0,>=0.2.0
  Downloading huggingface_hub-0.12.0-py3-none-any.whl (190 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.3/190.3 KB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
Collecting multiprocess
  Downloading multiprocess-0.70.14-py38-none-any.whl (132 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m132.0/132.0 KB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash
  Downloading xxhash-3.2.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (213 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m213.0/213.0 KB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
Collecting responses<0.19
  Downloading r

In [None]:
from datasets import load_dataset
from collections import Counter, defaultdict
from spacy.lang.es import Spanish
import math
import numpy as np
import pandas as pd



# Data loading and preprocessing

In [None]:
def create_vocab(dataset):
    """
    Build the vocabulary and the tag inventory from a POS-tagged dataset.

    :param dataset: iterable of sentences, each providing parallel "tokens" and
        "upos_tags" sequences; ``dataset.features["upos_tags"].feature.names``
        must list the tag names (and contain "X")
    :return: tuple ``(vocab_counts, vocab_index, pos_tags)`` where
        ``vocab_counts`` maps (word, tag) to its frequency, with every pair seen
        fewer than twice pooled under ``("<UNK>", index of "X")``;
        ``vocab_index`` maps each kept word to its position in the sorted vocabulary;
        ``pos_tags`` is the list of tag names extended with "<BOS>" and "<EOS>"
    """
    #dataset = load_dataset("PlanTL-GOB-ES/UD_Spanish-AnCora", split="train")
    # Work on a copy: appending to the list owned by the dataset features would
    # modify the dataset's own tag inventory and add duplicate markers on every call.
    pos_tags = list(dataset.features["upos_tags"].feature.names)
    print(pos_tags)
    pos_tags.append("<BOS>")
    pos_tags.append("<EOS>")
    print(pos_tags)

    # Get the frequency of every word-tag pair in the dataset
    # (tags are kept exactly as stored in each sentence; the sentence markers use
    # their own name as tag)
    word_tag = []
    for sentence in dataset:
        word_tag.append(("<BOS>", "<BOS>"))
        for word, tag in (zip(sentence["tokens"], sentence["upos_tags"])):
            word_tag.append((word, tag))
        word_tag.append(("<EOS>", "<EOS>"))
    # Sorting first makes the resulting dictionaries iterate in sorted order
    vocab_counts_all = Counter(sorted(word_tag))

    # Replace low-frequency terms by unknown token
    unk_key = ("<UNK>", pos_tags.index("X"))
    vocab_counts = {unk_key: 0}
    for tup, count in vocab_counts_all.items():
        if count < 2:
            vocab_counts[unk_key] += count
        else:
            vocab_counts[tup] = count

    # Encode words of vocabulary
    vocab_words = sorted(set([word for word, tag in vocab_counts.keys()]))
    vocab_index = {word: i for i, word in enumerate(vocab_words)}

    return vocab_counts, vocab_index, pos_tags

In [None]:
def preprocess_dataset(dataset, vocab, pos_tags):
    """
    Flatten a UD_Spanish-AnCora split into one sequence of (word, tag name) pairs.

    Every sentence is wrapped in ("<BOS>", "<BOS>") / ("<EOS>", "<EOS>") markers and
    every token that is not contained in ``vocab`` is replaced by "<UNK>".

    :param dataset: iterable of sentences with parallel "tokens" and "upos_tags" entries
    :param vocab: container of the words known from the training data
    :param pos_tags: list of tag names, indexed by the tag ids stored in the dataset
    :return: list of (word, POS tag name) tuples
    """
    pairs = []
    for example in dataset:
        pairs.append(("<BOS>", "<BOS>"))
        pairs.extend(
            (token if token in vocab else "<UNK>", pos_tags[tag_id])
            for token, tag_id in zip(example["tokens"], example["upos_tags"])
        )
        pairs.append(("<EOS>", "<EOS>"))
    return pairs

In [None]:
# Load the train and test splits of the UD_Spanish-AnCora dataset
train_dataset = load_dataset("PlanTL-GOB-ES/UD_Spanish-AnCora", split="train")
test_dataset = load_dataset("PlanTL-GOB-ES/UD_Spanish-AnCora", split="test")

# Create the vocabulary from the training split only, so that words that occur
# only in the test split are later mapped to <UNK> by preprocess_dataset
vocab_counts, vocab_index, pos_tags = create_vocab(train_dataset)
# Show the three results as the cell output
vocab_counts, vocab_index, pos_tags

Downloading builder script:   0%|          | 0.00/15.9k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/5.99k [00:00<?, ?B/s]

Downloading and preparing dataset ud_spanish-an_cora/es_ancora to /root/.cache/huggingface/datasets/PlanTL-GOB-ES___ud_spanish-an_cora/es_ancora/2.7.0/3a97415c22e8c57cbe92c71302b77d8563dfb78df651b06ee8744381b2f2b4e8...


Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/42.5M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/5.08M [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Dataset ud_spanish-an_cora downloaded and prepared to /root/.cache/huggingface/datasets/PlanTL-GOB-ES___ud_spanish-an_cora/es_ancora/2.7.0/3a97415c22e8c57cbe92c71302b77d8563dfb78df651b06ee8744381b2f2b4e8. Subsequent calls will reuse this data.




['NOUN', 'PUNCT', 'ADP', 'NUM', 'SYM', 'SCONJ', 'ADJ', 'PART', 'DET', 'CCONJ', 'PROPN', 'PRON', 'X', '_', 'ADV', 'INTJ', 'VERB', 'AUX']
['NOUN', 'PUNCT', 'ADP', 'NUM', 'SYM', 'SCONJ', 'ADJ', 'PART', 'DET', 'CCONJ', 'PROPN', 'PRON', 'X', '_', 'ADV', 'INTJ', 'VERB', 'AUX', '<BOS>', '<EOS>']


({('<UNK>', 12): 21168,
  ('!', 1): 61,
  ('"', 1): 7485,
  ('%', 4): 16,
  ('&', 10): 4,
  ("'", 1): 86,
  ('(', 1): 1478,
  (')', 1): 1478,
  ('+', 10): 6,
  (',', 1): 24375,
  ('-', 1): 2304,
  ('.', 1): 14161,
  ('...', 1): 78,
  ('/', 1): 7,
  ('0', 3): 7,
  ('0,15', 3): 2,
  ('0,2', 3): 2,
  ('0,2%', 4): 2,
  ('0,3', 3): 3,
  ('0,3%', 4): 2,
  ('0,4', 3): 2,
  ('0,5', 3): 5,
  ('0,74', 3): 2,
  ('0,80%', 4): 2,
  ('0-0', 3): 7,
  ('0-1', 3): 6,
  ('0-2', 3): 2,
  ('0-3', 3): 3,
  ('0.', 3): 2,
  ('061', 3): 2,
  ('08.00', 3): 2,
  ('1', 3): 40,
  ('1%', 4): 3,
  ('1,15', 3): 2,
  ('1,2', 3): 4,
  ('1,26', 3): 3,
  ('1,28', 3): 2,
  ('1,3', 3): 6,
  ('1,4', 3): 2,
  ('1,4%', 4): 2,
  ('1,5', 3): 7,
  ('1,5%', 4): 3,
  ('1,6', 3): 4,
  ('1,7', 3): 3,
  ('1,8', 3): 7,
  ('1,8%', 4): 2,
  ('1,9', 3): 3,
  ('1-0', 3): 15,
  ('1-1', 3): 6,
  ('1-2', 3): 5,
  ('1.000', 3): 14,
  ('1.050', 3): 3,
  ('1.100', 3): 2,
  ('1.115.856', 3): 2,
  ('1.200', 3): 7,
  ('1.223.112', 3): 4,
  ('1.29

In [None]:
# Turn both splits into flat lists of (word, tag name) pairs, with sentence
# boundary markers added and out-of-vocabulary words replaced by <UNK>
train_word_tag = preprocess_dataset(train_dataset, vocab_index, pos_tags)
test_word_tag = preprocess_dataset(test_dataset, vocab_index, pos_tags)
# Preview the first ten training pairs
train_word_tag[:10]

[('<BOS>', '<BOS>'),
 ('Las', 'DET'),
 ('reservas', 'NOUN'),
 ('de', 'ADP'),
 ('oro', 'NOUN'),
 ('y', 'CCONJ'),
 ('divisas', 'NOUN'),
 ('de', 'ADP'),
 ('Rusia', 'PROPN'),
 ('subieron', 'VERB')]

# Hidden Markov Model

In [None]:
class HMM:
    """
    First-order Hidden Markov Model for POS tagging.

    Hidden states are POS tags and observations are words. Training counts tag
    transitions and tag -> word emissions and converts them into additively
    smoothed probability matrices. Tags are predicted either with Viterbi
    decoding (``predict``) or with a per-word most-frequent-tag baseline
    (``naive_predict``).
    """

    def __init__(self):
        self.vocab_counts = None
        self.vocab_index = None
        self.pos_tags = None

        # Dictionary of (prev_tag, tag) : counts --> counts the amount of times that each tag pair appears
        self.transition_counts = defaultdict(int)
        # Dictionary of (tag, word) : counts --> counts the amount of times that each tag-word appears
        self.emission_counts = defaultdict(int)
        # Dictionary of (tag): counts --> counts the amount of times that a certain tag appears
        self.tag_counts = defaultdict(int)

        # Matrix with probability of a POS tag given another POS tag (from previous word)
        self.transition_matrix = None
        # Matrix with probability of a word given its POS tag
        self.emission_matrix = None

        # Results of the most recent Viterbi forward pass (see predict)
        self.best_tagseq_probabilities = None
        self.best_paths = None

    def populate_state_dictionaries(self, word_tag):
        """
        Populate transitions, emission and state count dictionaries.

        Counts are added on top of whatever the dictionaries already contain.

        :param word_tag: list of (word, pos_tag) tuples
        """
        # The sequence is treated as if it were preceded by an end-of-sentence
        # state, so the first pair contributes an <EOS> -> tag transition.
        # NOTE(review): this also adds one <EOS> occurrence to tag_counts that has
        # no matching emission, so the <EOS> denominators are one larger than the
        # number of <EOS> tokens in word_tag.
        prev_tag = "<EOS>"
        self.tag_counts[prev_tag] += 1

        # For each word, tag pair
        for word, tag in word_tag:
            # Increase transition, emission and tag counts
            self.transition_counts[(prev_tag, tag)] += 1
            self.emission_counts[(tag, word)] += 1
            self.tag_counts[tag] += 1

            # Update prev_tag with current tag for next iteration
            prev_tag = tag

    def generate_transition_matrix(self, alpha=0.001):
        """
        Compute matrix with probability of a POS tag (hidden state) given another POS tag (previous hidden state).

        Rows (previous tag) and columns (tag) follow the sorted order of the tags
        seen during training. Each entry is
        (count(prev_tag, tag) + alpha) / (count(prev_tag) + alpha * number of tags).

        :param alpha: smoothing parameter
        """

        print(self.tag_counts.keys())
        tags_list = sorted(self.tag_counts.keys())
        print(tags_list)
        num_tags = len(tags_list)

        # Initialize transition_matrix
        self.transition_matrix = np.zeros((num_tags, num_tags))

        # For each row of the matrix
        for prev_tag_idx, prev_tag in enumerate(tags_list):

            # Amount of times that the previous tag appears
            count_prev_tag = self.tag_counts[prev_tag]

            # For each column of the row
            for tag_idx, tag in enumerate(tags_list):

                # Unseen transitions count as zero; .get() does not insert new keys
                count = self.transition_counts.get((prev_tag, tag), 0)

                # Update transition matrix with P(tag | prev_tag)
                self.transition_matrix[prev_tag_idx, tag_idx] = (count + alpha) / (count_prev_tag + alpha * num_tags)

    def generate_emission_matrix(self, alpha=0.001):
        """
        Compute matrix with probability of a word (observed event) given its POS tag (hidden state).

        Rows follow the sorted order of the tags seen during training; the column
        of a word is the index stored for it in ``self.vocab_index`` (indices are
        expected to be 0 .. len(vocab_index) - 1). Each entry is
        (count(tag, word) + alpha) / (count(tag) + alpha * number of words).

        :param alpha: smoothing parameter
        """

        tags_list = sorted(self.tag_counts.keys())
        num_tags = len(tags_list)
        num_words = len(self.vocab_index)
        tag_row = {tag: row for row, tag in enumerate(tags_list)}

        # Only the observed (tag, word) pairs have a non-zero count, so fill those
        # in instead of probing every cell of the tags x vocabulary grid
        counts = np.zeros((num_tags, num_words))
        for (tag, word), count in self.emission_counts.items():
            row = tag_row.get(tag)
            col = self.vocab_index.get(word)
            if row is not None and col is not None:
                counts[row, col] = count

        # Amount of times that each tag appears, as a column vector for broadcasting
        tag_totals = np.array([self.tag_counts[tag] for tag in tags_list], dtype=float)

        # P(word | tag) for the whole matrix at once
        self.emission_matrix = (counts + alpha) / (tag_totals[:, np.newaxis] + alpha * num_words)

    def train(self, word_tag, vocab_counts, vocab_index, pos_tags):
        """
        Fit the model: store the vocabulary, count the training pairs and build both matrices.

        :param word_tag: list of (word, pos_tag) tuples used for training
        :param vocab_counts: dictionary of (word, tag) frequencies (stored as is)
        :param vocab_index: dictionary mapping each vocabulary word to its column index
        :param pos_tags: list of tag names; stored in sorted order
        """
        self.vocab_counts, self.vocab_index, self.pos_tags = vocab_counts, vocab_index, sorted(pos_tags)

        # Start from empty counts so that training again does not add the new
        # counts on top of those of a previous call
        self.transition_counts.clear()
        self.emission_counts.clear()
        self.tag_counts.clear()

        self.populate_state_dictionaries(word_tag)
        self.generate_transition_matrix()
        self.generate_emission_matrix()

    def predict(self, words):
        """
        Predict pos tags with Viterbi optimization

        Uses the module-level functions viterbi_forward and viterbi_backward.

        :param words: list of words
        :return: list of predicted tags (empty list for empty input)
        """

        # Nothing to decode; the Viterbi matrices need at least one column
        if len(words) == 0:
            return []

        # Both passes must use the same tag order (the sorted one used by the matrices)
        tags = sorted(self.pos_tags)

        # Forward pass
        self.best_tagseq_probabilities, self.best_paths = viterbi_forward(tags,
                                                                          self.tag_counts,
                                                                          self.transition_matrix,
                                                                          self.emission_matrix,
                                                                          self.vocab_index,
                                                                          words)

        # Backward pass
        predicted_tags = viterbi_backward(self.best_tagseq_probabilities, self.best_paths, tags, words)

        return predicted_tags

    def naive_predict(self, words):
        """
        Naïve POS tag prediction (without Viterbi optimization).
        To each word, assign POS tag with the highest emission count.

        Words outside the vocabulary, and words without any positive emission
        count, get the empty string as tag.

        :param words: list of words
        :return: list of predicted tags
        """
        predicted_tags = []
        for word in words:
            best_tag = ""
            highest_count = 0
            if word in self.vocab_index:
                for tag in self.pos_tags:
                    # .get() instead of indexing: indexing the defaultdict would
                    # insert a zero entry for every probed pair and so modify
                    # the trained counts while predicting
                    count = self.emission_counts.get((tag, word), 0)
                    if count > highest_count:
                        highest_count = count
                        best_tag = tag
            predicted_tags.append(best_tag)
        return predicted_tags

    def get_vocab(self):
        """Return the word -> index dictionary stored by train."""
        return self.vocab_index

## HMM Training

In [None]:
# Train HMM
hmm = HMM()
hmm.train(train_word_tag, vocab_counts, vocab_index, pos_tags)

dict_keys(['<EOS>', '<BOS>', 'DET', 'NOUN', 'ADP', 'CCONJ', 'PROPN', 'VERB', 'NUM', 'PUNCT', 'ADV', '_', 'AUX', 'PRON', 'ADJ', 'SCONJ', 'PART', 'SYM', 'INTJ', 'X'])
['<BOS>', '<EOS>', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'INTJ', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SCONJ', 'SYM', 'VERB', 'X', '_']


# Viterbi Optimization

## Forward pass

In [None]:
def viterbi_forward(pos_tags, tag_counts, transition_matrix, emission_matrix, vocab, words):
    """
    Viterbi forward pass in log space.

    The first word is treated as the start marker: no emission is scored for it
    and all of the initial probability mass is placed on the "<BOS>" state.

    :param pos_tags: list of tag names, in the same order as the matrix rows
    :param tag_counts: collection of the tags seen in training (only its size is used)
    :param transition_matrix: matrix of P(tag | prev_tag), indexed [prev_tag, tag]
    :param emission_matrix: matrix of P(word | tag), indexed [tag, word column]
    :param vocab: dictionary mapping each word to its column of the emission matrix
    :param words: list of words to tag
    :return: tuple (state_prob_matrix, backtrack_matrix), both of shape
        (number of tags, number of words): the best log-probability of any tag
        sequence ending in each tag at each word, and the previous tag index
        achieving it
    :raises KeyError: if a word is neither in ``vocab`` nor can be mapped to "<UNK>"
    """

    num_tags = len(tag_counts)
    num_words = len(words)

    # Initialize matrices. Scores are log-probabilities, so "impossible" is -inf
    state_prob_matrix = np.full((num_tags, num_words), -np.inf)
    backtrack_matrix = np.zeros((num_tags, num_words), dtype=int)

    # Nothing to score for an empty sequence
    if num_words == 0:
        return state_prob_matrix, backtrack_matrix

    # The sequence starts in <BOS> with probability 1, i.e. log-probability 0
    bos_idx = pos_tags.index("<BOS>")
    eos_idx = pos_tags.index("<EOS>")
    state_prob_matrix[bos_idx, 0] = 0.0
    backtrack_matrix[bos_idx, 0] = eos_idx

    # The transition log-probabilities are the same for every word
    log_transition = np.log(transition_matrix)
    unk_column = vocab.get("<UNK>")
    tag_range = np.arange(num_tags)

    # For each word in the sequence (word 0 already initialized)
    for word_idx in range(1, num_words):

        # Column of this word in the emission matrix; unknown words use <UNK>
        column = vocab.get(words[word_idx], unk_column)
        if column is None:
            raise KeyError(words[word_idx])
        log_emission = np.log(emission_matrix[:, column])

        # scores[prev_tag, tag]: log-probability that the previous word had
        # prev_tag, that the current word has tag, and that tag emits this word
        scores = (state_prob_matrix[:, word_idx - 1][:, np.newaxis] + log_transition) \
            + log_emission[np.newaxis, :]

        # For every tag keep the best previous tag (first one on ties)
        best_prev = np.argmax(scores, axis=0)
        state_prob_matrix[:, word_idx] = scores[best_prev, tag_range]
        backtrack_matrix[:, word_idx] = best_prev

    return state_prob_matrix, backtrack_matrix

## Backward pass

In [None]:
def viterbi_backward(state_prob_matrix, backtrack_matrix, pos_tags, words):
    """
    Viterbi backward pass: recover the best tag sequence from the forward matrices.

    Starts at the most probable tag of the last word and follows the stored
    back-pointers to the first word, so the returned tags form one consistent path.

    :param state_prob_matrix: matrix (tags x words) of best path scores
    :param backtrack_matrix: matrix (tags x words) holding, for each tag and word,
        the tag index of the previous word on the best path
    :param pos_tags: list of tag names, in the same order as the matrix rows
    :param words: list of words (not used; kept for interface compatibility)
    :return: list with one predicted tag name per word (empty for an empty sequence)
    """
    num_words = backtrack_matrix.shape[1]
    if num_words == 0:
        return []

    num_tags = len(pos_tags)
    pred_idx = [0] * num_words

    # The path ends in the tag with the highest score in the last column
    pred_idx[num_words - 1] = int(np.argmax(state_prob_matrix[:num_tags, -1]))

    # Iterate backwards through the words: the tag of the previous word is the
    # back-pointer stored for the tag chosen for the current word. Stop at
    # word 1, because word 0 has no previous word.
    for word_idx in range(num_words - 1, 0, -1):
        pred_idx[word_idx - 1] = int(backtrack_matrix[pred_idx[word_idx], word_idx])

    return [pos_tags[tag_idx] for tag_idx in pred_idx]

# Test Model

In [None]:
def load_test_data(test_data):
    """
    Split a sequence of (word, tag) pairs into two parallel lists.

    :param test_data: list of (word, tag) tuples
    :return: tuple (list of words, list of gold tags)
    """
    words = []
    for token, _ in test_data:
        words.append(token)

    gold_tags = []
    for _, label in test_data:
        gold_tags.append(label)

    return words, gold_tags

In [None]:
def evaluate_accuracy(words, pred_y, gold_y):
    """
    Compute the fraction of words whose predicted tag equals the gold tag.

    The sentence markers "<BOS>" and "<EOS>" are not scored.

    :param words: list of words
    :param pred_y: list of predicted tags, parallel to ``words``
    :param gold_y: list of gold tags, parallel to ``words``
    :return: accuracy between 0 and 1; 0.0 when there is no word to score
    """
    correct = 0
    total = 0
    for word, pred, gold in zip(words, pred_y, gold_y):
        if word in ("<BOS>", "<EOS>"):
            continue
        total += 1
        if pred == gold:
            correct += 1

    # Empty input (or input made only of sentence markers) has nothing to score;
    # report 0.0 instead of failing with a division by zero
    if total == 0:
        return 0.0
    return correct / total

In [None]:
# Split the preprocessed test pairs into parallel lists of words and gold tags
words, gold_tags = load_test_data(test_word_tag)
# Show the same slice of both lists to check that they line up
words[40:55], gold_tags[40:55]

(['le',
  'acusó',
  'en',
  'una',
  'carta',
  'abierta',
  'de',
  'utilizar',
  'métodos',
  'poco',
  'democráticos',
  'de',
  'gobierno',
  '.',
  '<EOS>'],
 ['PRON',
  'VERB',
  'ADP',
  'DET',
  'NOUN',
  'ADJ',
  'ADP',
  'VERB',
  'NOUN',
  'ADV',
  'ADJ',
  'ADP',
  'NOUN',
  'PUNCT',
  '<EOS>'])

In [None]:
# Naive prediction: every word gets the tag with its highest emission count
naive_pred_tags = hmm.naive_predict(words)
# Evaluate accuracy of naive predictions (sentence markers are not scored)
naive_acc = evaluate_accuracy(words, naive_pred_tags, gold_tags)
print("Naive accuracy:", naive_acc)

Naive accuracy: 0.9009765660126252


In [None]:
# Viterbi optimized prediction over the whole test sequence
viterbi_pred_tags = hmm.predict(words)
# Evaluate accuracy of optimized predictions (sentence markers are not scored)
viterbi_acc = evaluate_accuracy(words, viterbi_pred_tags, gold_tags)
print("Viterbi accuracy:", viterbi_acc)

Viterbi accuracy: 0.9470891858352967


# Main

In [None]:
class POS_Tagger:
    """
    Tags raw Spanish text: a spaCy pipeline splits the text into sentences and
    tokens, and a trained HMM predicts the tags.
    """

    def __init__(self, hmm):
        """
        :param hmm: trained model providing get_vocab() and predict(words)
        """
        self.pos_tagger = hmm
        self.nlp = Spanish()
        self.nlp.add_pipe("sentencizer")

    def preprocess_input(self, text):
        """
        Convert raw text into the word sequence expected by the model.

        Each sentence is wrapped in "<BOS>" / "<EOS>" and every token that is not
        in the model vocabulary is replaced by "<UNK>".

        :param text: raw input text
        :return: list of words
        """
        vocab_of = self.pos_tagger.get_vocab
        sequence = []
        for sent in self.nlp(text).sents:
            sequence.append("<BOS>")
            for tok in sent:
                surface = tok.text
                sequence.append(surface if surface in vocab_of() else "<UNK>")
            sequence.append("<EOS>")
        return sequence

    def tag(self, words):
        """
        Predict one tag per word with the underlying model.

        :param words: list of words, as returned by preprocess_input
        :return: list of predicted tags
        """
        return self.pos_tagger.predict(words)

In [None]:
# Interactive demo: tag sentences typed by the user until "Q" is entered
pos_tagger = POS_Tagger(hmm)
text = input("Please, insert your sentence in Spanish ('Q' to quit): ")
while text != "Q":
    # NOTE: this rebinds `words`, which earlier held the test-set words
    words = pos_tagger.preprocess_input(text)
    tags = pos_tagger.tag(words)

    # Print a right-aligned two-column table, leaving out the sentence markers
    print("{: >20} {: >20}".format("Word",  "Tag"))
    for word, tag in zip(words, tags):
        if word not in ["<BOS>", "<EOS>"]:
            print("{: >20} {: >20}".format(word, tag))

    print()
    text = input("Please, insert your sentence in Spanish ('Q' to quit): ")

Please, insert your sentence in Spanish ('Q' to quit): Más de mil trabajadores se manifestaron ayer por la tarde. Piden una mejora de sus condiciones laborales.
                Word                  Tag
                 Más                  ADV
                  de                  ADP
                 mil                  NUM
        trabajadores                 NOUN
                  se                 PRON
        manifestaron                 VERB
                ayer                  ADV
                 por                  ADP
                  la                  DET
               tarde                 NOUN
                   .                PUNCT
               Piden                 VERB
                 una                  DET
              mejora                 NOUN
                  de                  ADP
                 sus                  DET
         condiciones                 NOUN
           laborales                  ADJ
                   .                PUNCT