In [1]:
import os
from collections import Counter, defaultdict
from math import log

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [2]:
def load_conllu(path):
    """Read a tab-separated CoNLL-U style file into a list of sentences.

    Blank lines separate sentences and lines starting with ``#`` are
    ignored. Rows whose first column contains ``-`` or ``.`` are skipped
    (in CoNLL-U those IDs denote multiword-token ranges and empty nodes).
    For every remaining row the second column is taken as the token and the
    last column as its NER label.

    Parameters
    ----------
    path : str
        Location of the file; it is read as UTF-8.

    Returns
    -------
    list[dict]
        One ``{"tokens": [...], "labels": [...]}`` dict per non-empty
        sentence, in file order.
    """
    collected = []
    words, tags = [], []

    def close_sentence():
        # Store the sentence gathered so far (if any) and start a fresh one.
        nonlocal words, tags
        if words:
            collected.append({"tokens": words, "labels": tags})
            words, tags = [], []

    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            row = raw.strip()

            if row == "":
                close_sentence()
            elif not row.startswith("#"):
                fields = row.split("\t")
                token_id = fields[0]
                is_plain_token = "-" not in token_id and "." not in token_id
                if is_plain_token:
                    words.append(fields[1])
                    tags.append(fields[-1])  # NER label lives in the last column

    # The file may end without a trailing blank line.
    close_sentence()

    return collected

# Location of the manually annotated CoNLL-U sample. Set the NER_CONLLU_PATH
# environment variable to point at your own copy; if it is unset, the original
# machine-specific default below is used.
conllu_path = os.environ.get(
    "NER_CONLLU_PATH",
    "C:\\Users\\timgr\\Desktop\\NLP\\NLP-Group-23\\data\\manual_annotation\\sample_sentences_labeled.conllu",
)
sentences = load_conllu(conllu_path)
len(sentences)


150

Let's have a look at how our labels are distributed for now.

In [3]:
print("Number of sentences:", len(sentences))

# Gather every gold tag of the corpus into one flat list, then tally it.
all_labels = []
for sentence in sentences:
    all_labels.extend(sentence["labels"])
label_counts = Counter(all_labels)

# Rows appear in the order in which each tag was first seen.
print("Label distribution:")
for tag, count in label_counts.items():
    print(f"{tag:8s} {count:6d}")

Number of sentences: 150
Label distribution:
O          3120
B-LEG        51
I-LEG        82
B-ORG        50
I-ORG       119
B-MON        68
I-MON        11


# Train Dev split

In [4]:
# Hold out 20% of the sentences as a dev set; the fixed seed keeps the split repeatable.
train_set, dev_set = train_test_split(sentences, test_size=0.2, random_state=23)

(len(train_set), len(dev_set))

(120, 30)

In [5]:
def flatten_tokens(sentences):
    """Turn sentence dicts into one flat list of ``(token, label)`` tuples.

    Each element of ``sentences`` must provide ``"tokens"`` and ``"labels"``
    sequences; they are paired position by position and the pairs of all
    sentences are concatenated in order.
    """
    pairs = []
    for sentence in sentences:
        pairs.extend(zip(sentence["tokens"], sentence["labels"]))
    return pairs

# Token-level views of both splits: flat lists of (token, label) pairs.
train_tokens, dev_tokens = (flatten_tokens(split) for split in (train_set, dev_set))

(len(train_tokens), len(dev_tokens))

(2810, 691)

# Let's build a rule-based NER

In [None]:
import re

class SimpleRuleNER:
    """Rule-based BIO tagger for legal (LEG), monetary (MON) and organisation (ORG) mentions.

    Rules run per sentence in a fixed order -- LEG, then MON, then ORG -- and
    a later stage never overwrites a label assigned by an earlier one.
    """

    def __init__(self):
        # patterns for monetary stuff
        # e.g. 3,00  21.396  2,50%  0%  2,0
        self.mon_number_re = re.compile(
            r"^\d{1,3}(\.\d{3})*(,\d+)?$"
            r"|^\d+(,\d+)?%$"
        )
        self.currency_tokens = {"€", "eur", "eur.", "euro", "euro."}

        # legal patterns
        self.leg_starters = {"§", "Artikel", "Artikel.", "Art.", "Art"}
        self.leg_follow = {
            "Absatz", "Abs.", "Satz", "Nr.", "Nr", "Nummer",
            "(", ")", "-", "WpHG"
        }

        # organization patterns, banks etc
        self.org_keywords = {
            "bank", "bundesbank", "sparkasse", "genossenschaftsbank"
        }
        self.org_suffixes = {
            "ag", "gmbh", "kg", "kgaa", "se", "plc", "ltd", "llc", "inc.", "sarl"
        }

    # ------- helpers --------

    def _is_numeric_like(self, tok: str) -> bool:
        """Return True if ``tok`` contains at least one digit."""
        return any(ch.isdigit() for ch in tok)

    @staticmethod
    def _starts_upper(tok: str) -> bool:
        """Return True if the first character is uppercase; safe for empty tokens."""
        return tok[:1].isupper()

    def _tag_legal(self, tokens, lower, labels):
        """Mark legal references ("§ ..." and "Artikel ...") in ``labels`` in place."""
        n = len(tokens)
        # Built once per sentence (not once per token) so that later changes
        # to ``self.leg_follow`` are still picked up.
        follow_lower = {word.lower() for word in self.leg_follow}

        i = 0
        while i < n:
            tok = tokens[i]

            # Pattern: "§ 123 ..." -- tag a short window of following tokens,
            # stopping at "." / ";" or after at most 7 tokens.
            if tok == "§":
                labels[i] = "B-LEG"
                j = i + 1
                while j < n and tokens[j] not in {".", ";"} and j < i + 8:
                    labels[j] = "I-LEG"
                    j += 1
                i = j
                continue

            # Pattern: "Artikel ..." -- continue over numbers and legal words.
            # ("§" is also a starter but is handled by the branch above.)
            if tok in self.leg_starters:
                labels[i] = "B-LEG"
                j = i + 1
                while j < n and (
                    self._is_numeric_like(tokens[j])
                    or tokens[j] in self.leg_follow
                    or lower[j] in follow_lower
                ):
                    labels[j] = "I-LEG"
                    j += 1
                i = j
                continue

            i += 1

    def _tag_monetary(self, tokens, lower, labels):
        """Mark amounts, percentages and currency words in ``labels`` in place."""
        n = len(tokens)

        def continues_amount(j):
            # "%"-tokens, currency words and further numbers extend an amount.
            return bool(
                tokens[j].endswith("%")
                or lower[j] in self.currency_tokens
                or self.mon_number_re.match(tokens[j])
            )

        for i, tok in enumerate(tokens):
            if labels[i] != "O":
                # don't overwrite LEG (or MON set by an earlier extension)
                continue

            # Pattern: number with thousand/decimal separators, or anything ending in "%"
            if self.mon_number_re.match(tok) or tok.endswith("%"):
                labels[i] = "B-MON"
                # attach directly adjacent % / currency / number tokens
                j = i + 1
                while j < n and continues_amount(j):
                    if labels[j] == "O":
                        labels[j] = "I-MON"
                    j += 1
                continue

            # Pattern: currency word with a digit-bearing token right before it
            if (
                lower[i] in self.currency_tokens
                and i > 0
                and self._is_numeric_like(tokens[i - 1])
            ):
                previous = labels[i - 1]
                if previous == "O":
                    labels[i - 1] = "B-MON"
                    labels[i] = "I-MON"
                elif previous.endswith("-MON"):
                    labels[i] = "I-MON"
                else:
                    # The number belongs to another entity (e.g. LEG): start a
                    # new span so no I-MON appears without a preceding MON tag.
                    labels[i] = "B-MON"

    def _tag_organisations(self, tokens, lower, labels):
        """Mark organisation names (keyword hits and legal-form suffixes) in ``labels`` in place."""
        n = len(tokens)
        connectors = {"-", "–", "&", "’s", "'s", ",", "."}

        for i in range(n):
            if labels[i] != "O":
                continue
            tl = lower[i]

            # Pattern: lexicon hit like "Bank" -- extend forward through typical name pieces
            if tl in self.org_keywords:
                labels[i] = "B-ORG"
                j = i + 1
                while j < n and (
                    self._starts_upper(tokens[j])
                    or lower[j] in self.org_keywords
                    or lower[j] in self.org_suffixes
                    or tokens[j] in connectors
                ):
                    if labels[j] == "O":
                        labels[j] = "I-ORG"
                    j += 1
                continue

            # Pattern: legal form suffix like "AG", "GmbH" -- walk backwards over
            # still-unlabelled capitalised tokens to find the start of the name
            if tl in self.org_suffixes:
                start = i
                while (
                    start - 1 >= 0
                    and self._starts_upper(tokens[start - 1])
                    and labels[start - 1] == "O"
                ):
                    start -= 1
                labels[start] = "B-ORG"
                for j in range(start + 1, i + 1):
                    if labels[j] == "O":
                        labels[j] = "I-ORG"

    def predict_sentence(self, tokens):
        """Predict one BIO label per token.

        Parameters
        ----------
        tokens : list[str]
            The tokens of a single sentence. Empty-string tokens are
            tolerated and treated as not capitalised.

        Returns
        -------
        list[str]
            Labels from {"O", "B-/I-LEG", "B-/I-MON", "B-/I-ORG"}, with the
            same length as ``tokens``.
        """
        labels = ["O"] * len(tokens)
        lower = [t.lower() for t in tokens]

        self._tag_legal(tokens, lower, labels)
        self._tag_monetary(tokens, lower, labels)
        self._tag_organisations(tokens, lower, labels)

        return labels


In [7]:
# Rule-based baseline tagger; it needs no training, only instantiation.
rule_model = SimpleRuleNER()

# Token-Level Naive Bayes

Here we build a Naive Bayes classifier at the token level. The current version accounts for class imbalance by using uniform label priors.
TODO: we should probably also build one version that does not account for class imbalance. Expected storyline: it almost always predicts `O` and still gets a good F1 -> error analysis -> account for class imbalance -> better predictions, but a lower F1 score!

In [None]:
class TokenNB:
    """Token-level Naive Bayes tagger.

    Every token is classified on its own from its surface form, using
    Laplace-smoothed estimates of P(word | label). Words that were never
    counted fall back to the most frequent training label.

    Usage: ``count_tokens(...)`` -> ``calculate_weights()`` -> ``predict_*``.
    """

    def __init__(self):
        self.labels = set()
        self.word_count = Counter()              # global word freq
        self.count_by_label = defaultdict(Counter)  # label -> word -> count
        self.label_count = Counter()             # label -> token count
        self.trained = False
        self.weights = {}                        # word -> label -> log P(word | label)
        self.label_priors = {}                   # label -> log prior
        self.majority_label = "O"

    def count_tokens(self, token_label_pairs):
        """
        Accumulate counts from training data.

        token_label_pairs: iterable of (word, label) pairs
        """
        for word, label in token_label_pairs:
            self.labels.add(label)
            self.word_count[word] += 1
            self.count_by_label[label][word] += 1
            self.label_count[label] += 1

    def calculate_weights(self, uniform_priors=True):
        """
        Compute log P(word | label), the log priors and the majority label.

        uniform_priors: if True (default), every label gets the same prior
            (log prior 0.0), which is neutral w.r.t. class imbalance. If
            False, the empirical prior log(count(label) / total tokens) is
            used, so frequent labels such as the majority class are favoured.
        """
        V = len(self.word_count)

        # log P(word | label) with Laplace smoothing
        self.weights = {}
        for word in self.word_count:
            self.weights[word] = {
                label: log(
                    (self.count_by_label[label][word] + 1)
                    / (self.label_count[label] + V)
                )
                for label in self.labels
            }

        # ---- PRIOR HANDLING ----
        if uniform_priors:
            # uniform priors -> neutral w.r.t. imbalance
            self.label_priors = {label: 0.0 for label in self.labels}
        else:
            # empirical priors -> reflects the label frequencies of the training data
            total = sum(self.label_count.values())
            self.label_priors = {
                label: log(self.label_count[label] / total)
                for label in self.labels
            }

        if self.label_count:
            self.majority_label = self.label_count.most_common(1)[0][0]
        else:
            self.majority_label = "O"

        self.trained = True

    def predict_token(self, word):
        """
        Predict a label for a single token.

        Raises RuntimeError if calculate_weights() has not been called yet.
        """
        if not self.trained:
            raise RuntimeError("Call calculate_weights() first")

        # unseen word -> fall back to majority label (probably 'O')
        if word not in self.weights:
            return self.majority_label

        word_weights = self.weights[word]
        best_label = None
        best_score = float("-inf")
        # Iterate in sorted order: set iteration order of strings varies with
        # the hash seed, so exact score ties would otherwise be broken
        # differently from run to run.
        for label in sorted(self.labels):
            score = self.label_priors[label] + word_weights[label]
            if score > best_score:
                best_score = score
                best_label = label
        return best_label

    def predict_sentence(self, tokens):
        """
        tokens: list of word strings
        returns: list of predicted labels
        """
        return [self.predict_token(w) for w in tokens]

In [9]:
def evaluate_model(model, sentences, name=""):
    """
    Print a token-level classification report for ``model`` on ``sentences``.

    model:     any object with ``predict_sentence(tokens) -> list[str]``
    sentences: list of dicts with keys "tokens" and "labels"
               - "tokens": list[str]
               - "labels": list[str] (gold BIO tags)
    name:      heading printed above the report

    Raises ValueError if the model returns a different number of labels
    than there are gold labels for a sentence.
    """
    y_true, y_pred = [], []

    for sent in sentences:
        gold = sent["labels"]
        pred = model.predict_sentence(sent["tokens"])
        if len(pred) != len(gold):
            raise ValueError("Length mismatch between gold and prediction")
        y_true.extend(gold)
        y_pred.extend(pred)

    print(f"=== {name} ===")
    # zero_division=0 reports 0.0 for labels that were never predicted (the
    # same value as the default) without emitting a warning for each of them.
    print(classification_report(y_true, y_pred, digits=3, zero_division=0))


In [10]:
# Flat (word, label) pairs of the training split, used for the NB counts.
train_tokens = flatten_tokens(train_set)

nb_model = TokenNB()
nb_model.count_tokens(train_tokens)
nb_model.calculate_weights()

# Compare both taggers on the held-out dev sentences.
evaluate_model(rule_model, dev_set, name="SimpleRuleNER")
evaluate_model(nb_model, dev_set, name="TokenNB (uniform prior)")

=== SimpleRuleNER ===
              precision    recall  f1-score   support

       B-LEG      1.000     0.500     0.667        12
       B-MON      0.000     0.000     0.000         2
       B-ORG      0.000     0.000     0.000         5
       I-LEG      0.000     0.000     0.000        23
       I-MON      0.000     0.000     0.000         1
       I-ORG      0.000     0.000     0.000        10
           O      0.931     1.000     0.964       638

    accuracy                          0.932       691
   macro avg      0.276     0.214     0.233       691
weighted avg      0.877     0.932     0.902       691

=== TokenNB (uniform prior) ===
              precision    recall  f1-score   support

       B-LEG      0.700     0.583     0.636        12
       B-MON      0.000     0.000     0.000         2
       B-ORG      1.000     0.400     0.571         5
       I-LEG      0.435     0.435     0.435        23
       I-MON      0.011     1.000     0.021         1
       I-ORG      0.545 

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
