# Always-On NER Baseline

This notebook implements the "always-on" baseline for near real-time Named Entity Recognition (NER). In this approach, we run NER on every token as it arrives, simulating a scenario where we have no selective inference strategy.

This is the most computationally expensive approach, but it gives us the earliest possible detection of entities.
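
To make the cost concrete: for a sentence of n tokens, the always-on strategy issues n model calls, one per growing prefix, so the model reads n(n+1)/2 tokens in total. The sketch below only enumerates those prefixes and counts them. `always_on_prefixes` is a hypothetical helper written for illustration and is not part of this notebook's `src/` modules.

```python
from typing import Iterator, List


def always_on_prefixes(tokens: List[str]) -> Iterator[List[str]]:
    """Yield the growing prefix that an always-on strategy would send to the model."""
    for end in range(1, len(tokens) + 1):
        yield tokens[:end]


example_tokens = ["Dear", "viewers", ",", "the", "China", "News"]
prefixes = list(always_on_prefixes(example_tokens))

n_calls = len(prefixes)                      # one model call per arriving token
tokens_read = sum(len(p) for p in prefixes)  # n * (n + 1) / 2 tokens overall
print(n_calls, tokens_read)
```

The quadratic growth in tokens read is why this baseline is the upper bound on cost that selective strategies are compared against.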

## 1. Setup and Imports

First, let's import the necessary libraries and set up our environment.

In [1]:
%pip install datasets transformers

Note: you may need to restart the kernel to use updated packages.


In [2]:
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
import sys
from tqdm import tqdm
import asyncio
import time
import random
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import os



In [3]:
sys.path.append("./src/")
from utils import convert_predictions
from streaming import process_ontonotes_example, stream_sentence
from metrics import Metrics

In [4]:
import metrics
from importlib import reload
reload(metrics)

from metrics import Metrics

## 2. Load OntoNotes Dataset

We'll use the OntoNotes 5.0 dataset, which contains texts from various genres with named entity annotations.

In [5]:
# Load the English portion of OntoNotes 5.0
ontonotes = load_dataset(
    "conll2012_ontonotesv5",
    "english_v12",
    cache_dir="./dataset/ontonotes",
)
print(f"Dataset loaded with splits: {ontonotes.keys()}")

# Get basic statistics for each split
for split_name in ontonotes.keys():
    print(f"{split_name.capitalize()} set: {len(ontonotes[split_name])} examples")

Dataset loaded with splits: dict_keys(['train', 'validation', 'test'])
Train set: 10539 examples
Validation set: 1370 examples
Test set: 1200 examples
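
Each example in this dataset is a whole document holding a list of sentences, and the helper `process_ontonotes_example` (defined in `src/streaming.py`, not shown here) turns a document into `(tokens, BIO tags)` pairs. The sketch below illustrates what such a conversion could look like. It is not the helper's actual implementation: the function name `doc_to_sentences`, the `named_entities` field holding integer label ids, and the shortened label list are all assumptions made for illustration.

```python
from typing import Dict, List, Tuple

# Assumption: a shortened label list for illustration only. The real list
# would come from the dataset's label feature and is longer.
LABEL_NAMES = ["O", "B-PERSON", "I-PERSON", "B-NORP", "I-NORP"]


def doc_to_sentences(
    doc: Dict, label_names: List[str]
) -> List[Tuple[List[str], List[str]]]:
    """Turn one document into (tokens, BIO tags) pairs, one per sentence."""
    pairs = []
    for sentence in doc["sentences"]:
        # Assumed field: integer label ids, one per word.
        tags = [label_names[i] for i in sentence["named_entities"]]
        pairs.append((sentence["words"], tags))
    return pairs


# Hand-made document in the assumed shape (not real OntoNotes data).
toy_doc = {
    "sentences": [
        {"words": ["Alice", "Liddell", "spoke"], "named_entities": [1, 2, 0]},
    ]
}
sentence_pairs = doc_to_sentences(toy_doc, LABEL_NAMES)
print(sentence_pairs)
```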


## 3. Set Up NER Model

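We'll use a pre-trained transformer model for NER. For this baseline, we'll use `dslim/bert-base-NER`, a BERT-based model fine-tuned on CoNLL-2003. Its label set (PER, ORG, LOC, MISC) is smaller than the OntoNotes tag set, which matters when reading the per-type metrics later in this notebook.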

In [6]:
# Load pre-trained NER model
model_name = "dslim/bert-base-NER"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForTokenClassification.from_pretrained(model_name)

# Create NER pipeline
ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

# Wrapper function for NER that takes tokens and returns predictions
def run_ner_on_tokens(tokens):
    """Run NER on a list of tokens."""
    text = " ".join(tokens)
    pipeline_output = ner_pipeline(text)
    return pipeline_output

# Factory for a per-process NER pipeline (module-level so it can be pickled for multiprocessing)
def create_ner_worker():
    """Create a NER pipeline in a worker process."""
    from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
    model_name = "dslim/bert-base-NER"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)
    return pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

# Global variable to store the pipeline in worker processes
_worker_pipeline = None

def init_worker():
    """Initialize the NER pipeline in each worker process."""
    global _worker_pipeline
    _worker_pipeline = create_ner_worker()

def worker_run_ner(tokens):
    """Worker function that runs NER on tokens."""
    global _worker_pipeline
    text = " ".join(tokens)
    return _worker_pipeline(text)

# Test NER model on a sample sentence
sample_text = "John Smith works at Microsoft in Seattle."
print(f"Sample text: {sample_text}")
test_output = ner_pipeline(sample_text)
print(f"NER pipeline output: {test_output}")

Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use mps:0


Sample text: John Smith works at Microsoft in Seattle.
NER pipeline output: [{'entity_group': 'PER', 'score': np.float32(0.9996886), 'word': 'John Smith', 'start': 0, 'end': 10}, {'entity_group': 'ORG', 'score': np.float32(0.9989378), 'word': 'Microsoft', 'start': 20, 'end': 29}, {'entity_group': 'LOC', 'score': np.float32(0.9988439), 'word': 'Seattle', 'start': 33, 'end': 40}]


In [7]:
import re
# Removing punctuation to prevent mismatches and splitting the sample text into tokens
tokens = re.sub(r"[.,?!]+", "", sample_text).split(" ")

bio_tags = convert_predictions(tokens, test_output)
print(f"Tokens: {tokens}")
print(f"Converted BIO tags: {bio_tags}")

Tokens: ['John', 'Smith', 'works', 'at', 'Microsoft', 'in', 'Seattle']
Converted BIO tags: ['B-PER', 'I-PER', 'O', 'O', 'B-ORG', 'O', 'B-LOC']
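
The pipeline reports entities as character spans, while evaluation needs one BIO tag per token. `convert_predictions` lives in `src/utils.py` and its code is not shown here, so the following is only a minimal sketch of one way to do this alignment, assuming the text was built with `" ".join(tokens)`. The name `spans_to_bio` is hypothetical, and the real helper may treat edge cases (subword pieces, punctuation) differently.

```python
from typing import Dict, List


def spans_to_bio(tokens: List[str], entities: List[Dict]) -> List[str]:
    """Map character-level entity spans onto tokens joined by single spaces."""
    tags = ["O"] * len(tokens)

    # Character offsets of each token inside " ".join(tokens).
    offsets = []
    cursor = 0
    for token in tokens:
        offsets.append((cursor, cursor + len(token)))
        cursor += len(token) + 1  # +1 for the joining space

    for entity in entities:
        first = True
        for i, (tok_start, tok_end) in enumerate(offsets):
            # A token belongs to the entity if the two spans overlap.
            if tok_start < entity["end"] and tok_end > entity["start"]:
                prefix = "B" if first else "I"
                tags[i] = f"{prefix}-{entity['entity_group']}"
                first = False
    return tags


sample_tokens = ["John", "Smith", "works", "at", "Microsoft", "in", "Seattle"]
sample_entities = [
    {"entity_group": "PER", "start": 0, "end": 10},
    {"entity_group": "ORG", "start": 20, "end": 29},
    {"entity_group": "LOC", "start": 33, "end": 40},
]
sample_bio = spans_to_bio(sample_tokens, sample_entities)
print(sample_bio)
```

The entity spans above are copied from the pipeline output shown earlier, with the `score` and `word` fields dropped because the alignment does not need them.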


In [8]:
test_metrics = Metrics()
# Simulate two NER invocations on the sample text: one after the first two tokens, one on the full sentence

test_metrics.evaluate_metrics((tokens, bio_tags), [bio_tags[0:2], bio_tags])
test_metrics.print_metrics()

Metrics:
Total NER invocations: 2
Avg TTFD: 0.67
Entity Type          TP         TN         FP (#B-/I-MISC)      FN        
----------------------------------------------------------------------
O                    N/A        3          0                    N/A       
B-PERSON             0          N/A        0                    0         
I-PERSON             0          N/A        0                    0         
B-NORP               0          N/A        0                    0         
I-NORP               0          N/A        0                    0         
B-FAC                0          N/A        0                    0         
I-FAC                0          N/A        0                    0         
B-ORG                1          N/A        0                    0         
I-ORG                0          N/A        0                    0         
B-GPE                0          N/A        0                    0         
I-GPE                0          N/A        0           

## 4. Implement Always-On Baseline with Async Streams

Now we'll implement the always-on baseline using our async stream generator. This approach runs NER on every token as it arrives.
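
The stream generator `stream_sentence` comes from `src/streaming.py` and is not reproduced in this notebook. As a rough picture of the pattern, the sketch below uses a stand-in async generator that yields `(token, label)` pairs and a consumer that records the buffer after every arrival. `toy_stream`, `collect_prefixes`, and the `delay` parameter are hypothetical names chosen for illustration.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def toy_stream(
    tokens: List[str], labels: Optional[List[str]] = None, delay: float = 0.0
) -> AsyncIterator[Tuple[str, Optional[str]]]:
    """Yield (token, label) pairs one at a time, pausing `delay` seconds before each."""
    for i, token in enumerate(tokens):
        await asyncio.sleep(delay)  # stand-in for waiting on the next token
        yield token, (labels[i] if labels is not None else None)


async def collect_prefixes(tokens: List[str]) -> List[List[str]]:
    """Consume the stream and record the buffer after every arrival."""
    buffer: List[str] = []
    snapshots = []
    async for token, _ in toy_stream(tokens):
        buffer.append(token)
        snapshots.append(buffer.copy())  # this is where the NER call would go
    return snapshots


# In a script, asyncio.run drives the coroutine. Inside a notebook cell,
# use `await collect_prefixes([...])` instead, since a loop is already running.
snapshots = asyncio.run(collect_prefixes(["the", "China", "News"]))
print(snapshots)
```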

In [9]:
async def run_always_on(sentences):
    """Run NER on a list of sentences."""
    results = []
    for sentence in sentences:
        sentence, true_labels = sentence
        buffer = []
        sentence_results = []
        async for token, label in stream_sentence((sentence, true_labels)):
            buffer.append(token)
            pipeline_output = run_ner_on_tokens(buffer)
            sentence_results.append((buffer.copy(), pipeline_output))
        results.append(((sentence, true_labels), sentence_results))
    return results

test_split = ontonotes["test"]
test_example = test_split[0]
test_sentences = process_ontonotes_example(test_example)
test_sentence = test_sentences[3]
print(f"Test sentence: {test_sentence}")
run_always_output = await run_always_on([test_sentence])
print(f"Run always output: {run_always_output}")
bio_always_output = [list(map(lambda curr: (curr[0], convert_predictions(curr[0], curr[1])), predictions)) for _, predictions in run_always_output]
# bio_always_output = list(map(lambda curr: (curr[0], convert_predictions(curr[0], curr[1])), run_always_output))
print(f"Run always output BIO: {bio_always_output}")

Test sentence: (['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will', 'end', 'here', '.'], ['O', 'O', 'O', 'O', 'B-ORG', 'I-ORG', 'O', 'O', 'O', 'O', 'O'])
Run always output: [((['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will', 'end', 'here', '.'], ['O', 'O', 'O', 'O', 'B-ORG', 'I-ORG', 'O', 'O', 'O', 'O', 'O']), [(['Dear'], []), (['Dear', 'viewers'], []), (['Dear', 'viewers', ','], []), (['Dear', 'viewers', ',', 'the'], []), (['Dear', 'viewers', ',', 'the', 'China'], [{'entity_group': 'LOC', 'score': np.float32(0.9609133), 'word': 'China', 'start': 19, 'end': 24}]), (['Dear', 'viewers', ',', 'the', 'China', 'News'], [{'entity_group': 'ORG', 'score': np.float32(0.9969154), 'word': 'China News', 'start': 19, 'end': 29}]), (['Dear', 'viewers', ',', 'the', 'China', 'News', 'program'], [{'entity_group': 'ORG', 'score': np.float32(0.95669866), 'word': 'China News', 'start': 19, 'end': 29}]), (['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will']

The previous example uses the ground-truth labels, so it first preprocesses the document with `process_ontonotes_example`. Below we do the same thing without the ground-truth labels, reading the tokens directly from the dataset:

In [10]:
test_sentence = test_split[0]['sentences'][3]['words']
print(f"Test sentence: {test_sentence}")
run_always_output = await run_always_on([(test_sentence, None)])
print(f"Run always output: {run_always_output}")
bio_always_output = [list(map(lambda curr: (curr[0], convert_predictions(curr[0], curr[1])), predictions)) for _, predictions in run_always_output]
print(f"Run always output BIO: {bio_always_output}")

Test sentence: ['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will', 'end', 'here', '.']
Run always output: [((['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will', 'end', 'here', '.'], None), [(['Dear'], []), (['Dear', 'viewers'], []), (['Dear', 'viewers', ','], []), (['Dear', 'viewers', ',', 'the'], []), (['Dear', 'viewers', ',', 'the', 'China'], [{'entity_group': 'LOC', 'score': np.float32(0.9609133), 'word': 'China', 'start': 19, 'end': 24}]), (['Dear', 'viewers', ',', 'the', 'China', 'News'], [{'entity_group': 'ORG', 'score': np.float32(0.9969154), 'word': 'China News', 'start': 19, 'end': 29}]), (['Dear', 'viewers', ',', 'the', 'China', 'News', 'program'], [{'entity_group': 'ORG', 'score': np.float32(0.95669866), 'word': 'China News', 'start': 19, 'end': 29}]), (['Dear', 'viewers', ',', 'the', 'China', 'News', 'program', 'will'], [{'entity_group': 'ORG', 'score': np.float32(0.9803317), 'word': 'China News', 'start': 19, 'end': 29}]), (['Dear', 'viewers

## 5. Run Always-On Baseline over the Test Dataset

In [11]:
always_on_metrics = Metrics()

examples = [ontonotes["test"][0:1]]
print(f"Processing {len(examples)} examples from the test set.")

for doc in tqdm(examples):
    # Slicing the dataset ([0:1]) returns each column as a list, so doc['sentences']
    # is a list of per-document sentence lists; flatten it into one list of sentences
    if isinstance(doc['sentences'], list) and isinstance(doc['sentences'][0], list):
        doc['sentences'] = [sentence for sublist in doc['sentences'] for sentence in sublist]
    sentences = process_ontonotes_example(doc)
    results = await run_always_on(sentences)
    results_bio = [(true_bio, list(map(lambda curr: convert_predictions(curr[0], curr[1]), predictions))) for true_bio, predictions in results]
    for true_bio, predictions in results_bio:
        always_on_metrics.evaluate_metrics(true_bio, predictions)

always_on_metrics.save_metrics("baselines/always_on_metrics.pkl")

Processing 1 examples from the test set.


100%|██████████| 1/1 [01:03<00:00, 63.49s/it]


In [12]:
# always_on_metrics = Metrics()
# always_on_metrics.load_metrics("baselines/always_on_metrics.pkl")
always_on_metrics.print_metrics()

Metrics:
Total NER invocations: 7474
Avg TTFD: 0.11
Entity Type          TP         TN         FP (#B-/I-MISC)      FN        
----------------------------------------------------------------------
O                    N/A        6887       15                   N/A       
B-PERSON             0          N/A        20                   4         
I-PERSON             0          N/A        12                   3         
B-NORP               0          N/A        1 (1/0)              0         
I-NORP               0          N/A        0                    0         
B-FAC                0          N/A        33                   8         
I-FAC                0          N/A        85                   0         
B-ORG                8          N/A        2                    7         
I-ORG                31         N/A        12                   2         
B-GPE                0          N/A        52                   5         
I-GPE                0          N/A        11       

## 6. Optimizations

In [19]:
# Configuration: adjust these for the speed vs. completeness trade-off
SAMPLE_SIZE = 100  # Number of documents to sample when USE_SAMPLING is True (None disables sampling)
BATCH_SIZE = 10    # Number of documents processed per batch, to limit memory usage
USE_SAMPLING = False  # Set to True to evaluate a random sample; False processes all documents

# Initialize metrics
always_on_metrics = Metrics()

# Get test examples with optional sampling
test_examples = list(ontonotes["test"])
if USE_SAMPLING and SAMPLE_SIZE:
    # Use random seed for reproducible sampling
    random.seed(42)
    test_examples = random.sample(test_examples, min(SAMPLE_SIZE, len(test_examples)))
    print(f"Sampling {len(test_examples)} examples from {len(ontonotes['test'])} total documents")
else:
    print(f"Processing all {len(test_examples)} examples from the test set")

print(f"Batch size: {BATCH_SIZE}")
print(f"Processing mode: {'Sampled' if USE_SAMPLING and SAMPLE_SIZE else 'Full dataset'}")

async def process_single_document_optimized(doc):
    """Process a single document and return metrics data."""
    try:
        sentences = process_ontonotes_example(doc)
        results = await run_always_on(sentences)
        results_bio = [(true_bio, list(map(lambda curr: convert_predictions(curr[0], curr[1]), predictions))) 
                       for true_bio, predictions in results]
        return results_bio
    except Exception as e:
        print(f"Error processing document: {e}")
        return []

async def process_batch_optimized(batch):
    """Process a batch of documents concurrently."""
    tasks = [process_single_document_optimized(doc) for doc in batch]
    return await asyncio.gather(*tasks, return_exceptions=True)

# Run optimized batch processing
total_processed = 0
total_errors = 0

print(f"\n🚀 Starting processing...")

for i in tqdm(range(0, len(test_examples), BATCH_SIZE), desc="Processing batches"):
    batch = test_examples[i:i+BATCH_SIZE]
    batch_results = await process_batch_optimized(batch)
    
    # Update metrics with batch results
    for doc_results in batch_results:
        if isinstance(doc_results, Exception):
            total_errors += 1
            continue
        
        for true_bio, predictions in doc_results:
            always_on_metrics.evaluate_metrics(true_bio, predictions)
    
    total_processed += len(batch)
        

print(f"\n✅ Processing complete!")
print(f"Total documents processed: {total_processed}")
print(f"Total errors: {total_errors}")

# Save metrics
metrics_filename = f"baselines/always_on_metrics_{'sampled' if USE_SAMPLING and SAMPLE_SIZE else 'full'}_{len(test_examples)}docs.pkl"
always_on_metrics.save_metrics(metrics_filename)
print(f"Metrics saved to: {metrics_filename}")

Processing all 1200 examples from the test set
Batch size: 10
Processing mode: Full dataset

🚀 Starting processing...


Processing batches: 100%|██████████| 120/120 [1:07:11<00:00, 33.59s/it]  


✅ Processing complete!
Total documents processed: 1200
Total errors: 0
Metrics saved to: baselines/always_on_metrics_full_1200docs.pkl
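
Because `run_ner_on_tokens` is a synchronous call, `asyncio.gather` interleaves the documents on a single thread, and the model calls themselves still run one after another. One option that may reduce wall-clock time for offline evaluation is to hand the pipeline several prefixes at once, since Hugging Face pipelines accept a list of inputs together with a `batch_size` argument. The sketch below is an illustration under that assumption, not what this notebook ran. `run_prefixes_batched` is a hypothetical helper, and `stub_ner` stands in for `ner_pipeline` so the example runs without downloading a model.

```python
from typing import Callable, Dict, List, Tuple


def run_prefixes_batched(
    tokens: List[str],
    ner_fn: Callable[..., List[List[Dict]]],
    batch_size: int = 16,
) -> List[Tuple[List[str], List[Dict]]]:
    """Send all prefixes of one sentence to the model in a single batched call."""
    prefixes = [tokens[:end] for end in range(1, len(tokens) + 1)]
    texts = [" ".join(prefix) for prefix in prefixes]
    outputs = ner_fn(texts, batch_size=batch_size)  # one result list per text
    return list(zip(prefixes, outputs))


def stub_ner(texts: List[str], batch_size: int = 1) -> List[List[Dict]]:
    """Stand-in for `ner_pipeline`: returns no entities for every text."""
    return [[] for _ in texts]


batched = run_prefixes_batched(["the", "China", "News"], stub_ner)
print(len(batched))
```

Batching all prefixes requires the full sentence up front, so it suits replaying a recorded test set rather than a live stream. The number of prefixes scored stays the same, so the results should be directly comparable to the per-token loop.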





In [20]:
# Print current metrics
print("ALWAYS-ON BASELINE RESULTS")
print("="*40)
# always_on_metrics = Metrics()
# always_on_metrics.load_metrics("baselines/always_on_metrics_full_1200docs.pkl")
always_on_metrics.print_metrics()


ALWAYS-ON BASELINE RESULTS
Metrics:
Total NER invocations: 230118
Avg TTFD: 0.44
Entity Type          TP         TN         FP (#B-/I-MISC)      FN        
----------------------------------------------------------------------
O                    N/A        203560     3233                 N/A       
B-PERSON             0          N/A        1559 (15/0)          575       
I-PERSON             0          N/A        945 (11/13)          567       
B-NORP               0          N/A        852 (832/4)          138       
I-NORP               0          N/A        149 (54/90)          13        
B-FAC                0          N/A        93 (2/0)             56        
I-FAC                0          N/A        209 (4/6)            34        
B-ORG                1177       N/A        111                  714       
I-ORG                1550       N/A        446                  707       
B-GPE                0          N/A        2162 (47/2)          384       
I-GPE                0 
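
The table reports raw counts per tag. To compare tags on a common scale, the counts can be turned into precision, recall, and F1. The helper below is a minimal sketch using the standard definitions; `precision_recall_f1` is a hypothetical name, and the `Metrics` class may or may not expose an equivalent.

```python
def precision_recall_f1(tp: int, fp: int, fn: int):
    """Per-tag precision, recall, and F1 from raw counts; 0.0 when undefined."""
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * tp / (2 * tp + fp + fn) if tp else 0.0
    return precision, recall, f1


# B-ORG row of the full-test-set table above: TP=1177, FP=111, FN=714.
p, r, f1 = precision_recall_f1(tp=1177, fp=111, fn=714)
print(f"B-ORG precision={p:.3f} recall={r:.3f} f1={f1:.3f}")
```

Rows with zero true positives, such as B-PERSON, come out as 0.0 on all three scores under this convention.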