In [None]:
!pip install sklearn_crfsuite
!pip install datasets
!pip install evaluate
!pip install seqeval
!pip install tabulate



In [None]:
# CRF-based NER model for Telugu WikiAnn data with comprehensive evaluation
# --------------------------------------------------------------------
import numpy as np
import sklearn_crfsuite
from sklearn_crfsuite import metrics
from collections import Counter
from datasets import load_dataset
from evaluate import load
from tabulate import tabulate

In [None]:
# Feature extraction functions for CRF
# ----------------------------------
def word2features(sent, i):
    """Build the CRF feature dictionary for the token at position ``i``.

    Args:
        sent: Sequence of token strings forming one sentence.
        i: Index into ``sent`` of the token to describe.

    Returns:
        dict: Feature name -> value. Contains surface features of the token
        itself, features of the tokens at offsets -2, -1, +1 and +2 when they
        exist, ``'BOS'`` / ``'EOS'`` markers at the sentence edges, and three
        script features based on the Telugu Unicode block (U+0C00..U+0C7F).
    """
    def _in_telugu_block(ch):
        # Telugu Unicode block.
        return '\u0C00' <= ch <= '\u0C7F'

    word = sent[i]

    # Slices never raise on short strings (``'a'[-3:] == 'a'``), so no
    # explicit length guards are needed for prefixes and suffixes.
    features = {
        'bias': 1.0,
        'word.lower()': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]': word[-2:],
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
        'word.contains_hyphen': '-' in word,
        'word.contains_dot': '.' in word,
        'word.length': len(word),
        'word.prefix-2': word[:2],
        'word.prefix-3': word[:3],
        'word.suffix-2': word[-2:],
        'word.suffix-3': word[-3:],
        'word.contains_digit': any(char.isdigit() for char in word),
        'word.contains_uppercase': any(char.isupper() for char in word),
        'word.is_alphanumeric': word.isalnum(),
        'word.is_alphabetic': word.isalpha(),
    }

    # Context features - previous word
    if i > 0:
        prev_word = sent[i-1]
        features.update({
            '-1:word.lower()': prev_word.lower(),
            '-1:word.istitle()': prev_word.istitle(),
            '-1:word.isupper()': prev_word.isupper(),
            '-1:word.isdigit()': prev_word.isdigit(),
            '-1:word.prefix-2': prev_word[:2],
            '-1:word.suffix-2': prev_word[-2:],
        })
    else:
        features['BOS'] = True

    # Context features - next word
    if i < len(sent)-1:
        next_word = sent[i+1]
        features.update({
            '+1:word.lower()': next_word.lower(),
            '+1:word.istitle()': next_word.istitle(),
            '+1:word.isupper()': next_word.isupper(),
            '+1:word.isdigit()': next_word.isdigit(),
            '+1:word.prefix-2': next_word[:2],
            '+1:word.suffix-2': next_word[-2:],
        })
    else:
        features['EOS'] = True

    # Wider context: two tokens back / two tokens ahead
    if i > 1:
        prev2_word = sent[i-2]
        features.update({
            '-2:word.lower()': prev2_word.lower(),
            '-2:word.istitle()': prev2_word.istitle(),
        })

    if i < len(sent)-2:
        next2_word = sent[i+2]
        features.update({
            '+2:word.lower()': next2_word.lower(),
            '+2:word.istitle()': next2_word.istitle(),
        })

    # Telugu-specific features
    telugu_chars = sum(1 for c in word if _in_telugu_block(c))
    has_other_letters = any(c.isalpha() and not _in_telugu_block(c) for c in word)
    features.update({
        'word.telugu_chars': telugu_chars,
        # Require at least one Telugu character: an ``all()`` over the letters
        # alone is vacuously True for punctuation, digits and empty tokens.
        'word.is_telugu': telugu_chars > 0 and not has_other_letters,
        'word.script_mixed': telugu_chars > 0 and has_other_letters,
    })

    return features

def sent2features(sent):
    """Return one feature dict per token of ``sent``, in token order."""
    per_token = []
    for position in range(len(sent)):
        per_token.append(word2features(sent, position))
    return per_token

def sent2labels(tags):
    """Return the tags of one sentence as a list of strings.

    Each tag is passed through ``str`` so that integer tag ids become the
    string labels handed to the CRF.
    """
    return list(map(str, tags))


In [None]:
# Main CRF NER class
# -----------------
class CRFNER:
    """Thin wrapper around ``sklearn_crfsuite.CRF`` for WikiAnn-style NER.

    Attributes:
        crf: The underlying ``sklearn_crfsuite.CRF`` estimator (L-BFGS with
            L1/L2 penalties ``c1``/``c2``).
        allowed_entities: Entity type names reported during evaluation.
        tag_to_label: Maps string tag ids (``'0'``..``'6'``) to IOB labels.
    """

    def __init__(self, c1=0.1, c2=0.1, max_iterations=100):
        """Create the estimator.

        Args:
            c1: L1 regularisation coefficient passed to the CRF.
            c2: L2 regularisation coefficient passed to the CRF.
            max_iterations: Iteration cap passed to the CRF.
        """
        self.crf = sklearn_crfsuite.CRF(
            algorithm='lbfgs',
            c1=c1,
            c2=c2,
            max_iterations=max_iterations,
            all_possible_transitions=True
        )
        self.allowed_entities = {"PER", "LOC", "ORG"}  # WikiAnn has these three entity types
        self.tag_to_label = {
            '0': 'O',
            '1': 'B-PER', '2': 'I-PER',
            '3': 'B-ORG', '4': 'I-ORG',
            '5': 'B-LOC', '6': 'I-LOC'
        }

    def train(self, X_train, y_train):
        """Fit the CRF on feature sequences ``X_train`` and label sequences ``y_train``."""
        self.crf.fit(X_train, y_train)

    def predict(self, X_test):
        """Return the CRF's predicted label sequences for ``X_test``."""
        return self.crf.predict(X_test)

    def get_transition_features(self):
        """Return the learned transition weights as a plain dict.

        Returns ``{}`` when the estimator has no such attribute or the
        attribute is empty/``None``.
        """
        # NOTE(review): sklearn-crfsuite appears to expose this attribute as
        # None before fitting; a bare hasattr() check would then let
        # dict(None) raise TypeError. Treat missing and None alike.
        weights = getattr(self.crf, 'transition_features_', None)
        return dict(weights) if weights else {}

    def get_state_features(self):
        """Return the learned state-feature weights as a plain dict.

        Returns ``{}`` when the estimator has no such attribute or the
        attribute is empty/``None``.
        """
        weights = getattr(self.crf, 'state_features_', None)
        return dict(weights) if weights else {}

    def convert_to_iob_tags(self, predictions):
        """Map sequences of tag ids to sequences of IOB labels.

        Args:
            predictions: Iterable of sequences of tag ids; each id may be a
                string (``'1'``) or an integer (``1``).

        Returns:
            list[list[str]]: IOB labels, same shape as ``predictions``.

        Raises:
            KeyError: If an id is not one of the keys in ``tag_to_label``.
        """
        # str() makes integer ids work too; string ids are unaffected.
        return [[self.tag_to_label[str(tag)] for tag in seq] for seq in predictions]


In [None]:
# Prepare data for CRF
# -------------------
def prepare_crf_data(dataset):
    """Turn the ``train`` and ``test`` splits into CRF inputs.

    For every example in a split, the ``"tokens"`` field is converted with
    ``sent2features`` and the ``"ner_tags"`` field with ``sent2labels``.

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test)``, each a list with one
        entry per example of the corresponding split.
    """
    def _featurize(split_name):
        # Build parallel lists of feature sequences and label sequences.
        feature_seqs, label_seqs = [], []
        for record in dataset[split_name]:
            feature_seqs.append(sent2features(record["tokens"]))
            label_seqs.append(sent2labels(record["ner_tags"]))
        return feature_seqs, label_seqs

    X_train, y_train = _featurize("train")
    X_test, y_test = _featurize("test")
    return X_train, y_train, X_test, y_test

# Convert numeric tags to IOB format tags
def convert_numeric_to_iob_tags(tag_ids):
    """Translate a sequence of tag ids into IOB label strings.

    Each id is coerced with ``int`` first, so both ``5`` and ``'5'`` map to
    ``'B-LOC'``. Ids outside 0..6 raise ``KeyError``.
    """
    labels_by_id = dict(enumerate(
        ("O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC")
    ))

    iob_tags = []
    for raw_id in tag_ids:
        iob_tags.append(labels_by_id[int(raw_id)])
    return iob_tags


In [None]:
def evaluate_crf_ner_system(crf_model, X_test, y_test, tag_scheme="IOB2", mode=None):
    """Evaluate a CRF NER model with the seqeval metric and print a summary.

    Args:
        crf_model: Object providing ``predict(X)`` and an ``allowed_entities``
            collection of entity type names.
        X_test: Feature sequences to predict on.
        y_test: Reference tag-id sequences aligned with ``X_test``.
        tag_scheme: seqeval scheme name; upper-cased before use, so ``"iob2"``
            and ``"IOB2"`` are equivalent.
        mode: Optional seqeval evaluation mode (for example ``"strict"``).
            When ``None`` (the default) it is not passed and the metric uses
            its own default.
            NOTE(review): seqeval appears to honour ``scheme`` only in strict
            mode - confirm against the seqeval documentation.

    Returns:
        dict: The raw result dictionary returned by the metric.
    """
    # Get predictions
    y_pred = crf_model.predict(X_test)

    # Convert string predictions and references to IOB format
    iob_predictions = [convert_numeric_to_iob_tags(pred_seq) for pred_seq in y_pred]
    iob_references = [convert_numeric_to_iob_tags(ref_seq) for ref_seq in y_test]

    # The metric looks the scheme up by its exact (upper-case) name.
    scheme = tag_scheme.upper() if isinstance(tag_scheme, str) else tag_scheme
    compute_kwargs = {"scheme": scheme}
    if mode is not None:
        compute_kwargs["mode"] = mode

    # Evaluate using seqeval
    metric = load("seqeval")
    results = metric.compute(
        predictions=iob_predictions,
        references=iob_references,
        **compute_kwargs
    )

    # Display results
    print("\n\033[1;34mCRF NER Evaluation Results:\033[0m")

    # Per-entity rows: per-entity values are dicts, overall_* values are scalars.
    # The loop variable is not called ``metrics`` so that it does not shadow
    # the ``sklearn_crfsuite.metrics`` module imported at file level.
    crf_table = []
    for entity, entity_scores in results.items():
        if isinstance(entity_scores, dict) and any(e in entity for e in crf_model.allowed_entities):
            crf_table.append([
                entity,
                f"{entity_scores['precision']:.4f}",
                f"{entity_scores['recall']:.4f}",
                f"{entity_scores['f1']:.4f}",
                entity_scores['number']
            ])

    print(tabulate(crf_table, headers=["Entity", "Precision", "Recall", "F1 Score", "Support"], tablefmt="pretty"))

    # Overall results
    print("\n\033[1;36mOverall Metrics:\033[0m")
    print(f"Overall Precision: {results['overall_precision']:.4f}")
    print(f"Overall Recall: {results['overall_recall']:.4f}")
    print(f"Overall F1 Score: {results['overall_f1']:.4f}")
    print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")

    return results

In [None]:
# Generate detailed seqeval report
# ------------------------------
def generate_crf_seqeval_report(crf_model, X_test, y_test, tag_scheme="iob2"):
    """Print a strict-mode seqeval report for the CRF model.

    Args:
        crf_model: Object providing ``predict(X)``.
        X_test: Feature sequences to predict on.
        y_test: Reference tag-id sequences aligned with ``X_test``.
        tag_scheme: seqeval scheme name. It is upper-cased before use because
            the metric looks the scheme up by its exact name (``IOB2``), so
            the lower-case default would otherwise be rejected.

    Returns:
        tuple: ``(y_pred, y_test)`` - the raw predictions and the references
        exactly as passed in.
    """
    # Get predictions
    y_pred = crf_model.predict(X_test)

    # Convert string predictions and references to IOB format
    iob_predictions = [convert_numeric_to_iob_tags(pred_seq) for pred_seq in y_pred]
    iob_references = [convert_numeric_to_iob_tags(ref_seq) for ref_seq in y_test]

    # Normalise the scheme name so "iob2" and "IOB2" behave the same.
    scheme = tag_scheme.upper() if isinstance(tag_scheme, str) else tag_scheme

    # Generate seqeval classification report
    metric = load("seqeval")
    results = metric.compute(
        predictions=iob_predictions,
        references=iob_references,
        mode='strict',
        scheme=scheme
    )
    print("\n\033[1;34mCRF NER Classification Report (seqeval):\033[0m")

    # Format results into a readable report
    print(f"Overall precision: {results['overall_precision']:.4f}")
    print(f"Overall recall: {results['overall_recall']:.4f}")
    print(f"Overall F1: {results['overall_f1']:.4f}")
    print(f"Overall accuracy: {results['overall_accuracy']:.4f}")

    print("\nEntity-level metrics:")
    for entity in ["PER", "ORG", "LOC"]:
        # Entity types with no result entry are skipped.
        if entity in results:
            ent_results = results[entity]
            print(f"  {entity}:")
            print(f"    Precision: {ent_results['precision']:.4f}")
            print(f"    Recall: {ent_results['recall']:.4f}")
            print(f"    F1: {ent_results['f1']:.4f}")
            print(f"    Support: {ent_results['number']}")

    return y_pred, y_test

In [None]:
# Analyze CRF model features
# ------------------------
def analyze_crf_features(crf_model):
    """Print the highest-magnitude state and transition weights of a CRF.

    Args:
        crf_model: Object providing ``get_state_features()`` and
            ``get_transition_features()``. State features are expected as
            ``{(attribute_name, label): weight}`` and transition features as
            ``{(from_label, to_label): weight}``, which is the layout
            documented by sklearn-crfsuite.

    Prints, for each label, the 10 state features with the largest absolute
    weight, followed by the 20 transitions with the largest absolute weight.
    Returns ``None``.
    """
    # Get state and transition features
    state_features = crf_model.get_state_features()
    transition_features = crf_model.get_transition_features()

    # Mapping from numeric labels to IOB tags for better readability
    tag_to_label = {
        '0': 'O',
        '1': 'B-PER', '2': 'I-PER',
        '3': 'B-ORG', '4': 'I-ORG',
        '5': 'B-LOC', '6': 'I-LOC'
    }

    # Top state features for each label
    print("\n\033[1;34mTop State Features by Label:\033[0m")
    label_features = {}

    # Keys are (attribute, label): the label is the SECOND element. Unpacking
    # them the other way round would group the weights by attribute name.
    for (feature, label), weight in state_features.items():
        label_features.setdefault(label, []).append((feature, weight))

    # Sort and display top features for each label
    for label, features in label_features.items():
        readable_label = tag_to_label.get(label, label)
        print(f"\n\033[1;36m{readable_label} (internal: {label}):\033[0m")
        top_features = sorted(features, key=lambda x: abs(x[1]), reverse=True)[:10]
        for feature, weight in top_features:
            print(f"  {feature}: {weight:.4f}")

    # Top transition features
    print("\n\033[1;34mTop Transition Features:\033[0m")
    top_transitions = sorted(transition_features.items(), key=lambda x: abs(x[1]), reverse=True)[:20]
    for (from_label, to_label), weight in top_transitions:
        readable_from = tag_to_label.get(from_label, from_label)
        readable_to = tag_to_label.get(to_label, to_label)
        print(f"  {readable_from} ({from_label}) -> {readable_to} ({to_label}): {weight:.4f}")


In [None]:
# Add Telugu sample data exploration function
# ----------------------------------------
def explore_telugu_dataset(dataset):
    """Print summary statistics for the train/test/validation splits.

    Reports sentence and token counts for each split, then for the training
    split: the number of entity starts (B- tags) per entity type, the full
    tag distribution, and up to three sample sentences with their tags.
    """
    split_titles = (("train", "Training"), ("test", "Test"), ("validation", "Validation"))
    id_to_iob = dict(enumerate(
        ("O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC")
    ))

    # Sentence counts per split
    print("\n\033[1;34mTelugu WikiAnn Dataset Exploration:\033[0m")
    for split_key, title in split_titles:
        print(f"{title} set: {len(dataset[split_key])} sentences")

    # Token counts per split
    token_totals = {}
    for split_key, _ in split_titles:
        token_totals[split_key] = sum(len(record['tokens']) for record in dataset[split_key])

    print(f"\nTotal tokens: {sum(token_totals.values())}")
    for split_key, title in split_titles:
        print(f"{title} tokens: {token_totals[split_key]}")

    # One pass over the training tags feeds both reports below.
    training_split = dataset['train']
    tag_counts = Counter()
    for record in training_split:
        tag_counts.update(record['ner_tags'])

    # An entity is counted once per B- tag (ids 1, 3 and 5).
    print("\nNamed entity counts in training set:")
    for entity, begin_tag in (('PER', 1), ('ORG', 3), ('LOC', 5)):
        print(f"  {entity}: {tag_counts[begin_tag]}")

    print("\nTag distribution in training set:")
    for tag_id in sorted(tag_counts):
        print(f"  {id_to_iob.get(tag_id, tag_id)}: {tag_counts[tag_id]}")

    # Show the first few training sentences
    print("\nSample examples:")
    sample_count = min(3, len(training_split))
    for position in range(sample_count):
        sample = training_split[position]
        sample_tags = sample['ner_tags']
        print(f"\nExample {position+1}:")
        print("Tokens:", sample['tokens'])
        print("Tags:", sample_tags)
        print("IOB Tags:", [id_to_iob.get(tag, tag) for tag in sample_tags])


In [None]:
# Main execution function for CRF evaluation
# ----------------------------------------
def run_crf_evaluation(c1=0.1, c2=0.1, max_iterations=100):
    """Load Telugu WikiAnn, train the CRF, and print both evaluation reports.

    Args:
        c1: L1 regularisation coefficient forwarded to ``CRFNER``.
        c2: L2 regularisation coefficient forwarded to ``CRFNER``.
        max_iterations: Training iteration cap forwarded to ``CRFNER``.
        The defaults reproduce the previously hard-coded configuration.

    Returns:
        dict: The result dictionary from ``evaluate_crf_ner_system``.
    """
    # Load dataset
    print("Loading Telugu WikiAnn NER dataset...")
    dataset = load_dataset("wikiann", "te")  # Using the lang code 'te' for Telugu

    # Explore the dataset
    explore_telugu_dataset(dataset)

    # Prepare data
    print("\nPreparing data for CRF...")
    X_train, y_train, X_test, y_test = prepare_crf_data(dataset)

    print(f"Training examples: {len(X_train)}")
    print(f"Test examples: {len(X_test)}")

    # Initialize and train CRF model
    print("Training CRF model...")
    crf_model = CRFNER(c1=c1, c2=c2, max_iterations=max_iterations)
    crf_model.train(X_train, y_train)

    # Evaluate model
    print("\nEvaluating CRF NER system...")
    results = evaluate_crf_ner_system(crf_model, X_test, y_test, tag_scheme="IOB2")

    # Generate detailed classification report
    print("\nGenerating detailed classification report...")
    generate_crf_seqeval_report(crf_model, X_test, y_test, tag_scheme="IOB2")

    return results

# Run the CRF evaluation
if __name__ == "__main__":
    run_crf_evaluation()

Loading Telugu WikiAnn NER dataset...

[1;34mTelugu WikiAnn Dataset Exploration:[0m
Training set: 1000 sentences
Test set: 1000 sentences
Validation set: 1000 sentences

Total tokens: 24322
Training tokens: 8087
Test tokens: 8155
Validation tokens: 8080

Named entity counts in training set:
  PER: 364
  ORG: 347
  LOC: 493

Tag distribution in training set:
  O: 5834
  B-PER: 364
  I-PER: 302
  B-ORG: 347
  I-ORG: 568
  B-LOC: 493
  I-LOC: 179

Sample examples:

Example 1:
Tokens: ['ఆంధ్ర', 'ప్రదేశ్లో', ',', 'వారు', 'జనాభాలో', '2', '%', 'కంటే', 'తక్కువగాను', ',', 'తమిళనాడులో', 'వారు', '3', '%', 'కన్నా', 'తక్కువ', 'మంది', 'ఉన్నారు', '.']
Tags: [5, 6, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0]
IOB Tags: ['B-LOC', 'I-LOC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-LOC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']

Example 2:
Tokens: ['దక్షిణాఫ్రికాతో', 'కలిసి', 'ఒక', 'పెట్టుబడి', 'కార్యక్రమాన్ని', 'గార్డనర్', 'రూపొందిస్తున్నట్లు', 'చెప్పబడుతుంది', '.']
Tags: [5, 0, 0, 0, 0, 0, 0, 0