<a href="https://colab.research.google.com/github/trilokgoel/Company_NER_S-P500_News/blob/main/MA_NER_spacy_base_optu_mltcls_workg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install optuna

Collecting optuna
  Downloading optuna-4.3.0-py3-none-any.whl.metadata (17 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.16.1-py3-none-any.whl.metadata (7.3 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.3.0-py3-none-any.whl (386 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m386.6/386.6 kB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.16.1-py3-none-any.whl (242 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.5/242.5 kB[0m [31m18.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, alembic, optuna
Successfully installed alembic-1.16.1 colorlog-6.9.0 optuna-4.3.0


In [26]:
import spacy
from spacy.tokens import DocBin
from spacy.util import filter_spans
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, multilabel_confusion_matrix
import random
from datetime import datetime
import optuna
from optuna.samplers import TPESampler
import numpy as np
from typing import List, Dict, Tuple, Optional

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

In [5]:
# Mount Google Drive at /content/drive; the data and output paths used in main() live under this mount point
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [45]:
class EnhancedSpacyNERModel:
    """
    Enhanced Spacy NER model for M&A entity recognition with 4 classes and hyperparameter tuning.
    """

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the EnhancedSpacyNERModel.

        Args:
            model_name (str, optional): Name of existing Spacy model to load. Defaults to None (new model).
        """
        self.nlp = spacy.blank("en") if model_name is None else spacy.load(model_name)
        self.entity_types = ['ACQUIRER', 'TARGET', 'SELLER', 'NOT_M&A']  # Our custom entity labels
        self.best_params = None  # To store best hyperparameters from tuning

    def load_and_preprocess_data(self, file_path: str) -> List[Dict]:
        """
        Load and preprocess the labeled dataset with enhanced handling for NOT_M&A cases.

        Args:
            file_path (str): Path to the CSV file containing labeled data

        Returns:
            List[Dict]: Processed data in Spacy-compatible format
        """
        df = pd.read_csv(file_path, encoding='utf-8')
        print(f"Loaded {len(df)} rows from {file_path}")
        # Data validation
        required_cols = ['headline', 'entity_name', 'start', 'end', 'is_company', 'M&A_label']
        assert all(col in df.columns for col in required_cols), "CSV file must contain required columns"

        # Convert M&A_label to our entity types with NOT_M&A handling
        def map_label(label):
            if label == 'Acquirer':
                return 'ACQUIRER'
            elif label == 'Target':
                return 'TARGET'
            elif label == 'Seller':
                return 'SELLER'
            elif label == 'not_M&A':
                return 'NOT_M&A'
            else:
                return None

        df['entity_type'] = df['M&A_label'].apply(map_label)

        # Group by headline to create Spacy-compatible format
        grouped = df.groupby('headline').apply(
            lambda x: {
                'text': x.name,
                'entities': [(row.start, row.end, row.entity_type)
                           for row in x.itertuples() if pd.notna(row.entity_type)]
            }
        ).reset_index(name='spacy_format')

        return grouped['spacy_format'].tolist()

    def convert_to_spacy_format(self, data: List[Dict], output_file: str) -> None:
        """
        Convert data to Spacy's binary format with enhanced NOT_M&A handling.

        Args:
            data (List[Dict]): List of training examples
            output_file (str): Path to save the Spacy binary file
        """
        doc_bin = DocBin()

        for example in data:
            doc = self.nlp.make_doc(example['text'])
            entities = []

            # Handle cases where there are no entities (NOT_M&A)
            if not example['entities']:
                # For NOT_M&A, we don't add any entities
                pass
            else:
                for start, end, label in example['entities']:
                    span = doc.char_span(start, end, label=label)
                    if span is not None:
                        entities.append(span)

            filtered_entities = filter_spans(entities)
            doc.ents = filtered_entities
            doc_bin.add(doc)

        doc_bin.to_disk(output_file)
        print(f"Saved processed data to {output_file}")

    def objective(self, trial: optuna.Trial, train_data: str, validation: str) -> float:
        """
        Objective function for Optuna: train a fresh NER model and score it.

        A new blank English pipeline is built for every trial, trained for the
        suggested number of iterations and evaluated once on the val set.

        Args:
            trial (optuna.Trial): Optuna trial object used to suggest hyperparameters
            train_data (str): Path to training data in Spacy binary format
            validation (str): Path to val data in Spacy binary format

        Returns:
            float: Weighted-average F1 score on the val set (to maximize)
        """
        # Suggest hyperparameters
        dropout = trial.suggest_float("dropout", 0.2, 0.5)
        batch_size = trial.suggest_categorical("batch_size", [8, 16, 24, 32])
        learn_rate = trial.suggest_float("learn_rate", 1e-4, 1e-3, log=True)
        n_iter = trial.suggest_int("n_iter", 20, 50)

        # The study's sampler is seeded, but shuffling and weight initialisation were
        # not; seed them per trial so a trial's score does not depend on earlier trials.
        # NOTE(review): exact repeatability on GPU is not guaranteed by seeding alone.
        spacy.util.fix_random_seed(42 + trial.number)

        # Fresh blank pipeline for this trial; NER is its only component, so there is
        # nothing else to disable while training
        trial_nlp = spacy.blank("en")
        ner = trial_nlp.add_pipe("ner")
        for label in self.entity_types:
            ner.add_label(label)

        # Load data
        train_docs = list(DocBin().from_disk(train_data).get_docs(trial_nlp.vocab))
        val_docs = list(DocBin().from_disk(validation).get_docs(trial_nlp.vocab))

        # Pair an unannotated copy of each text with its annotated reference Doc
        train_examples = [
            spacy.training.Example(trial_nlp.make_doc(doc.text), doc)
            for doc in train_docs
        ]

        optimizer = trial_nlp.initialize(lambda: train_examples)
        optimizer.learn_rate = learn_rate

        # Train with suggested parameters
        for _ in range(n_iter):
            random.shuffle(train_examples)
            for batch in spacy.util.minibatch(train_examples, size=batch_size):
                trial_nlp.update(batch, drop=dropout, sgd=optimizer)

        # Evaluate on val set
        val_metrics = self._evaluate_with_confusion_matrix(trial_nlp, val_docs)
        return val_metrics['weighted avg']['f1-score']

    def tune_hyperparameters(self, train_data: str, validation: str, n_trials: int = 25) -> Dict:
        """
        Perform hyperparameter tuning using Optuna.

        Args:
            train_data (str): Path to training data
            validation (str): Path to val data
            n_trials (int): Number of optimization trials

        Returns:
            Dict: Best hyperparameters found
        """
        sampler = TPESampler(seed=42)  # For reproducible results
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(lambda trial: self.objective(trial, train_data, validation), n_trials=n_trials)

        self.best_params = study.best_params
        print(f"Best hyperparameters: {self.best_params}")
        print(f"Best F1 score: {study.best_value:.4f}")

        return self.best_params

    def train_model(self, train_data: str, validation: str, output_dir: Path,
                   use_tuned_params: bool = True, n_iter: Optional[int] = None) -> None:
        """
        Train the Spacy NER model with optional hyperparameter tuning.

        Args:
            train_data (str): Path to training data
            validation (str): Path to val data
            output_dir (Path): Directory to save the trained model
            use_tuned_params (bool): Whether to use tuned hyperparameters
            n_iter (int, optional): Override for number of iterations
        """
        # Create the NER pipeline
        if "ner" not in self.nlp.pipe_names:
            ner = self.nlp.add_pipe("ner")
        else:
            ner = self.nlp.get_pipe("ner")

        # Add entity labels
        for label in self.entity_types:
            ner.add_label(label)

        # Load data
        train_docbin = DocBin().from_disk(train_data)
        val_docbin = DocBin().from_disk(validation)

        train_docs = list(train_docbin.get_docs(self.nlp.vocab))
        val_docs = list(val_docbin.get_docs(self.nlp.vocab))

        # Convert to Example objects
        train_examples = []
        for doc in train_docs:
            predoc = self.nlp.make_doc(doc.text)
            example = spacy.training.Example(predoc, doc)
            train_examples.append(example)

        # Set training parameters
        if use_tuned_params and self.best_params:
            dropout = self.best_params.get("dropout", 0.3)
            batch_size = self.best_params.get("batch_size", 8)
            learn_rate = self.best_params.get("learn_rate", 0.001)
            n_iter = self.best_params.get("n_iter", 30) if n_iter is None else n_iter
        else:
            dropout = 0.3
            batch_size = 8
            learn_rate = 0.001
            n_iter = 30 if n_iter is None else n_iter

        # Disable other pipes and train
        other_pipes = [pipe for pipe in self.nlp.pipe_names if pipe != "ner"]
        with self.nlp.disable_pipes(*other_pipes):
            optimizer = self.nlp.initialize(lambda: train_examples)
            optimizer.learn_rate = learn_rate

            best_f1 = 0
            no_improvement = 0
            patience = 5  # Early stopping patience

            print("Beginning training with parameters:")
            print(f"Dropout: {dropout}, Batch size: {batch_size}, Learn rate: {learn_rate}, Iterations: {n_iter}")

            patience = 3
            no_improvement = 0
            best_f1 = 0

            for itn in range(n_iter):
                random.shuffle(train_examples)
                losses = {}

                batches = spacy.util.minibatch(train_examples, size=batch_size)
                for batch in batches:
                    self.nlp.update(
                        batch,
                        drop=dropout,
                        losses=losses,
                        sgd=optimizer
                    )

                # Evaluate on val set every 5 iterations to save time
                if itn % 5 == 0:
                    val_metrics = self._evaluate_with_confusion_matrix(self.nlp, val_docs)
                    current_f1 = val_metrics['weighted avg']['f1-score']

                    if current_f1 > best_f1:
                        best_f1 = current_f1
                        no_improvement = 0
                    else:
                        no_improvement += 1

                    if no_improvement >= patience:
                        print(f"Early stopping triggered at iteration {itn}.")
                        break  # Early stop this trial

        # Load the best model
        self.nlp = spacy.load(output_dir / "best_model")
        print("Training complete. Best model loaded.")

    def _evaluate_with_confusion_matrix(self, nlp_model, docs: List) -> Dict:
        """
        Enhanced evaluation with multilabel confusion matrix.

        Args:
            nlp_model: Spacy model to evaluate
            docs (List): List of Spacy Doc objects to evaluate on

        Returns:
            Dict: Evaluation metrics including confusion matrices
        """
        true_ents = []
        pred_ents = []

        for doc in docs:
            true_ents.extend([(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents])

            # Predict entities
            pred_doc = nlp_model(doc.text)
            pred_ents.extend([(ent.start_char, ent.end_char, ent.label_) for ent in pred_doc.ents])

        # Convert to classification format
        y_true = [ent[2] for ent in true_ents]
        y_pred = []

        # Match predicted entities to true entities
        for true_ent in true_ents:
            matched = False
            for pred_ent in pred_ents:
                if pred_ent[0] == true_ent[0] and pred_ent[1] == true_ent[1]:
                    y_pred.append(pred_ent[2])
                    matched = True
                    break
            if not matched:
                y_pred.append('O')  # No entity predicted

        # Generate classification report
        report = classification_report(
            y_true, y_pred,
            labels=self.entity_types,
            target_names=self.entity_types,
            output_dict=True
        )

        # Generate multilabel confusion matrix
        label_map = {label: i for i, label in enumerate(self.entity_types)}
        y_true_bin = np.zeros((len(y_true), len(self.entity_types)))
        y_pred_bin = np.zeros((len(y_pred), len(self.entity_types)))

        for i, (true, pred) in enumerate(zip(y_true, y_pred)):
            if true in label_map:
                y_true_bin[i, label_map[true]] = 1
            if pred in label_map:
                y_pred_bin[i, label_map[pred]] = 1

        confusion_matrices = multilabel_confusion_matrix(y_true_bin, y_pred_bin)
        report['confusion_matrices'] = {
            label: matrix for label, matrix in zip(self.entity_types, confusion_matrices)
        }

        return report

    def evaluate_model(self, test_data: str) -> Dict:
        """
        Evaluate the model on test data with comprehensive metrics.

        Args:
            test_data (str): Path to test data in Spacy binary format

        Returns:
            Dict: Evaluation metrics including confusion matrices
        """
        test_docbin = DocBin().from_disk(test_data)
        test_docs = list(test_docbin.get_docs(self.nlp.vocab))
        metrics = self._evaluate_with_confusion_matrix(self.nlp, test_docs)

        print("\nTest Set Evaluation:")
        for label in self.entity_types:
            if label in metrics:
                print(f"{label} - Precision: {metrics[label]['precision']:.4f}, "
                      f"Recall: {metrics[label]['recall']:.4f}, "
                      f"F1: {metrics[label]['f1-score']:.4f}")

        print(f"\nMacro Avg F1: {metrics['macro avg']['f1-score']:.4f}")
        print(f"Weighted Avg F1: {metrics['weighted avg']['f1-score']:.4f}")

        # Print confusion matrices
        print("\nConfusion Matrices (TN, FP, FN, TP):")
        for label in self.entity_types:
            print(f"\n{label}:")
            print(metrics['confusion_matrices'][label])

        return metrics

    def predict_entities(self, text: str) -> Dict:
        """
        Predict entities in a given text with enhanced NOT_M&A handling.

        Args:
            text (str): Text to predict entities in

        Returns:
            Dict: Dictionary containing entities by type
        """
        doc = self.nlp(text)

        result = {
            'acquirers': [],
            'targets': [],
            'sellers': [],
            'not_ma': True if not doc.ents else False,  # True if no entities found
            'text': text
        }

        for ent in doc.ents:
            if ent.label_ == 'ACQUIRER':
                result['acquirers'].append(ent.text)
                result['not_ma'] = False
            elif ent.label_ == 'TARGET':
                result['targets'].append(ent.text)
                result['not_ma'] = False
            elif ent.label_ == 'SELLER':
                result['sellers'].append(ent.text)
                result['not_ma'] = False

        return result

    def predict_on_unlabeled_data(self, input_file: str, output_file: str) -> pd.DataFrame:
        """
        Predict entities on unlabeled data and save results with NOT_M&A flag.

        Args:
            input_file (str): Path to CSV with unlabeled headlines
            output_file (str): Path to save predictions

        Returns:
            pd.DataFrame: DataFrame with predictions
        """
        df = pd.read_csv(input_file)
        assert 'headline' in df.columns, "Input CSV must contain 'headline' column"

        predictions = []
        for text in df['headline']:
            pred = self.predict_entities(text)
            predictions.append({
                'headline': text,
                'acquirers': ', '.join(pred['acquirers']),
                'targets': ', '.join(pred['targets']),
                'sellers': ', '.join(pred['sellers']),
                'is_not_ma': pred['not_ma']
            })

        pred_df = pd.DataFrame(predictions)
        pred_df.to_csv(output_file, index=False)
        print(f"Predictions saved to {output_file}")

        return pred_df

In [46]:
def main():
    """
    Run the full workflow: load data, split, tune, train, evaluate, predict.

    Reads the labeled CSV, splits it 70/15/15 into train/val/test, stores the
    splits as CSV and Spacy binary files in OUTPUT_DIR, tunes hyperparameters,
    trains the final model, evaluates it on the test split and writes
    predictions for the unlabeled headlines.
    """
    # Configuration
    LABELED_DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/ner_annotations_5k.csv"
    UNLABELED_DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/unseen_headlines.csv"
    OUTPUT_DIR = Path("/content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Initialize enhanced model
    ner_model = EnhancedSpacyNERModel()

    # Load and preprocess data
    print("Loading and preprocessing data...")
    data = ner_model.load_and_preprocess_data(LABELED_DATA_PATH)
    print("length of preprocessed data:", len(data))
    # Split data into train, val, test (70/15/15)
    train_data, temp_data = train_test_split(data, test_size=0.3, random_state=42)
    validation, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

    # The splits are plain lists of dicts, so wrap each in a DataFrame before writing it as CSV
    splits = {"train_data": train_data, "validation": validation, "test_data": test_data}
    for split_name, split_records in splits.items():
        pd.DataFrame(split_records).to_csv(OUTPUT_DIR / f"{split_name}.csv", index=False)

    print(f"Data split: {len(train_data)} train, {len(validation)} val, {len(test_data)} test")

    # Convert to Spacy format and save
    train_file = OUTPUT_DIR / "train.spacy"
    val_file = OUTPUT_DIR / "val.spacy"
    test_file = OUTPUT_DIR / "test.spacy"

    ner_model.convert_to_spacy_format(train_data, train_file)
    ner_model.convert_to_spacy_format(validation, val_file)
    ner_model.convert_to_spacy_format(test_data, test_file)

    # Hyperparameter tuning (the best parameters are kept on ner_model.best_params)
    print("\nStarting hyperparameter tuning...")
    ner_model.tune_hyperparameters(train_file, val_file, n_trials=25)

    # Train the model with best parameters
    print("\nTraining model with best parameters...")
    ner_model.train_model(train_file, val_file, OUTPUT_DIR, use_tuned_params=True)

    # Evaluate on test set
    print("\nEvaluating on test set...")
    ner_model.evaluate_model(test_file)

    # Predict on unlabeled data
    print("\nPredicting on unlabeled data...")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    prediction_file = OUTPUT_DIR / f"predictions_{timestamp}.csv"
    ner_model.predict_on_unlabeled_data(UNLABELED_DATA_PATH, prediction_file)

In [47]:
# Entry point: run the whole workflow defined in main() when this cell or script is executed directly
if __name__ == "__main__":
    main()

Loading and preprocessing data...
Loaded 5136 rows from /content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/ner_annotations_5k.csv
length of preprocessed data: 3545
Data split: 2481 train, 532 dev, 532 test
Saved processed data to /content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/train.spacy
Saved processed data to /content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/dev.spacy


[I 2025-06-12 10:26:21,704] A new study created in memory with name: no-name-0089b1ef-b954-4d2c-845b-9a7dbc1815de


Saved processed data to /content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/test.spacy

Starting hyperparameter tuning...


[I 2025-06-12 10:29:20,525] Trial 0 finished with value: 0.5568640256377575 and parameters: {'dropout': 0.3123620356542087, 'batch_size': 8, 'learn_rate': 0.00014321698289111514, 'n_iter': 21}. Best is trial 0 with value: 0.5568640256377575.
[I 2025-06-12 10:31:23,775] Trial 1 finished with value: 0.5701065131047425 and parameters: {'dropout': 0.45985284373248053, 'batch_size': 32, 'learn_rate': 0.0006798962421591129, 'n_iter': 26}. Best is trial 1 with value: 0.5701065131047425.
[I 2025-06-12 10:34:47,491] Trial 2 finished with value: 0.5679551649143766 and parameters: {'dropout': 0.2545474901621302, 'batch_size': 24, 'learn_rate': 0.00019553708662745247, 'n_iter': 38}. Best is trial 1 with value: 0.5701065131047425.
[I 2025-06-12 10:37:36,995] Trial 3 finished with value: 0.5442544235772612 and parameters: {'dropout': 0.24184815819561256, 'batch_size': 32, 'learn_rate': 0.0001583703155911876, 'n_iter': 35}. Best is trial 1 with value: 0.5701065131047425.
[I 2025-06-12 10:42:37,804] T

Best hyperparameters: {'dropout': 0.349068998923672, 'batch_size': 32, 'learn_rate': 0.00042288835492685126, 'n_iter': 40}
Best F1 score: 0.5896

Training model with best parameters...
Beginning training with parameters:
Dropout: 0.349068998923672, Batch size: 32, Learn rate: 0.00042288835492685126, Iterations: 40
Training complete. Best model loaded.

Evaluating on test set...

Test Set Evaluation:
ACQUIRER - Precision: 0.5406, Recall: 0.8893, F1: 0.6724
TARGET - Precision: 0.8089, Recall: 0.5826, F1: 0.6773
SELLER - Precision: 1.0000, Recall: 0.0222, F1: 0.0435
NOT_M&A - Precision: 0.0000, Recall: 0.0000, F1: 0.0000

Macro Avg F1: 0.3483
Weighted Avg F1: 0.4433

Confusion Matrices (TN, FP, FN, TP):

ACQUIRER:
[[275 198]
 [ 29 233]]

TARGET:
[[487  30]
 [ 91 127]]

SELLER:
[[690   0]
 [ 44   1]]

NOT_M&A:
[[525   0]
 [210   0]]

Predicting on unlabeled data...
Predictions saved to /content/drive/MyDrive/Colab Notebooks/MA_NER_Spacy/predictions_20250612_115253.csv
