In [1]:
# ============================================
# Arabic NER with HMM + Viterbi (WikiANN)
# ============================================

!pip install -q datasets seqeval

import math
from collections import Counter, defaultdict
from datasets import load_dataset
from seqeval.metrics import f1_score, classification_report

UNK = "<UNK>"
SMOOTHING = 1.0  # Laplace smoothing


# -----------------------------
# 1) Load WikiANN Arabic NER
# -----------------------------
dataset = load_dataset("wikiann", "ar")

train_ds = dataset["train"]
val_ds   = dataset["validation"]
test_ds  = dataset["test"]

print("Full sizes:")
print("Train:", len(train_ds))
print("Val:  ", len(val_ds))
print("Test: ", len(test_ds))

# (Optional) Subsample to speed up HMM training
max_train = min(len(train_ds), 50000)
max_val   = min(len(val_ds),   5000)
max_test  = min(len(test_ds),  5000)

train_ds = train_ds.select(range(max_train))
val_ds   = val_ds.select(range(max_val))
test_ds  = test_ds.select(range(max_test))

print("\nUsing subsets:")
print("Train:", len(train_ds))
print("Val:  ", len(val_ds))
print("Test: ", len(test_ds))

# label id -> name, e.g. 'O', 'B-PER', 'I-PER', ...
label_names = train_ds.features["ner_tags"].feature.names
print("\nNER labels:", label_names)


# -----------------------------
# 2) Convert HF dataset to sentences
# -----------------------------
def hf_to_sentences(ds):
    """
    ds: HF split with 'tokens' and 'ner_tags'
    Returns list of sentences:
      each sentence = list of (word, label_str)
    """
    sents = []
    for ex in ds:
        tokens = ex["tokens"]
        tag_ids = ex["ner_tags"]
        tags = [label_names[i] for i in tag_ids]
        sents.append(list(zip(tokens, tags)))
    return sents

train_sents_raw = hf_to_sentences(train_ds)
val_sents_raw   = hf_to_sentences(val_ds)
test_sents_raw  = hf_to_sentences(test_ds)

print("\n#train sentences:", len(train_sents_raw))
print("#val sentences:  ", len(val_sents_raw))
print("#test sentences: ", len(test_sents_raw))


# -----------------------------
# 3) Build vocabulary from train
# -----------------------------
def build_vocab(sentences, min_freq=2):
    freq = Counter()
    for sent in sentences:
        for word, label in sent:
            freq[word] += 1
    vocab = {w for w, c in freq.items() if c >= min_freq}
    return vocab, freq

vocab, word_freq = build_vocab(train_sents_raw, min_freq=2)
print("\nVocab size (freq>=2):", len(vocab))


def replace_rare_with_unk(sentences, vocab):
    """
    Replace words not in vocab with UNK.
    """
    new_sents = []
    for sent in sentences:
        new_sent = []
        for word, label in sent:
            if word not in vocab:
                new_sent.append((UNK, label))
            else:
                new_sent.append((word, label))
        new_sents.append(new_sent)
    return new_sents

train_sents = replace_rare_with_unk(train_sents_raw, vocab)
val_sents   = replace_rare_with_unk(val_sents_raw, vocab)
test_sents  = replace_rare_with_unk(test_sents_raw, vocab)


# -----------------------------
# 4) HMM NER Tagger
# -----------------------------
class HMMNERTagger:
    def __init__(self, smoothing=1.0):
        self.smoothing = smoothing
        self.tags = []
        self.tag_to_idx = {}
        self.idx_to_tag = {}
        self.vocab = set()

        # counts
        self.tag_counts = Counter()
        self.initial_tag_counts = Counter()
        self.transition_counts = defaultdict(Counter)  # prev_tag -> Counter(next_tag)
        self.emission_counts   = defaultdict(Counter)  # tag -> Counter(word)

        # log probabilities
        self.log_initial = {}
        self.log_transition = defaultdict(dict)
        self.log_emission = defaultdict(dict)

    def fit(self, sentences):
        """
        sentences: list of sentences, each is list of (word, label)
        """
        # first pass: collect counts
        for sent in sentences:
            if not sent:
                continue

            first_tag = sent[0][1]
            self.initial_tag_counts[first_tag] += 1

            prev_tag = None
            for word, tag in sent:
                self.tag_counts[tag] += 1
                self.emission_counts[tag][word] += 1
                self.vocab.add(word)

                if prev_tag is not None:
                    self.transition_counts[prev_tag][tag] += 1
                prev_tag = tag

        # unique tags
        self.tags = sorted(self.tag_counts.keys())
        self.tag_to_idx = {t: i for i, t in enumerate(self.tags)}
        self.idx_to_tag = {i: t for t, i in self.tag_to_idx.items()}

        num_tags = len(self.tags)
        vocab_size = len(self.vocab) + 1  # +UNK

        # initial probabilities
        total_init = sum(self.initial_tag_counts.values())
        for tag in self.tags:
            count = self.initial_tag_counts[tag]
            prob = (count + self.smoothing) / (total_init + self.smoothing * num_tags)
            self.log_initial[tag] = math.log(prob)

        # transition probabilities
        for prev_tag in self.tags:
            total_prev = sum(self.transition_counts[prev_tag].values())
            for next_tag in self.tags:
                count = self.transition_counts[prev_tag][next_tag]
                prob = (count + self.smoothing) / (total_prev + self.smoothing * num_tags)
                self.log_transition[prev_tag][next_tag] = math.log(prob)

        # emission probabilities
        for tag in self.tags:
            total_tag = sum(self.emission_counts[tag].values())
            for word in self.vocab:
                count = self.emission_counts[tag][word]
                prob = (count + self.smoothing) / (total_tag + self.smoothing * vocab_size)
                self.log_emission[tag][word] = math.log(prob)

            # UNK emission
            unk_count = 0
            prob_unk = (unk_count + self.smoothing) / (total_tag + self.smoothing * vocab_size)
            self.log_emission[tag][UNK] = math.log(prob_unk)

    def viterbi(self, words):
        """
        words: list of tokens (already UNK-handled if needed)
        returns: list of predicted tags
        """
        T = len(words)
        N = len(self.tags)

        # map OOV to UNK
        obs = [w if w in self.vocab else UNK for w in words]

        dp = [[-math.inf] * N for _ in range(T)]
        backpointer = [[None] * N for _ in range(T)]

        # init
        for i, tag in enumerate(self.tags):
            log_init = self.log_initial.get(tag, -math.inf)
            log_emit = self.log_emission[tag].get(obs[0], -math.inf)
            dp[0][i] = log_init + log_emit
            backpointer[0][i] = None

        # recursion
        for t in range(1, T):
            for i, curr_tag in enumerate(self.tags):
                best_score = -math.inf
                best_prev = None
                log_emit = self.log_emission[curr_tag].get(obs[t], -math.inf)

                for j, prev_tag in enumerate(self.tags):
                    log_trans = self.log_transition[prev_tag].get(curr_tag, -math.inf)
                    score = dp[t-1][j] + log_trans + log_emit
                    if score > best_score:
                        best_score = score
                        best_prev = j

                dp[t][i] = best_score
                backpointer[t][i] = best_prev

        # termination
        best_last_score = -math.inf
        best_last_idx = None
        for i in range(N):
            if dp[T-1][i] > best_last_score:
                best_last_score = dp[T-1][i]
                best_last_idx = i

        # backtrack
        best_path_idx = [best_last_idx]
        for t in range(T-1, 0, -1):
            best_prev = backpointer[t][best_path_idx[-1]]
            best_path_idx.append(best_prev)
        best_path_idx.reverse()

        best_tags = [self.idx_to_tag[i] for i in best_path_idx]
        return best_tags

    def predict_sentence(self, tokens):
        """
        tokens: list of original tokens (with UNK handling)
        """
        words = [w if w in self.vocab else UNK for w in tokens]
        return self.viterbi(words)

    def evaluate(self, sentences):
        """
        sentences: list of sentences, each is list of (word, gold_label)
        returns: F1 score (entity-level)
        """
        y_true = []
        y_pred = []
        for sent in sentences:
            tokens = [w for (w, _) in sent]
            gold   = [l for (_, l) in sent]
            pred   = self.predict_sentence(tokens)
            y_true.append(gold)
            y_pred.append(pred)

        f1 = f1_score(y_true, y_pred)
        print("\nClassification report:")
        print(classification_report(y_true, y_pred))
        return f1


# -----------------------------
# 5) Train HMM NER
# -----------------------------
print("\nTraining HMM NER model...")
hmm_ner = HMMNERTagger(smoothing=SMOOTHING)
hmm_ner.fit(train_sents)
print("Training done.")


# -----------------------------
# 6) Evaluate on validation + test
# -----------------------------
print("\n== Validation performance ==")
val_f1 = hmm_ner.evaluate(val_sents)
print("Validation F1:", val_f1)

print("\n== Test performance ==")
test_f1 = hmm_ner.evaluate(test_sents)
print("Test F1:", test_f1)


# -----------------------------
# 7) Predict on a custom sentence
# -----------------------------
def predict_sentence_hmm(tokens):
    """
    tokens: list of Arabic words
    """
    pred = hmm_ner.predict_sentence(tokens)
    return list(zip(tokens, pred))

example = ["زار", "الوزير", "العاصمة", "دمشق", "أمس", "."]
print("\nExample prediction on custom sentence:")
print(predict_sentence_hmm(example))


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for seqeval (setup.py) ... [?25l[?25hdone


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

ar/validation-00000-of-00001.parquet:   0%|          | 0.00/643k [00:00<?, ?B/s]

ar/test-00000-of-00001.parquet:   0%|          | 0.00/648k [00:00<?, ?B/s]

ar/train-00000-of-00001.parquet:   0%|          | 0.00/1.29M [00:00<?, ?B/s]

Generating validation split:   0%|          | 0/10000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/10000 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/20000 [00:00<?, ? examples/s]

Full sizes:
Train: 20000
Val:   10000
Test:  10000

Using subsets:
Train: 20000
Val:   5000
Test:  5000

NER labels: ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']

#train sentences: 20000
#val sentences:   5000
#test sentences:  5000

Vocab size (freq>=2): 8992

Training HMM NER model...
Training done.

== Validation performance ==

Classification report:
              precision    recall  f1-score   support

         LOC       0.73      0.62      0.67      1973
         ORG       0.69      0.68      0.69      1790
         PER       0.68      0.68      0.68      1897

   micro avg       0.70      0.66      0.68      5660
   macro avg       0.70      0.66      0.68      5660
weighted avg       0.70      0.66      0.68      5660

Validation F1: 0.6793344849531775

== Test performance ==

Classification report:
              precision    recall  f1-score   support

         LOC       0.73      0.64      0.68      1896
         ORG       0.70      0.66      0.68      1836
  