In [1]:
# --- Before your go ----
# 1. Rename Assignment-03-###.ipynb where ### is your student ID.
# 2. The deadline of Assignment-03 is 23:59pm, 06-05-2024


# --- Explore HMM POS Taggers using Brown corpus ---
# In this assignment, you will explore three taggers for a Brown corpus.
# import your packages here

In [2]:

# Task 1 --- Load and explore your data ---
# 1). load train/test samples from Brown corpus files, brown-train.txt, brown-test.txt.
# 2). load all 12 tags from brown-tag.txt and print it out
# 3). counting how many sentences and words in both train and test datasets.
# 4). for each tag, counting how many words in train and test. e.g, tag1: [count_tr, count_te]

# Your code

### 1.1 load train/test samples

In [3]:
from typing import List, Dict, Tuple

def open_brown_files(type: str) -> Tuple[List]:
    """
    Open the Brown corpus files based on the given type.
    
    Args:
    - type: The type of the Brown corpus, should be in ['train', 'test', 'tag'].
    
    Return:
    - content: The content of the corpus.
    """
    assert type in ['train', 'test', 'tag']
    
    # open the corpus file and read header/content from it
    with open("brown-{}.txt".format(type), 'r', encoding='utf-8') as f:
        content = [line.strip() for line in f.readlines()]
            
    return content
        
def sentence_head_end(line: str) -> bool:
    """
    Return True if the line is the head or the end of a sentence.
    """
    return line.startswith("b100-") or line == ''

train_content = open_brown_files('train')
test_content = open_brown_files('test')

### 1.2 load all tags

In [4]:
tag_content = open_brown_files('tag')
print(tag_content)

['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


### 1.3 count sentences and words

In [5]:
INTERVAL = " " * 2 + "|" + " " * 2

def count_sentence_word(content: List[str]) -> Tuple[int]:
    """
    Count the number of sentences and words in the given content.
    
    Args:
    - content: A list of string returned by the function `open_brown_files`.
    
    Return:
    - num_sentence: The number of sentences in the given content.
    - num_word: The number of words in the given content.
    """
    num_sentence, num_word = 0, 0
    
    for line in content:
        # if the line startwith "b100-", then it's the start of a sentence
        if line.startswith('b100-'):
            num_sentence += 1
            continue
        # if line is empty, then the we reach the end of a sentence
        if line == '':
            continue
        # if the tag is not '.', then the text in the line is a word
        tag = line.split('\t')[1]
        if tag != '.':
            num_word += 1
    return num_sentence, num_word
    
train_sentence, train_word = count_sentence_word(train_content)
test_sentence, test_word = count_sentence_word(test_content)

# print the results
print("type " + INTERVAL + "sentences" + INTERVAL + " words")
print("=" * 30)
print("train" + INTERVAL + f"{train_sentence:^9}" + INTERVAL + f"{train_word:^5}") 
print("test " + INTERVAL + f"{test_sentence:^9}" + INTERVAL + f"{test_word:^5}") 

type   |  sentences  |   words
train  |    45800    |  810604
test   |    11540    |  203023


### 1.4 count frequency of each tag

From the result we can find that the distribution of different tags are similar in train and test corpus.

In [6]:
def count_frequency_tag(content: List[str]) -> Dict:
    """
    Count the ocurr frequency of each tag in the given content.
    
    Args:
    - content: A list of string returned by the function `open_brown_files`.
    
    Return:
    - tag_count: A dict containing the information of frequency of each tag.
    """
    tag_count = {tag: 0 for tag in tag_content}
    
    for line in content:
        if sentence_head_end(line):
            continue
        curr_tag = line.split('\t')[1]
        tag_count[curr_tag] += 1
        
    return tag_count

train_tag_freq = count_frequency_tag(train_content)
test_tag_freq = count_frequency_tag(test_content)

print(" Tag" + " " * 2 + "[count_tr, count_te]")
print("=" * 26)
for tag in tag_content:
    print("{:>4}: [{:>8}, {:>8}]".format(tag, train_tag_freq[tag], test_tag_freq[tag]))


 Tag  [count_tr, count_te]
   .: [  117723,    29842]
 ADJ: [   66985,    16736]
 ADP: [  115752,    29014]
 ADV: [   44765,    11474]
CONJ: [   30455,     7696]
 DET: [  109418,    27601]
NOUN: [  220451,    55107]
 NUM: [   11921,     2953]
PRON: [   39657,     9677]
 PRT: [   23889,     5940]
VERB: [  146199,    36551]
   X: [    1112,      274]


In [7]:
# Task 2 --- Method 1: Build a baseline method, namely, the most frequent tagger ---
#     If you can recall, we introduced a strong baseline method (See Dan's book in 
# https://web.stanford.edu/~jurafsky/slp3/ed3book_jan72023.pdf Page 164.),
#     where we label each word by using the most frequent-used tag associated with it.
# 1). find the most frequent class label for each word in the training data.
#     For example, {tr_word_1:tag_1,tr_word_2:tag_2,...}
# 2). use your built method to predict tags for both train and test datasets.
#     You should print out two values: the accuracies of train and test samples.
#     You would expect that the accuracy on train will be > 0.9 (but never = 1.0) and higher than on test.

# Notice: since there are unkown words in test samples. 
#  Following ways could handle this (choose one or create your own): 
#  1). mark all words that appear only once in the data with a "UNK-x" tag
#  2). tag every out-of-vocabulary word with the majority tag among all training samples.
#  3). find more methods in https://github.com/Adamouization/POS-Tagging-and-Unknown-Words

# Your code

### 2.1 Build the `MostFreqTagger` class  

In [8]:
import numpy as np

class MostFreqTagger:
        
    def __init__(self) -> None:
        self.tags = tag_content
        self.vocabulary = {}
        self._tag_matrix = None
        self.word_tag = {}
        self._appear_once_tag = 'UNK-x'
        self._most_freq_tag = None
    
    def _dict_to_matrix(self, vocabulary: Dict) -> np.array:
        """
        Convert the 2-d hierarchy dictionary of vocabulary into a 2-d numpy matrix for better
        processing in training and testing procedure of the model later.
        """
        tag_matrix = np.zeros((len(vocabulary), len(self.tags)))
        for i, word in enumerate(vocabulary):
            for j, tag in enumerate(self.tags):
                tag_matrix[i, j] = vocabulary[word][tag]
        return tag_matrix
    
    def _find_word_tag(self):
        """
        Find the most frequent class label for each word in the training data.
        """
        tag_indices = np.argmax(self._tag_matrix, axis=1)
        self.word_tag = {
            vocab: self.tags[tag_indices[i]] for i, vocab in enumerate(self.vocabulary.keys())
        }
        
    def _mark_appear_once(self):
        """
        Mark all words that appear only once in the data with a "UNK-x" tag
        """
        words_count = np.sum(self._tag_matrix, axis=1)
        for i, word in enumerate(self.vocabulary):
            if words_count[i] == 1:
                self.word_tag[word] = self._appear_once_tag
        
    def _tag_out_of_vob(self):
        """
        Tag every out-of-vocabulary word with the majority tag among all training samples
        """
        labels_count = np.sum(self._tag_matrix, axis=0)
        most_label_index = np.argmax(labels_count)
        self._most_freq_tag = self.tags[most_label_index]
        
    def train(self, content: List[str]) -> float:
        """
        Count the frequency of each (word, tag) pair for the given training content.
        
        Args:
        - content: A list of string returned by the function `open_brown_files`.
        
        Return:
        - accuracy: The training accuracy of the model.
        """
        for line in content:
            if sentence_head_end(line):
                continue
            word, tag = line.split('\t')
            # if the word first appears, initialize the dictionary of tags for this word
            if word not in self.vocabulary:
                self.vocabulary[word] = {
                    tag: 0 for tag in self.tags
                }
            # increment the frequency by 1 of the (word, tag) pair
            self.vocabulary[word][tag] += 1
        # convert dictionary into a matrix
        self._tag_matrix = self._dict_to_matrix(self.vocabulary)
        # find the most frequent label for each word
        self._find_word_tag()
        # handle unknown words
        self._mark_appear_once()
        self._tag_out_of_vob()
        # return the train accuracy
        return 1 - ((np.sum(self._tag_matrix, axis=1) == 1).sum() / np.sum(self._tag_matrix))
    
    def test(self, content: List[str]) -> float:
        """
        Count the frequency of each (word, tag) pair for the given testing content.
        
        Args:
        - content: A list of string returned by the function `open_brown_files`.
        
        Return:
        - accuracy: The testing accuracy of the model.
        """
        # init the variables
        test_vocabulary = {}
        # iterate the content
        for line in content:
            if sentence_head_end(line):
                continue
            word, tag = line.split('\t')
            # if the word first appears, initialize the dictionary of tags for this word
            if word not in test_vocabulary:
                test_vocabulary[word] = {
                    tag: 0 for tag in self.tags
                }
            # increment the frequency by 1 of the (word, tag) pair
            test_vocabulary[word][tag] += 1
        # convert the dict to a 2-d matrix
        test_tag_matrix = self._dict_to_matrix(test_vocabulary)
        # compute the accuracy
        num_words, num_correct = 0, 0
        for i, word in enumerate(test_vocabulary):
            
            curr_word_count = np.sum(test_tag_matrix[i])
            num_words += curr_word_count
            
            # NOTE: if the current word appears only once and are out-of-vocabulary at the same time, the tag will
            #       be predicted as 'UNK-x'.
            # appear only once
            if curr_word_count == 1:
                predicted_tag = self._appear_once_tag
            # out-of-vocabulary word
            elif word not in self.vocabulary:
                predicted_tag = self._most_freq_tag
            # other case
            else:
                predicted_tag = self.word_tag[word]
                
            if predicted_tag == self._appear_once_tag:
                continue
            else:
                num_correct +=  test_vocabulary[word][predicted_tag]
                
        # return the accuracy
        return num_correct / num_words
        

### 2.2 Train & find the most frequent class label for each word in the training data

In [9]:
most_freq_tagger = MostFreqTagger()
# train
train_accuracy = most_freq_tagger.train(train_content)
# print the (word, most_freq_tag) pairs
print("{:^20}{}{:^5}".format("Word", INTERVAL, "tag"))
print("=" * 30)
for word, tag in most_freq_tagger.word_tag.items():
    print("{:^20}{}{:>5}".format(word, INTERVAL, tag)) 

        Word          |   tag 
        Mr.           |   NOUN
       Podger         |   NOUN
        had           |   VERB
      thanked         |   VERB
        him           |   PRON
      gravely         |    ADV
         ,            |      .
        and           |   CONJ
        now           |    ADV
         he           |   PRON
        made          |   VERB
        use           |   NOUN
         of           |    ADP
        the           |    DET
       advice         |   NOUN
         .            |      .
        But           |   CONJ
       there          |    PRT
       seemed         |   VERB
         to           |    PRT
         be           |   VERB
        some          |    DET
     difference       |   NOUN
      opinion         |   NOUN
         as           |    ADP
        how           |    ADV
        far           |    ADV
       board          |   NOUN
       should         |   VERB
         go           |   VERB
       whose          |    DET
        

### 2.3 Test & Report the accuracy

We can find from the accuracies below:
- The training accuracy is relatively high, which is about 97.4%
- The testing accuracy is clearly lower than the training accuracy, which is just 89.2%, slightly lower than 90%.

In [10]:
test_accuracy = most_freq_tagger.test(test_content)
print("The train accuracy of the 'most frequent tagger' model is: {:>8.6f}".format(train_accuracy))
print("The  test accuracy of the 'most frequent tagger' model is: {:>8.6f}".format(test_accuracy))

The train accuracy of the 'most frequent tagger' model is: 0.974658
The  test accuracy of the 'most frequent tagger' model is: 0.892655


In [11]:
# Task 3 --- Method 2: Build an HMM tagger ---
# 1) You should use nltk.tag.HiddenMarkovModelTagger to build an HMM tagger.
#    It has parameters: symbols, states, transitions, outputs, priors, transform (ignore it).
#    Specify these parameters properly. For example, you can use MLE to estimate transitions, outputs and priors.
#    That is, MLE to estimate matrix A (transition matrix), and matrix B (output probabilites) (See. Page 8.4.3)
# 2) After build your model, report both the accuracy of HMM tagger for train samples and test samples.
# 
# 3) Compared with your baseline method, discuss that why your HMM tagger is better/worse than baseline method.

# Notice: You may also need to handle unknown words just like Task 2.

# Your code

### 3.1 Preprocess training & testing dataset

In [12]:
from nltk.tag import hmm
from nltk.probability import MLEProbDist, ConditionalFreqDist, ConditionalProbDist, FreqDist
from collections import defaultdict, Counter

def content_to_tuple(content: List[str]) -> List[Tuple]:
    """
    Convert the content get from the files into the format of "List[Tuple]" which 
    can be used by the `hmm` model.
    """
    content_list, inner_list = [], []
    for line in content:
        if line.startswith("b100-"):
            continue
        if line == '':
            content_list.append(inner_list)
            inner_list = []
            continue
        inner_list.append(tuple(line.split('\t')))
    return content_list

train_list_tuple = content_to_tuple(train_content)
test_list_tuple = content_to_tuple(test_content)

### 3.2 Estimate parameters for `nltk.tag.HiddenMarkovModelTagger`

In [13]:
# initialize counters
transition_counts = defaultdict(Counter)
emission_counts = defaultdict(Counter)
start_counts = Counter()
tag_counts = Counter()

# collect counts from the training data
for sentence in train_list_tuple:
    prev_tag = None
    for word, tag in sentence:
        if prev_tag is None:
            start_counts[tag] += 1
        else:
            transition_counts[prev_tag][tag] += 1
        emission_counts[tag][word] += 1
        tag_counts[tag] += 1
        prev_tag = tag

# total number of sentences
num_sentences = len(train_list_tuple)

# convert counts to lists of (condition, sample) pairs
transition_pairs = [(prev_tag, tag) for prev_tag, tags in transition_counts.items() for tag, count in tags.items() for _ in range(count)]
emission_pairs = [(tag, word) for tag, words in emission_counts.items() for word, count in words.items() for _ in range(count)]

# convert to ConditionalFreqDist
transition_cfd = ConditionalFreqDist(transition_pairs)
emission_cfd = ConditionalFreqDist(emission_pairs)

# convert counts to probabilities using MLE
transition_probs = ConditionalProbDist(transition_cfd, MLEProbDist)
emission_probs = ConditionalProbDist(emission_cfd, MLEProbDist)
start_probs = MLEProbDist(FreqDist(start_counts))

# get list of symbols (words) and states (tags)
symbols = list({word for sentence in train_list_tuple for word, _ in sentence})
states = list(tag_counts.keys())


### 3.3 Build HMM model and report the accuracy

In [14]:
# build the hmm_tagger model
hmm_tagger = hmm.HiddenMarkovModelTagger(
    symbols=symbols, states=states, transitions=transition_probs, outputs=emission_probs, priors=start_probs
)

# train & evaluate accuracy on training data
hmm_tagger = hmm_tagger.train(train_list_tuple)
train_accuracy = hmm_tagger.accuracy(train_list_tuple)
print("The train accuracy of the 'hmm tagger' model is: {:>8.6f}".format(train_accuracy))

# evaluate accuracy on testing data
test_accuracy = hmm_tagger.accuracy(test_list_tuple)
print("The  test accuracy of the 'hmm tagger' model is: {:>8.6f}".format(test_accuracy))

The train accuracy of the 'hmm tagger' model is: 0.969869
The  test accuracy of the 'hmm tagger' model is: 0.951023


In [15]:
# Task 4 --- Method 3: Fine-tuning on BERT-base model for POS-tagging ---
# 
# 1) You may download a BERT model (say, you choose BERT-base cased) 
#    and use tools in https://github.com/huggingface/transformers
# 
# 2) After build your model, report both the accuracy of BERT tagger for train samples and test samples.
# 
# 3) Compared with Method 1,2, discuss that why your BERT tagger is better/worse than these two.

In [16]:
import torch
from transformers import BertTokenizer, BertForTokenClassification, Trainer, TrainingArguments
from datasets import Dataset, Features, Sequence, ClassLabel, Value
from sklearn.model_selection import train_test_split

# Prepare the data for Hugging Face's datasets library
def prepare_data(list_tuple):
    data = []
    for sentence in list_tuple:
        words, tags = zip(*sentence)
        data.append({"words": list(words), "tags": list(tags)})
    return data

train_data = prepare_data(train_list_tuple)
test_data = prepare_data(test_list_tuple)

# Define tag to index mapping
unique_tags = list(set(tag for sentence in train_list_tuple for _, tag in sentence))
tag2id = {tag: idx for idx, tag in enumerate(unique_tags)}
id2tag = {idx: tag for tag, idx in tag2id.items()}

# Define features
features = Features({
    'words': Sequence(Value('string')),
    'tags': Sequence(ClassLabel(names=unique_tags)),
})

# Convert to Dataset
train_dataset = Dataset.from_list(train_data, features=features)
test_dataset = Dataset.from_list(test_data, features=features)

# Tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples['words'], truncation=True, is_split_into_words=True
    )
    
    labels = []
    for i, label in enumerate(examples['tags']):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        label_ids = []
        previous_word_idx = None
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(tag2id[label[word_idx]])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

train_tokenized = train_dataset.map(tokenize_and_align_labels, batched=True)
test_tokenized = test_dataset.map(tokenize_and_align_labels, batched=True)

# Model
model = BertForTokenClassification.from_pretrained(
    'bert-base-cased', num_labels=len(unique_tags)
)

# Training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy='epoch',
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
)

# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized,
    eval_dataset=test_tokenized,
)

# Train the model
trainer.train()

# Evaluate on training data
train_metrics = trainer.evaluate(train_tokenized)
print("The train accuracy of the 'BERT model' is: {:>8.6f}".format(train_metrics['eval_accuracy']))

# Evaluate on test data
test_metrics = trainer.evaluate(test_tokenized)
print("The  test accuracy of the 'BERT model' is: {:>8.6f}".format(test_metrics['eval_accuracy']))

Map:   0%|          | 0/45800 [00:00<?, ? examples/s]

ValueError: word_ids() is not available when using non-fast tokenizers (e.g. instance of a `XxxTokenizerFast` class).