# NER & Sentiment Classification

## Install Dependencies (pip)

In [8]:
!pip install transformers torch pandas numpy scikit-learn seaborn matplotlib tqdm



## Mount G-Drive

In [9]:
# Mount Google Drive into the Colab runtime at /content/drive so that the
# paths below resolve.
from google.colab import drive
drive.mount('/content/drive')

# Set your dataset path here
# DATASET_PATH points at the insider-threat CSV stored on the mounted drive.
DATASET_PATH = '/content/drive/MyDrive/cs491_DataAnalyzer/Dataset/synthetic_insider_threat.csv'
# MODEL_OUTPUT_DIR is presumably where trained models are meant to be saved;
# it is not referenced by the code visible in this notebook section.
MODEL_OUTPUT_DIR = '/content/drive/MyDrive/cs491_DataAnalyzer/Dataset/models'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Model Implementation

*   Base model class with shared functionality.
*   NER model implementation
*   Sentiment analysis model implementation
*   All model-related utilities (dataset class, metrics)



In [10]:
"""
Models for Insider Threat Detection

This module implements a multi-task learning approach combining:
1. Named Entity Recognition (NER) for identifying sensitive entities
2. Logistic Regression for insider threat detection
"""

from transformers import (
    AutoModelForTokenClassification, #  For NER tasks
    AutoTokenizer, # For tokenization
    AutoModelForSequenceClassification, # For sentiment analysis tasks
    TrainingArguments,
    Trainer, # 
    DataCollatorForTokenClassification # for Batched NER data
)
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset
import numpy as np
from typing import Dict, List, Optional
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# Entity labels for NER, in BIO format: "B-" marks the first token of an
# entity, "I-" a continuation token, and "O" a token outside any entity.
# The position of a tag in this list is its integer class id: the list length
# is the default number of model outputs and predicted ids are turned back
# into tags by indexing into this list, so the order is significant.
NER_LABELS = [
    "O",  # Outside of named entity
    "B-ROLE", "I-ROLE",  # Role entities (B: Beginning, I: Inside)
    "B-FACILITY", "I-FACILITY",  # Facility entities
    "B-ACCESS_CODE", "I-ACCESS_CODE",  # Access code entities
    "B-SENSITIVE_DATA", "I-SENSITIVE_DATA",  # Sensitive data entities
]

class InsiderThreatDataset(Dataset):
    """
    Custom dataset for insider threat detection tasks.

    Two input layouts are supported:

    * A plain sequence of raw texts. Items are ``(text, label)`` tuples, with
      ``label`` set to ``None`` when no labels were given.
    * A mapping of already-encoded columns (anything exposing ``keys()``,
      e.g. tokenizer output such as ``{"input_ids": ..., "attention_mask": ...}``).
      Items are per-sample dicts ``{column: column[idx]}``, and the dataset
      length is the number of samples rather than the number of columns.

    Args:
        texts: Sequence of input texts, or a mapping of encoded columns
        labels (Optional[List]): One label per sample
        tokenizer: Tokenizer instance kept alongside the data (not used here)

    Returns:
        Dataset instance that can be used with PyTorch DataLoader
    """
    def __init__(self, texts, labels: Optional[List] = None, tokenizer=None):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer

    def _is_encoded(self) -> bool:
        """Return True when ``texts`` is a mapping of encoded columns."""
        return hasattr(self.texts, "keys")

    def __len__(self):
        if self._is_encoded():
            # len() of a mapping counts its columns; the sample count is the
            # length of any single column.
            first_column = next(iter(self.texts.values()), None)
            return 0 if first_column is None else len(first_column)
        return len(self.texts)

    def __getitem__(self, idx):
        if self._is_encoded():
            item = {name: column[idx] for name, column in self.texts.items()}
            # Labels already stored in the encodings take precedence so they
            # are not overwritten with a differently-typed copy.
            if self.labels is not None and "labels" not in item:
                item["labels"] = self.labels[idx]
            return item

        label = self.labels[idx] if self.labels is not None else None
        return self.texts[idx], label

def compute_metrics(pred):
    """
    Compute evaluation metrics for model predictions.

    Prediction layouts handled:

    * ``(..., num_classes)`` with more than one class: arg-max over the last
      axis (token- or sequence-level classification).
    * ``(..., 1)`` or 1-D: a single score per sample, treated as a probability
      and thresholded at 0.5 (an arg-max over a single column would always
      yield class 0).

    Labels and predictions are flattened before scoring, and positions whose
    label is -100 (the padding / ignore index used for token classification
    batches) are excluded.

    Args:
        pred: Prediction object with label_ids and predictions

    Returns:
        Dict with accuracy, f1, precision, and recall (weighted averages)
    """
    labels = np.asarray(pred.label_ids)
    scores = np.asarray(pred.predictions)

    if scores.ndim > 1 and scores.shape[-1] == 1:
        preds = (scores[..., 0] > 0.5).astype(int)
    elif scores.ndim > 1:
        preds = scores.argmax(-1)
    else:
        preds = (scores > 0.5).astype(int)

    # sklearn expects 1-D targets; token-level outputs arrive as 2-D arrays.
    labels = labels.reshape(-1)
    preds = preds.reshape(-1)

    keep = labels != -100
    labels = labels[keep].astype(int)
    preds = preds[keep]

    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='weighted'
    )
    acc = accuracy_score(labels, preds)

    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

class BaseModel:
    """
    Base class for insider threat detection models.
    Provides common functionality for both NER and Sentiment models.

    Subclasses are expected to assign ``self.model`` (it is ``None`` here) and
    may set ``self.data_collator`` to have it used during training.

    Args:
        model_name (str): Pre-trained model name/path
        num_labels (int): Number of output labels
    """
    def __init__(self, model_name: str, num_labels: int):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.num_labels = num_labels
        self.model = None

    def prepare_data(self, texts: List[str], labels: Optional[List] = None):
        """
        Prepare input data for model training or inference.

        Args:
            texts (List[str]): List of input text sequences. Other non-string
                iterables (tuple, pandas Series, ...) are converted to a list
                before tokenization.
            labels (Optional[List]): List of labels (if for training)

        Returns:
            dict: Encoded inputs containing:
                - input_ids: Token IDs
                - attention_mask: Attention mask
                - labels: Encoded labels (if provided)
        """
        # A single string is passed through untouched; list("abc") would
        # split it into characters.
        if not isinstance(texts, str):
            texts = list(texts)

        encodings = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            return_tensors="pt"
        )

        if labels is not None:
            encodings['labels'] = torch.tensor(labels)

        return encodings

    def train(self, train_texts, train_labels, val_texts, val_labels, output_dir,
              num_epochs=3, batch_size=16, learning_rate=2e-5):
        """
        Train the model using the provided data.

        The model and tokenizer are saved under ``{output_dir}/best_model``
        once training finishes.

        Args:
            train_texts (List[str]): Training text sequences
            train_labels (List): Training labels
            val_texts (List[str]): Validation text sequences
            val_labels (List): Validation labels
            output_dir (str): Directory to save model checkpoints
            num_epochs (int, optional): Number of training epochs. Defaults to 3
            batch_size (int, optional): Batch size for training. Defaults to 16
            learning_rate (float, optional): Learning rate. Defaults to 2e-5

        Returns:
            Trainer: Trained model trainer instance
        """
        import inspect

        train_encodings = self.prepare_data(train_texts, train_labels)
        val_encodings = self.prepare_data(val_texts, val_labels)

        train_dataset = InsiderThreatDataset(train_encodings, train_labels, self.tokenizer)
        val_dataset = InsiderThreatDataset(val_encodings, val_labels, self.tokenizer)

        # The evaluation-schedule argument was renamed from
        # `evaluation_strategy` to `eval_strategy` in newer Transformers
        # releases; pick whichever the installed version accepts.
        accepted = inspect.signature(TrainingArguments.__init__).parameters
        strategy_arg = "eval_strategy" if "eval_strategy" in accepted else "evaluation_strategy"

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            learning_rate=learning_rate,
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            **{strategy_arg: "epoch"},
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            # Only subclasses that define a collator provide one.
            data_collator=getattr(self, 'data_collator', None)
        )

        trainer.train()
        self.model.save_pretrained(f"{output_dir}/best_model")
        self.tokenizer.save_pretrained(f"{output_dir}/best_model")

        return trainer

class CorporateNERModel(BaseModel):
    """
    Named Entity Recognition model for identifying corporate entities.

    This model specializes in detecting entities such as:
    - Roles (e.g., "CEO", "System Admin")
    - Facilities (e.g., "Server Room", "R&D Lab")
    - Access Codes
    - Sensitive Data references

    Args:
        model_name (str, optional): Pre-trained model name. Defaults to "microsoft/deberta-v3-base"
        num_labels (int, optional): Number of NER labels. Defaults to len(NER_LABELS)
    """
    def __init__(
            self,
            model_name: str = "microsoft/deberta-v3-base",
            num_labels: int = len(NER_LABELS)
        ):
        super().__init__(model_name, num_labels)
        self.model = AutoModelForTokenClassification.from_pretrained(
            model_name,
            num_labels=num_labels
        ).to(self.device)

        # Picked up by BaseModel.train to pad token-level batches.
        self.data_collator = DataCollatorForTokenClassification(
            tokenizer=self.tokenizer,
            padding=True,
            return_tensors="pt"
        )

    def predict(self, texts: List[str]) -> List[List[str]]:
        """
        Predict NER tags for input texts.

        Tags are returned for every position covered by the attention mask,
        so padding added to align the batch is dropped. Any special tokens
        the tokenizer inserts are still included.

        Args:
            texts: List of input texts

        Returns:
            List[List[str]]: Predicted NER tags for each token in each sequence
        """
        if len(texts) == 0:
            return []

        self.model.eval()
        encodings = self.prepare_data(texts)
        # The tokenizer produces CPU tensors while the model may live on the
        # GPU; a device mismatch would fail the forward pass.
        inputs = {name: tensor.to(self.device) for name, tensor in encodings.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = outputs.logits.argmax(dim=-1).cpu()

        attention_mask = inputs.get("attention_mask")
        if attention_mask is None:
            return [[NER_LABELS[tag_id] for tag_id in seq.tolist()] for seq in predictions]

        keep = attention_mask.bool().cpu()
        return [
            [NER_LABELS[tag_id] for tag_id in seq[mask].tolist()]
            for seq, mask in zip(predictions, keep)
        ]

class _SigmoidClassificationHead(torch.nn.Module):
    """Linear layer followed by a sigmoid, producing one probability per sample."""

    def __init__(self, hidden_size: int):
        super().__init__()
        self.linear = torch.nn.Linear(hidden_size, 1)

    def forward(self, features, **kwargs):
        # NOTE(review): some backbones (e.g. RoBERTa-style sequence
        # classifiers) hand the head per-token states shaped
        # (batch, seq, hidden) instead of a pooled (batch, hidden) vector.
        # In that case use the first token's state so the output stays one
        # value per sample; confirm for other backbones.
        if features.dim() == 3:
            features = features[:, 0, :]
        return torch.sigmoid(self.linear(features))


class InsiderThreatLogisticModel(BaseModel):
    """
    Logistic regression model for insider threat detection.
    Uses sigmoid activation to output threat probabilities.

    The values exposed as ``logits`` by the underlying model are therefore
    already probabilities in [0, 1].

    Args:
        model_name (str): Pre-trained model name
    """
    def __init__(
            self,
            model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest",
        ):
        super().__init__(model_name, num_labels=1)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=1,
            problem_type="regression",
            # The checkpoint's own head may have a different number of
            # outputs than the single one requested here; without this flag
            # loading raises on the shape mismatch. The head is replaced
            # below in any case.
            ignore_mismatched_sizes=True
        ).to(self.device)

        # Replace classification head with binary logistic
        self.model.classifier = _SigmoidClassificationHead(
            self.model.config.hidden_size
        ).to(self.device)

    def prepare_data(self, texts: List[str], labels: Optional[List] = None):
        """
        Encode texts like the base class, with labels cast to float.

        Args:
            texts: List of input texts
            labels: Optional 0/1 labels

        Returns:
            Encoded inputs; ``labels`` (when given) is a float tensor
        """
        encodings = super().prepare_data(texts, labels)
        if labels is not None:
            # NOTE(review): the model is configured with
            # problem_type="regression"; its built-in loss is expected to
            # require float targets — confirm against the installed version.
            encodings['labels'] = encodings['labels'].float()
        return encodings

    def compute_loss(self, logits, labels):
        """
        Compute binary cross entropy loss.

        Not wired into training by the visible code: BaseModel.train builds a
        stock Trainer and never references this method.

        Args:
            logits: Model outputs, already passed through the sigmoid
            labels: 0/1 targets

        Returns:
            Scalar loss tensor
        """
        # reshape(-1) instead of squeeze(): squeeze() would collapse a batch
        # of one to a 0-d tensor and break the shape match with the labels.
        return F.binary_cross_entropy(logits.reshape(-1), labels.float().reshape(-1))

    def predict(self, texts: List[str]) -> np.ndarray:
        """
        Predict threat probabilities for input texts.

        Args:
            texts: List of input texts

        Returns:
            1-D array of threat probabilities (0 to 1), one per input text
        """
        if len(texts) == 0:
            return np.empty(0, dtype=np.float32)

        self.model.eval()
        encodings = self.prepare_data(texts)
        # Tokenizer output is on the CPU; the model may be on the GPU.
        inputs = {name: tensor.to(self.device) for name, tensor in encodings.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            # reshape(-1) keeps a 1-D result even for a single text.
            probabilities = outputs.logits.reshape(-1).cpu().numpy()

        return probabilities

    def predict_with_threshold(self, texts: List[str], threshold: float = 0.5) -> tuple[np.ndarray, np.ndarray]:
        """
        Predict binary threats with a custom threshold.

        Args:
            texts: List of input texts
            threshold: Classification threshold (default: 0.5)

        Returns:
            Tuple of (probabilities, binary_predictions)
        """
        probabilities = self.predict(texts)
        return probabilities, (probabilities >= threshold).astype(int)


## Data Preparation

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import os
import seaborn as sns
import matplotlib.pyplot as plt
from typing import Dict

def prepare_dataset(data_path, random_state=42):
    """
    Prepare the dataset by splitting it into train, validation, and test sets.
    Maintains stratification across the insider threat label.

    The three splits are also written next to the input file as train.csv,
    val.csv and test.csv.

    Args:
        data_path (str): Path to the CSV file
        random_state (int): Random seed for reproducibility

    Returns:
        dict: Dictionary containing train, validation, and test DataFrames

    Raises:
        ValueError: If the 'Is Insider Threat' column contains values other
            than 'Yes'/'No' (or already-boolean True/False).
    """
    label_col = 'Is Insider Threat'
    df = pd.read_csv(data_path)

    # Convert 'Yes'/'No' strings to booleans. Already-boolean values are kept
    # as they are, so a file that was saved after conversion can be reused.
    mapped = df[label_col].map({'Yes': True, 'No': False, True: True, False: False})
    unmapped = df.loc[mapped.isna(), label_col]
    if not unmapped.empty:
        # Without this check unknown values become NaN and only surface
        # later as an opaque stratification error.
        examples = list(unmapped.unique()[:5])
        raise ValueError(
            f"Unexpected values in '{label_col}' column "
            f"({len(unmapped)} rows), e.g. {examples}; expected 'Yes' or 'No'."
        )
    df[label_col] = mapped.astype(bool)

    # First split: separate test set (15%)
    train_val, test = train_test_split(
        df,
        test_size=0.15,
        stratify=df[label_col],
        random_state=random_state
    )

    # Second split: separate validation set from training set (15% of original = 17.6% of remaining)
    train, val = train_test_split(
        train_val,
        test_size=0.176,  # 0.176 of 85% ≈ 15% of total
        stratify=train_val[label_col],
        random_state=random_state
    )

    # Save splits to CSV files
    output_dir = os.path.dirname(data_path)
    train.to_csv(os.path.join(output_dir, 'train.csv'), index=False)
    val.to_csv(os.path.join(output_dir, 'val.csv'), index=False)
    test.to_csv(os.path.join(output_dir, 'test.csv'), index=False)

    print("\nDataset Split Stats:")
    print(f"Total samples: {len(df)}")
    print(f"Training samples: {len(train)} ({len(train)/len(df)*100:.1f}%)")
    print(f"Validation samples: {len(val)} ({len(val)/len(df)*100:.1f}%)")
    print(f"Test samples: {len(test)} ({len(test)/len(df)*100:.1f}%)")

    print("\nInsider Threat Distribution:")
    for name, dataset in [('Training', train), ('Validation', val), ('Test', test)]:
        threat_dist = dataset[label_col].value_counts(normalize=True)
        print(f"\n{name} set:")
        # .get() so a split without one of the classes reports 0.0% instead
        # of raising KeyError.
        print(f"Malicious: {threat_dist.get(True, 0.0)*100:.1f}%")
        print(f"Non-malicious: {threat_dist.get(False, 0.0)*100:.1f}%")

    return {
        'train': train,
        'val': val,
        'test': test
    }

def visualize_distribution(df: pd.DataFrame, before_col: str, after_col: str, title: str = "Label Distribution"):
    """
    Draw side-by-side count plots of a label column before and after conversion.

    Args:
        df (pd.DataFrame): DataFrame containing the data
        before_col (str): Column name for original labels (left panel)
        after_col (str): Column name for converted labels (right panel)
        title (str): Title placed above both panels

    Returns:
        The matplotlib figure holding the two panels
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # One (column, heading) pair per panel, left to right.
    panels = (
        (before_col, "Original Distribution"),
        (after_col, "Converted Distribution"),
    )
    for axis, (column, heading) in zip(axes, panels):
        sns.countplot(data=df, x=column, ax=axis)
        axis.set_title(heading)
        axis.set_ylabel("Count")

    plt.suptitle(title)
    plt.tight_layout()
    return fig

def display_sample_conversions(df: pd.DataFrame, n_samples: int = 5):
    """
    Display sample rows showing the conversion from insider threat to sentiment.

    Rows are drawn at random. When the frame has fewer than ``n_samples``
    rows, all of them are shown instead of raising.

    Args:
        df (pd.DataFrame): DataFrame with 'Tweet', 'Is Insider Threat' and
            'sentiment' columns
        n_samples (int): Maximum number of samples to display
    """
    # sample() raises ValueError when asked for more rows than exist.
    sample_size = min(n_samples, len(df))
    samples = df[['Tweet', 'Is Insider Threat', 'sentiment']].sample(n=sample_size)

    print("\nSample Conversions:")
    for _, row in samples.iterrows():
        verdict = 'Insider Threat' if row['sentiment'] == 1 else 'Not a Threat'
        print(f"\nTweet: {row['Tweet']}")
        print(f"Original: {row['Is Insider Threat']}")
        print(f"Converted: {row['sentiment']} ({verdict})")

def prepare_sentiment_labels(df: pd.DataFrame, visualize: bool = True) -> pd.DataFrame:
    """
    Convert insider threat labels to binary sentiment scores.

    Converts 'Is Insider Threat' column to sentiment scores where:
    1 = Insider Threat (True or 'Yes')
    0 = Not a Threat (anything else, e.g. False or 'No')

    The input frame is not modified; a copy with the extra column is returned.

    Args:
        df (pd.DataFrame): Input DataFrame with 'Is Insider Threat' column
        visualize (bool): Whether to show distribution visualizations

    Returns:
        pd.DataFrame: DataFrame with new 'sentiment' column
    """
    def _is_threat(value) -> bool:
        # Raw data stores 'Yes'/'No'; converted data stores booleans.
        if isinstance(value, str):
            return value == 'Yes'
        return bool(value == True)  # noqa: E712 - also matches numpy bools and 1

    df = df.copy()

    # Store original values for visualization
    df['original_label'] = df['Is Insider Threat']

    # Convert to binary sentiment where 1 is positive (insider threat)
    df['sentiment'] = df['Is Insider Threat'].map(_is_threat).astype(int)

    # Print distribution of sentiment scores. .get() keeps this from raising
    # KeyError when a frame contains only one of the two classes.
    sentiment_dist = df['sentiment'].value_counts(normalize=True)
    print("\nSentiment Distribution:")
    print(f"Positive (Insider Threat): {sentiment_dist.get(1, 0.0)*100:.1f}%")
    print(f"Negative (Non-Threat): {sentiment_dist.get(0, 0.0)*100:.1f}%")

    if visualize:
        # Visualize the distribution
        fig = visualize_distribution(df, 'original_label', 'sentiment',
                                   "Insider Threat to Sentiment Conversion")
        plt.show()

        # Display sample conversions
        display_sample_conversions(df)

    # Drop the temporary column
    df = df.drop('original_label', axis=1)

    return df

def load_and_prepare_data(data_dir: str, visualize: bool = True) -> Dict[str, pd.DataFrame]:
    """
    Read the saved train/val/test CSV splits and add sentiment scores to each.

    Args:
        data_dir (str): Directory containing train.csv, val.csv, and test.csv
        visualize (bool): Forwarded to prepare_sentiment_labels for every split

    Returns:
        dict: Prepared DataFrames keyed by 'train', 'val' and 'test'
    """
    # Read every split up front so a missing file fails before any
    # conversion output is produced.
    frames = {}
    for split_name in ('train', 'val', 'test'):
        csv_path = os.path.join(data_dir, f'{split_name}.csv')
        frames[split_name] = pd.read_csv(csv_path)

    # Replace each raw frame with its sentiment-labelled counterpart.
    for split_name, frame in list(frames.items()):
        print(f"\nPreparing {split_name} set:")
        frames[split_name] = prepare_sentiment_labels(frame, visualize=visualize)

    return frames

if __name__ == "__main__":
    # Path to your dataset. Prefer a DATASET_PATH defined at module/notebook
    # level (it is set alongside the drive mount); fall back to the original
    # local path when no such variable exists.
    data_path = globals().get(
        "DATASET_PATH",
        r"c:\Users\Triet\OneDrive\GMU\Spring 25\cs491\data\synthetic_insider_threat.csv",
    )

    # Prepare the dataset (also writes train.csv / val.csv / test.csv next to
    # the input file)
    splits = prepare_dataset(data_path)

    # Convert to sentiment scores (optional)
    splits_with_sentiment = {
        split: prepare_sentiment_labels(df)
        for split, df in splits.items()
    }
