In [56]:
import datasets
import numpy as np


In [57]:
dataset = datasets.load_dataset("ktgiahieu/maccrobat2018_2020")
dataset = dataset['train'].train_test_split(test_size=0.2)
validation_ds = dataset.pop('test')
dataset['validation'] = validation_ds
dataset.shape

{'train': (320, 2), 'validation': (80, 2)}

In [58]:
# Extract tokens and tags
tokens = [item['tokens'] for item in dataset['train']]
tags = [item['tags'] for item in dataset['train']]

In [59]:
# Define states and observations
state_set = set(tag for doc in tags for tag in doc)
observation_set = set(token for doc in tokens for token in doc)

In [60]:
len(state_set)

82

In [61]:
# Initialize probability matrices
transition_probabilities = {state: {state2: 0 for state2 in state_set} for state in state_set}
emission_probabilities = {state: {observation: 0 for observation in observation_set} for state in state_set}
initial_state_probabilities = {state: 0 for state in state_set} 

In [62]:
for token_list, tag_list in zip(tokens, tags):
    previous_tag = None
    for token, tag in zip(token_list, tag_list):
        emission_probabilities[tag][token] += 1
        if previous_tag is not None:
            transition_probabilities[previous_tag][tag] += 1
        else:
            initial_state_probabilities[tag] += 1
        previous_tag = tag

In [63]:
def normalize_probabilities(transition_probabilities, emission_probabilities, initial_state_probabilities):
    # Normalize transition probabilities
    for state, transitions in transition_probabilities.items():
        total = sum(transitions.values())
        if total > 0:
            for state2 in transitions:
                transition_probabilities[state][state2] /= total

    # Normalize emission probabilities
    for state, emissions in emission_probabilities.items():
        total = sum(emissions.values())
        if total > 0:
            for observation in emissions:
                emission_probabilities[state][observation] /= total

    # Normalize initial state probabilities
    total = sum(initial_state_probabilities.values())
    if total > 0:
        for state in initial_state_probabilities:
            initial_state_probabilities[state] /= total

    return transition_probabilities, emission_probabilities, initial_state_probabilities


In [64]:
transition_probabilities, emission_probabilities, initial_state_probabilities = normalize_probabilities(transition_probabilities, emission_probabilities, initial_state_probabilities)

In [65]:
print(len(transition_probabilities))
print(len(emission_probabilities))
print(len(initial_state_probabilities))

82
82
82


In [66]:
def viterbi_algorithm(observations, states, start_prob, trans_prob, emit_prob):
    V = [{}]
    path = {}

    for state in states:
        V[0][state] = start_prob.get(state, 0) * emit_prob[state].get(observations[0], 0)
        path[state] = [state]

    for t in range(1, len(observations)):
        V.append({})
        new_path = {}

        for current_state in states:
            prob, prev_st = max(
                (V[t-1][prev_state] * trans_prob[prev_state].get(current_state, 0) * emit_prob[current_state].get(observations[t], 0), prev_state) 
                for prev_state in states
            )

            V[t][current_state] = prob
            new_path[current_state] = path[prev_st] + [current_state]

        path = new_path

    n = len(observations) - 1
    prob, max_final_state = max((V[n][state], state) for state in states)
    return prob, path[max_final_state]

In [76]:
def evaluate(dataset, states, initial_state_probabilities, transition_probabilities, emission_probabilities):
    tokens = [item['tokens'] for item in dataset['validation']]
    tags = [item['tags'] for item in dataset['validation']]

    correct = 0
    total = 0

    for token_sequence, true_tag_sequence in zip(tokens, tags):
        _, predicted_tag_sequence = viterbi_algorithm(token_sequence, states, initial_state_probabilities, transition_probabilities, emission_probabilities)

        total += len(true_tag_sequence)
        correct += sum(p_tag == t_tag for p_tag, t_tag in zip(predicted_tag_sequence, true_tag_sequence))

    accuracy = correct / total if total > 0 else 0

    return accuracy

In [115]:
test = ["The", "64", "year", "old", "patient", "received", "medication", "."]

_, predicted_tag_sequence = viterbi_algorithm(test, list(state_set), initial_state_probabilities, transition_probabilities, emission_probabilities)

predicted_tag_sequence

['O', 'B-Age', 'I-Age', 'I-Age', 'O', 'O', 'O', 'O']

In [77]:
accuracy = evaluate(dataset, list(state_set), initial_state_probabilities, transition_probabilities, emission_probabilities)

In [79]:
print(accuracy)

0.7850949850476538


In [84]:
import math
from collections import Counter

def calculate_entropy(tags):
    # Flatten the list of tags to a single list
    all_tags = [tag for seq in tags for tag in seq]
    
    # Count the frequency of each tag
    tag_counts = Counter(all_tags)
    
    # Total number of tags
    total_tags = len(all_tags)
    
    # Calculate entropy
    entropy = -sum((count/total_tags) * math.log(count/total_tags, 2) for count in tag_counts.values())
    
    return entropy

# Example usage
tags = [item['tags'] for item in dataset['train']]
entropy = calculate_entropy(tags)
print("Entropy of tag sequences:", entropy)


Entropy of tag sequences: 2.125497866231477
