# In [None]:
import numpy as np
import pandas as pd
from gensim.models import Word2Vec
from sklearn.metrics import classification_report, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from typing import List, Tuple, Dict
import logging
from tqdm.auto import tqdm
import time
from datetime import datetime

# Configure the root logger once for the whole script: INFO level, with
# timestamped "<time> - <level> - <message>" lines.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)

class NERDataset:
    """Token/label sequences read from a tab-separated file.

    Each non-blank line is expected to hold a word and its label in the
    first two tab-separated columns; blank lines separate sentences.

    Attributes:
        sentences: One list of lower-cased words per sentence.
        labels: One list of label strings per sentence, aligned with
            ``sentences``.
    """

    def __init__(self, file_path: str):
        """Read ``file_path`` and populate ``sentences`` and ``labels``."""
        self.sentences: List[List[str]] = []
        self.labels: List[List[str]] = []
        self._load_data(file_path)

    def _load_data(self, file_path: str) -> None:
        """Parse the TSV file, showing a progress bar while reading.

        Lines with fewer than two tab-separated fields are ignored.
        """
        # Count the lines up front so the progress bar knows its total.
        with open(file_path, 'r', encoding='utf-8') as handle:
            line_count = 0
            for _ in handle:
                line_count += 1

        tokens: List[str] = []
        tags: List[str] = []

        with open(file_path, 'r', encoding='utf-8') as handle:
            for raw_line in tqdm(handle, total=line_count, desc=f"Loading {file_path}"):
                stripped = raw_line.strip()

                if not stripped:
                    # Blank line: close the sentence in progress, if any.
                    if tokens:
                        self.sentences.append(tokens)
                        self.labels.append(tags)
                        tokens, tags = [], []
                    continue

                fields = stripped.split('\t')
                if len(fields) < 2:
                    continue

                tokens.append(fields[0].lower())
                tags.append(fields[1])

        # The file may end without a trailing blank line.
        if tokens:
            self.sentences.append(tokens)
            self.labels.append(tags)

        logger.info(f"Loaded {len(self.sentences)} sentences from {file_path}")

class Word2VecNERModel(nn.Module):
    """Token classifier: embedding -> 2-layer bidirectional LSTM -> dropout -> linear.

    The LSTM is built with ``batch_first=True``, so inputs are expected as
    index tensors with the batch dimension first.
    """

    def __init__(self, embedding_dim: int, hidden_dim: int, vocab_size: int, num_classes: int, dropout: float = 0.5):
        """Create the layers.

        Args:
            embedding_dim: Size of each word embedding vector.
            hidden_dim: Hidden size of each LSTM direction.
            vocab_size: Number of rows in the embedding table.
            num_classes: Number of output scores per token.
            dropout: Dropout probability used between the LSTM layers and
                on the LSTM output.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(dropout)
        # Forward and backward hidden states are concatenated, hence 2x.
        self.fc = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return per-token class scores for a batch of word-index sequences."""
        sequence_features, _ = self.lstm(self.embedding(x))
        return self.fc(self.dropout(sequence_features))

class NERProcessor:
    """Build vocabularies and Word2Vec embeddings, and convert text to index tensors.

    Attributes:
        embedding_dim: Size of the Word2Vec / embedding vectors.
        hidden_dim: LSTM hidden size passed to the model.
        batch_size: Stored for callers; not used by any method of this class.
        word2idx / idx2word: Word <-> index mappings filled by ``prepare_data``.
        label2idx / idx2label: Label <-> index mappings filled by ``prepare_data``.
    """

    # Vocabulary entry used for words that were not seen by ``prepare_data``.
    UNK_TOKEN = "<UNK>"

    def __init__(self, embedding_dim: int = 100, hidden_dim: int = 128, batch_size: int = 32):
        """Store hyper-parameters and create empty vocabulary mappings."""
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.word2idx: Dict[str, int] = {}
        self.label2idx: Dict[str, int] = {}
        self.idx2word: Dict[int, str] = {}
        self.idx2label: Dict[int, str] = {}

    def prepare_data(self, train_dataset: "NERDataset", test_dataset: "NERDataset") -> Tuple[int, int]:
        """Build word/label vocabularies and train Word2Vec on both datasets.

        Note that words and sentences from ``test_dataset`` are included in
        the vocabulary and in the (unsupervised) Word2Vec training corpus.

        Args:
            train_dataset: Dataset providing ``sentences`` and ``labels``.
            test_dataset: Dataset providing ``sentences`` and ``labels``.

        Returns:
            ``(vocab_size, num_labels)``. ``vocab_size`` counts the
            ``UNK_TOKEN`` entry in addition to the observed words.
        """
        logger.info("Starting data preparation...")

        # Build vocabularies
        words = set()
        labels = set()

        # Process both datasets with progress tracking
        for dataset_name, dataset in [("training", train_dataset), ("test", test_dataset)]:
            pbar = tqdm(zip(dataset.sentences, dataset.labels),
                       total=len(dataset.sentences),
                       desc=f"Processing {dataset_name} dataset")
            for sentence, sentence_labels in pbar:
                words.update(sentence)
                labels.update(sentence_labels)

        # Create mappings
        logger.info("Creating vocabulary mappings...")
        # Reserve a slot for out-of-vocabulary words so prepare_sequence can
        # handle unseen input instead of raising KeyError.
        words.add(self.UNK_TOKEN)
        # Sort before enumerating: set iteration order for strings varies
        # between interpreter runs (hash randomization), which would make
        # the indices, and any saved model weights, non-reproducible.
        self.word2idx = {word: idx for idx, word in enumerate(sorted(words))}
        self.label2idx = {label: idx for idx, label in enumerate(sorted(labels))}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.idx2label = {idx: label for label, idx in self.label2idx.items()}

        # Train Word2Vec model
        logger.info("Training Word2Vec model...")
        combined_sentences = train_dataset.sentences + test_dataset.sentences
        self.word2vec = Word2Vec(sentences=tqdm(combined_sentences, desc="Training Word2Vec"),
                                vector_size=self.embedding_dim,
                                window=5,
                                min_count=1,
                                workers=4)

        vocab_size = len(self.word2idx)
        num_labels = len(self.label2idx)
        logger.info("Completed data preparation:")
        logger.info(f"- Vocabulary size: {vocab_size}")
        logger.info(f"- Number of labels: {num_labels}")
        return vocab_size, num_labels

    def create_model(self, vocab_size: int, num_classes: int) -> "Word2VecNERModel":
        """Create the NER model and seed its embedding layer from Word2Vec.

        Must be called after ``prepare_data`` (it reads ``self.word2vec``
        and ``self.word2idx``).

        Args:
            vocab_size: Number of rows of the embedding table.
            num_classes: Number of output labels.

        Returns:
            A ``Word2VecNERModel`` whose embedding rows are the Word2Vec
            vectors; rows for words without a vector (such as
            ``UNK_TOKEN``) are left as zeros.
        """
        logger.info("Initializing model...")
        model = Word2VecNERModel(self.embedding_dim, self.hidden_dim, vocab_size, num_classes)

        # float32 matches the embedding parameter dtype and halves the
        # temporary allocation compared with NumPy's float64 default.
        embedding_weights = np.zeros((vocab_size, self.embedding_dim), dtype=np.float32)
        pbar = tqdm(self.word2idx.items(), total=len(self.word2idx), desc="Initializing embeddings")
        for word, idx in pbar:
            if word in self.word2vec.wv:
                embedding_weights[idx] = self.word2vec.wv[word]

        model.embedding.weight.data.copy_(torch.from_numpy(embedding_weights))
        logger.info("Model initialization completed")
        return model

    def prepare_sequence(self, seq: List[str]) -> torch.Tensor:
        """Convert a word sequence to a ``torch.long`` tensor of indices.

        Words missing from ``word2idx`` map to the ``UNK_TOKEN`` index when
        that entry exists.

        Raises:
            KeyError: If a word is unknown and ``word2idx`` has no
                ``UNK_TOKEN`` entry.
        """
        unk_idx = self.word2idx.get(self.UNK_TOKEN)
        if unk_idx is None:
            idxs = [self.word2idx[w] for w in seq]
        else:
            idxs = [self.word2idx.get(w, unk_idx) for w in seq]
        return torch.tensor(idxs, dtype=torch.long)

    def prepare_labels(self, labels: List[str]) -> torch.Tensor:
        """Convert a label sequence to a ``torch.long`` tensor of indices.

        Raises:
            KeyError: If a label is not present in ``label2idx``.
        """
        idxs = [self.label2idx[label] for label in labels]
        return torch.tensor(idxs, dtype=torch.long)

def train_model(model: "Word2VecNERModel",
                train_dataset: "NERDataset",
                processor: "NERProcessor",
                num_epochs: int = 10,
                learning_rate: float = 0.001) -> List[float]:
    """Train the NER model one sentence at a time with Adam and cross-entropy.

    The model is moved to CUDA when available, otherwise it stays on the CPU.

    Args:
        model: Model to train; updated in place.
        train_dataset: Provides aligned ``sentences`` and ``labels``.
        processor: Converts words and labels to index tensors.
        num_epochs: Number of passes over the training sentences.
        learning_rate: Adam learning rate.

    Returns:
        The average per-sentence loss of each epoch, in order.

    Raises:
        ValueError: If ``train_dataset`` contains no sentences.
    """
    num_sentences = len(train_dataset.sentences)
    if num_sentences == 0:
        # Fail early: otherwise the per-epoch average divides by zero.
        raise ValueError("Cannot train: training dataset contains no sentences")

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    training_losses = []

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Training on device: {device}")
    model = model.to(device)

    start_time = time.time()
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        epoch_start = time.time()

        pbar = tqdm(zip(train_dataset.sentences, train_dataset.labels),
                   total=num_sentences,
                   desc=f"Epoch {epoch+1}/{num_epochs}")

        for sentence, labels in pbar:
            optimizer.zero_grad()

            # Add a batch dimension of 1: each sentence is its own batch.
            sentence_in = processor.prepare_sequence(sentence).unsqueeze(0).to(device)
            targets = processor.prepare_labels(labels).to(device)

            outputs = model(sentence_in)
            # Drop the batch dimension so scores line up with the targets.
            loss = criterion(outputs.squeeze(0), targets)

            loss.backward()
            optimizer.step()

            loss_value = loss.item()
            total_loss += loss_value
            pbar.set_postfix({'loss': f'{loss_value:.4f}'})

        avg_loss = total_loss / num_sentences
        training_losses.append(avg_loss)

        epoch_time = time.time() - epoch_start
        logger.info(f"Epoch {epoch+1}/{num_epochs}:")
        logger.info(f"- Average loss: {avg_loss:.4f}")
        logger.info(f"- Time taken: {epoch_time:.2f}s")

    total_time = time.time() - start_time
    logger.info(f"Training completed in {total_time:.2f}s")
    return training_losses

def evaluate_model(model: "Word2VecNERModel",
                  test_dataset: "NERDataset",
                  processor: "NERProcessor") -> Dict:
    """Evaluate the model token by token and log a detailed metrics report.

    Args:
        model: Trained model; switched to eval mode and moved to CUDA when
            available.
        test_dataset: Provides aligned ``sentences`` and gold ``labels``.
        processor: Supplies index conversion and the label mappings.

    Returns:
        The dictionary produced by ``classification_report(...,
        output_dict=True)``: one nested metrics dict per label plus the
        ``'accuracy'``, ``'macro avg'`` and ``'weighted avg'`` entries.

    Raises:
        ValueError: If ``test_dataset`` contains no tokens.
    """
    if not any(test_dataset.sentences):
        # No tokens at all: the metrics below would be undefined.
        raise ValueError("Cannot evaluate: test dataset contains no tokens")

    logger.info("Starting comprehensive model evaluation...")
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        pbar = tqdm(zip(test_dataset.sentences, test_dataset.labels),
                   total=len(test_dataset.sentences),
                   desc="Evaluating")

        for sentence, labels in pbar:
            sentence_in = processor.prepare_sequence(sentence).unsqueeze(0).to(device)
            outputs = model(sentence_in)
            # Highest-scoring class per token.
            _, predicted = torch.max(outputs.squeeze(0), 1)

            true_labels.extend([processor.label2idx[l] for l in labels])
            predicted_labels.extend(predicted.cpu().tolist())

    # Convert indices back to labels
    true_labels = [processor.idx2label[l] for l in true_labels]
    predicted_labels = [processor.idx2label[l] for l in predicted_labels]

    # zero_division=0 yields the same scores as the default but without a
    # warning for every label the model never predicted.
    report = classification_report(true_labels, predicted_labels,
                                   output_dict=True, zero_division=0)

    # Print detailed performance analysis
    logger.info("\n" + "="*50)
    logger.info("DETAILED PERFORMANCE ANALYSIS")
    logger.info("="*50)

    # Overall Metrics
    logger.info("\nOVERALL METRICS:")
    logger.info(f"Macro Avg Precision: {report['macro avg']['precision']:.4f}")
    logger.info(f"Macro Avg Recall: {report['macro avg']['recall']:.4f}")
    logger.info(f"Macro Avg F1-Score: {report['macro avg']['f1-score']:.4f}")
    logger.info(f"Weighted Avg F1-Score: {report['weighted avg']['f1-score']:.4f}")

    # Per-Class Performance
    logger.info("\nPER-CLASS PERFORMANCE:")
    logger.info(f"{'Class':<15} {'Precision':<12} {'Recall':<12} {'F1-Score':<12} {'Support':<10}")
    logger.info("-" * 65)

    # Keep only the per-label entries; the remaining keys are aggregates.
    summary_keys = ('accuracy', 'micro avg', 'macro avg', 'weighted avg')
    class_metrics = {
        label: metrics for label, metrics in report.items()
        if label not in summary_keys
    }
    # Sort classes by F1-score for better analysis
    sorted_classes = sorted(class_metrics.items(),
                          key=lambda x: x[1]['f1-score'],
                          reverse=True)

    for label, metrics in sorted_classes:
        logger.info(f"{label:<15} {metrics['precision']:<12.4f} "
                   f"{metrics['recall']:<12.4f} {metrics['f1-score']:<12.4f} "
                   f"{metrics['support']:<10}")

    # Performance Distribution Analysis
    f1_scores = [metrics['f1-score'] for metrics in class_metrics.values()]
    best_f1 = max(f1_scores)
    worst_f1 = min(f1_scores)
    logger.info("\nF1-SCORE DISTRIBUTION:")
    logger.info(f"Maximum F1-Score: {best_f1:.4f}")
    logger.info(f"Minimum F1-Score: {worst_f1:.4f}")
    logger.info(f"F1-Score Range: {best_f1 - worst_f1:.4f}")
    logger.info(f"Standard Deviation: {np.std(f1_scores):.4f}")

    # Confusion Matrix Analysis
    logger.info("\nCONFUSION MATRIX STATISTICS:")
    conf_matrix = confusion_matrix(true_labels, predicted_labels)
    # The diagonal holds the correctly classified tokens of every class.
    logger.info(f"Number of True Positives: {np.sum(np.diag(conf_matrix))}")
    logger.info(f"Total Predictions: {np.sum(conf_matrix)}")
    logger.info(f"Overall Accuracy: {report['accuracy']:.4f}")

    return report

def main():
    """Run the full pipeline: load data, build the model, train and evaluate.

    Reads ``train.tsv`` and ``test_gold.tsv`` from the current working
    directory.

    Returns:
        A ``(model, metrics, training_losses)`` tuple: the trained model,
        the report returned by ``evaluate_model`` and the per-epoch losses
        returned by ``train_model``.
    """
    start_time = time.time()
    logger.info("Starting NER system training and evaluation")

    # Load datasets
    train_dataset = NERDataset('train.tsv')
    test_dataset = NERDataset('test_gold.tsv')

    # Initialize processor and prepare data
    processor = NERProcessor(embedding_dim=100, hidden_dim=128, batch_size=32)
    vocab_size, num_classes = processor.prepare_data(train_dataset, test_dataset)

    # Create and train model
    model = processor.create_model(vocab_size, num_classes)
    training_losses = train_model(model, train_dataset, processor, num_epochs=10)

    # Evaluate once; the report already carries the per-class and
    # macro/weighted aggregates, so callers can read them from `metrics`.
    metrics = evaluate_model(model, test_dataset, processor)

    total_time = time.time() - start_time
    logger.info(f"Total execution time: {total_time:.2f}s")

    return model, metrics, training_losses

# Run the full train/evaluate pipeline only when this file is executed as a
# script; the values returned by main() are discarded here.
if __name__ == "__main__":
    main()
    