<a href="https://colab.research.google.com/github/joaosMart/fish-species-class-siglip/blob/main/Code/species-classification/resnet_transfer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ResNet-50 as a Feature Extractor

This notebook implements the ResNet-50 baseline approach for fish species classification, as described in "Temporal Aggregation of Vision-Language Features for High-Accuracy Fish Classification in Automated Monitoring".

The approach uses the ResNet-50 model features from the middle frame of video segments to classify three salmonid species: Trout, Salmon, and Arctic Char.

To easily reproduce the paper results, please upload the extracted ResNet-50 features from the Zenodo repository:

```https://zenodo.org/records/17249918```

You can find the file at

```/Feature Extraction/ResNet-50-features.zip```

To extract, run:

```python
!unzip "/content/ResNet-50-features.zip" -d /content/ResNet-50
```

Then run all cells in this notebook. At the end, do not forget to run the code that zips the results so that you can download them.

In [1]:
!unzip "/content/ResNet-50-features.zip" -d /content/ResNet-50

Archive:  /content/ResNet-50-features.zip
  inflating: /content/ResNet-50/trout_videos_2139_frame_237_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_1917_frame_207_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_1140_frame_147_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_160_frame_366_resnet_features.npz  
  inflating: /content/ResNet-50/salmon_videos_10_frame_195_resnet_features.npz  
  inflating: /content/ResNet-50/salmon_videos_633_frame_206_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_2911_frame_19_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_2572_frame_28_resnet_features.npz  
  inflating: /content/ResNet-50/char_videos_192_frame_27_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_1410_frame_26_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_80_frame_123_resnet_features.npz  
  inflating: /content/ResNet-50/trout_videos_1266_frame_103_resnet

In [None]:
import shutil
import os

def create_local_copy(drive_path: str, local_path: str = '/content/local_features') -> str:
    """
    Copy a feature directory to a local working directory.

    An existing directory at ``local_path`` is replaced, but only after
    ``drive_path`` has been validated, so a mistyped source path cannot wipe
    out a previously created local copy.

    Args:
        drive_path (str): Directory to copy from.
        local_path (str): Destination directory. Defaults to
            '/content/local_features'.

    Returns:
        str: The destination path.

    Raises:
        FileNotFoundError: If ``drive_path`` does not exist.
        NotADirectoryError: If ``drive_path`` exists but is not a directory.
    """
    # Validate the source BEFORE touching the destination
    if not os.path.exists(drive_path):
        raise FileNotFoundError(f"Source directory not found: {drive_path}")
    if not os.path.isdir(drive_path):
        raise NotADirectoryError(f"Source path is not a directory: {drive_path}")

    if os.path.exists(local_path):
        # Source and destination are the same directory: removing the
        # destination would delete the data we are about to copy
        if os.path.samefile(drive_path, local_path):
            print(f"Data already at: {local_path}")
            return local_path

        # Remove the stale copy so copytree can recreate the directory
        shutil.rmtree(local_path)

    # Copy data from Drive to local
    print(f"Copying data to: {local_path}")
    shutil.copytree(drive_path, local_path)

    return local_path

# Example usage: '/path/to/...' is a placeholder -- point it at the directory
# containing the ResNet-50 feature files before running this cell.
# create_local_copy replaces any existing local copy and returns its path.
drive_path = '/path/to/Feature Extraction/ResNet-50'
local_path = create_local_copy(drive_path)
print(f"Data copied to: {local_path}")

Copying data to: /content/local_features
Data copied to: /content/local_features


In [None]:
import os
import hashlib
from collections import defaultdict

def get_file_hash(filepath):
    """Return the hexadecimal MD5 digest of the file at ``filepath``."""
    digest = hashlib.md5()
    with open(filepath, 'rb') as stream:
        # Stream fixed-size blocks so large files never sit fully in memory;
        # iter() stops at the empty bytes object returned at end-of-file
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()

def remove_duplicates(folder_path, keep_first=True):
    """
    Remove files with identical content from the specified folder (recursively).

    Files are grouped by the MD5 hash of their content. Within each group the
    files are ordered by ``os.path.getctime`` and then by path, and all but one
    are deleted. The path tie-break makes the surviving file deterministic when
    several copies share the same timestamp (otherwise the outcome would depend
    on directory listing order).

    Args:
        folder_path (str): Path to the folder to check for duplicates
        keep_first (bool): If True, keeps the first file of each group in the
            ordering above; if False, keeps the last one

    Returns:
        list: List of deleted file paths
    """
    # Maps content hash -> every path that has that content
    hash_dict = defaultdict(list)
    deleted_files = []

    # Walk through the directory tree and hash every file
    for root, _, files in os.walk(folder_path):
        for filename in files:
            filepath = os.path.join(root, filename)
            try:
                hash_dict[get_file_hash(filepath)].append(filepath)
            except (IOError, OSError) as e:
                # Unreadable files are reported and left untouched
                print(f"Error processing {filepath}: {e}")

    # Remove duplicate files
    for file_list in hash_dict.values():
        if len(file_list) < 2:
            continue  # unique content, nothing to delete

        # NOTE: getctime is the creation time on Windows but the last
        # metadata-change time on Unix; the path breaks ties deterministically
        file_list.sort(key=lambda path: (os.path.getctime(path), path))

        # Keep the first file (or last if keep_first is False)
        files_to_delete = file_list[1:] if keep_first else file_list[:-1]

        for file_path in files_to_delete:
            try:
                os.remove(file_path)
                deleted_files.append(file_path)
                print(f"Deleted duplicate file: {file_path}")
            except OSError as e:
                print(f"Error deleting {file_path}: {e}")

    return deleted_files

# Example usage: de-duplicate the local feature copy. remove_duplicates deletes
# files on disk and returns the list of paths it removed.
if __name__ == "__main__":
    folder_path = "/content/local_features"  # Replace with your folder path
    deleted = remove_duplicates(folder_path)
    print(f"\nTotal files deleted: {len(deleted)}")


Total files deleted: 0


In [None]:
import numpy as np
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold, learning_curve
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import balanced_accuracy_score, f1_score, precision_recall_fscore_support
from sklearn.metrics import confusion_matrix, make_scorer
from sklearn.preprocessing import LabelEncoder
from scipy.stats import loguniform
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import pandas as pd
from typing import Dict, Tuple, List
import json
import os
import logging
import glob
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.svm import LinearSVC
from sklearn.metrics import balanced_accuracy_score, f1_score, precision_recall_fscore_support
from sklearn.metrics import confusion_matrix, roc_auc_score, precision_recall_curve
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
import seaborn as sns
from typing import Dict, Tuple, List
import logging
import pandas as pd


# Custom scorer for macro-averaged F1 (average='macro' weights every class equally)
macro_f1_scorer = make_scorer(f1_score, average='macro')
# Backward-compatible alias: the original name said "weighted", but the scorer
# has always been macro-averaged
weighted_f1_scorer = macro_f1_scorer

class DataLoader:
    """Handle loading and processing of NPZ files"""
    def __init__(self, data_dir: str):
        # Directory that is searched (non-recursively) for *.npz files
        self.data_dir = data_dir

    def load_npz_files(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load all NPZ files from the directory and collect their features and labels.

        Each file is expected to provide a 'features' array and a
        'fish_species' scalar entry. Files that cannot be read or lack either
        entry are logged and skipped. Files are processed in sorted path order
        so that the returned arrays (and any seeded split made from them) do
        not depend on filesystem listing order.

        Returns:
            features: numpy array stacking the 'features' entry of every file
            labels: numpy array of fish species labels (strings), same order
        """
        features_list = []
        labels_list = []

        # Get all NPZ files in directory, in a deterministic order
        npz_files = sorted(glob.glob(os.path.join(self.data_dir, "*.npz")))

        logging.info(f"Found {len(npz_files)} NPZ files")

        for npz_file in npz_files:
            try:
                # SECURITY NOTE: allow_pickle=True can execute arbitrary code when
                # loading a malicious file -- only load archives from a trusted source.
                # The context manager closes the underlying file handle promptly.
                with np.load(npz_file, allow_pickle=True) as data:
                    features = data['features']
                    # 'fish_species' is stored as a 0-d array; unwrap it to a str
                    fish_species = str(data['fish_species'].item())
            except Exception as e:
                logging.error(f"Error processing file {npz_file}: {str(e)}")
                continue

            if features is not None:
                features_list.append(features)
                labels_list.append(fish_species)

        # Convert lists to numpy arrays
        features_array = np.array(features_list)
        labels_array = np.array(labels_list)

        # Log data distribution
        unique_labels, counts = np.unique(labels_array, return_counts=True)
        for label, count in zip(unique_labels, counts):
            percentage = (count / len(labels_array)) * 100
            logging.info(f"Class {label}: {count} samples ({percentage:.2f}%)")

        return features_array, labels_array


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy scalars and arrays to native Python values"""

    # numpy scalar family -> builtin constructor used to convert it
    _SCALAR_CASTS = (
        (np.integer, int),
        (np.floating, float),
        (np.bool_, bool),
    )

    def default(self, obj):
        # Arrays become (nested) lists
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        for numpy_type, cast in self._SCALAR_CASTS:
            if isinstance(obj, numpy_type):
                return cast(obj)
        # Anything else: defer to the base class, which raises TypeError
        return super().default(obj)

# Fish Classifier

In [None]:


class FishClassifier:
    """
    Baseline classification utilities: label encoding, a stratified train/test
    split, default linear models and evaluation/plotting helpers.
    """

    def __init__(self, random_state: int = 42):
        self.random_state = random_state
        # Fitted in prepare_data; maps species names <-> integer class ids
        self.le = LabelEncoder()
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration"""
        # NOTE: basicConfig is a no-op if the root logger already has handlers
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename='fish_classifier.log'
        )

    def prepare_data(self, features: np.ndarray, labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Prepare data by splitting into train and test sets with stratification

        Args:
            features: feature array, one row per sample
            labels: species label per sample (fitted into ``self.le``)

        Returns:
            X_train, X_test, y_train, y_test, where the y arrays hold the
            integer-encoded labels and the test set is 20% of the data
        """
        # Encode labels
        y = self.le.fit_transform(labels)

        # Create stratified train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            features,
            y,
            test_size=0.2,
            random_state=self.random_state,
            stratify=y
        )

        logging.info(f"Training set size: {X_train.shape[0]}")
        logging.info(f"Test set size: {X_test.shape[0]}")

        return X_train, X_test, y_train, y_test

    def create_baseline_models(self) -> Dict:
        """Create baseline models ('svm', 'logistic') with balanced class weights"""
        models = {
            'svm': LinearSVC(
                random_state=self.random_state,
                class_weight='balanced',
                max_iter=2000  # Increased to ensure convergence
            ),
            'logistic': LogisticRegression(
                random_state=self.random_state,
                class_weight='balanced',
                max_iter=2000
            )
        }
        return models

    def evaluate_model(self, model, X: np.ndarray, y: np.ndarray, model_name: str) -> Dict:
        """
        Evaluate model performance with multiple metrics

        Args:
            model: fitted estimator exposing ``predict``
            X: feature array to predict on
            y: true labels, integer-encoded by ``self.le`` (as returned by
                ``prepare_data``)
            model_name: name used in the log messages

        Returns:
            Dict with 'balanced_accuracy', 'macro_f1', 'confusion_matrix' and
            '<class>_precision' / '<class>_recall' / '<class>_f1' per class
        """
        # Get predictions
        y_pred = model.predict(X)

        # Encoded ids of every known class. Passing them explicitly keeps the
        # per-class arrays aligned with self.le.classes_ even when a class is
        # absent from both y and y_pred.
        label_ids = np.arange(len(self.le.classes_))

        # Calculate metrics
        metrics = {
            'balanced_accuracy': balanced_accuracy_score(y, y_pred),
            'macro_f1': f1_score(y, y_pred, average='macro'),
            'confusion_matrix': confusion_matrix(y, y_pred, labels=label_ids),
        }

        # Calculate per-class metrics (undefined ratios are reported as 0)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y, y_pred, labels=label_ids, zero_division=0
        )

        # Add per-class metrics
        for i, class_name in enumerate(self.le.classes_):
            metrics[f'{class_name}_precision'] = precision[i]
            metrics[f'{class_name}_recall'] = recall[i]
            metrics[f'{class_name}_f1'] = f1[i]

        # Log results
        logging.info(f"\nResults for {model_name}:")
        logging.info(f"Balanced Accuracy: {metrics['balanced_accuracy']:.4f}")
        logging.info(f"Macro F1: {metrics['macro_f1']:.4f}")

        return metrics

    def plot_confusion_matrix(self, confusion_mat: np.ndarray, model_name: str):
        """
        Plot confusion matrix heatmap and save it as
        'confusion_matrix_<model_name>.png' in the working directory
        """
        plt.figure(figsize=(10, 8))
        sns.heatmap(
            confusion_mat,
            annot=True,
            fmt='d',
            cmap='Blues',
            xticklabels=self.le.classes_,
            yticklabels=self.le.classes_
        )
        plt.title(f'Confusion Matrix - {model_name}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.savefig(f'confusion_matrix_{model_name}.png')
        # Close the figure so repeated calls do not accumulate open figures
        plt.close()

# Random Search

In [None]:
import numpy as np
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold, learning_curve
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import balanced_accuracy_score, f1_score, precision_recall_fscore_support
from sklearn.metrics import confusion_matrix, make_scorer
from sklearn.preprocessing import LabelEncoder
from scipy.stats import loguniform
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import pandas as pd
from typing import Dict, Tuple, List
import json
import os
import logging
import glob
from copy import deepcopy


class ModelOptimizer:
    """
    A class to handle model optimization for both SVM and Logistic Regression models.

    Runs a randomized search over the regularisation strength ``C`` and the
    ``class_weight`` setting ('balanced' or None), scored by macro-averaged F1
    under stratified 5-fold cross-validation. Results are written below a
    timestamped output directory.
    """

    def __init__(self, random_state: int = 42, n_iter: int = 100, class_names: List[str] = None):
        self.random_state = random_state
        self.n_iter = n_iter
        self.class_names = class_names or ['Bleikja', 'Lax', 'Urridi']
        self.setup_output_dir()
        self.setup_logging()
        # Logged only after the handlers exist: logging earlier would make the
        # logging module install its own default configuration first
        logging.info(f"Created output directory: {self.output_dir}")

    def setup_logging(self):
        """
        Attach a file handler (<output_dir>/optimization.log) and a console
        handler to the root logger and set its level to INFO.

        The handlers are attached explicitly rather than via
        ``logging.basicConfig``, because basicConfig does nothing once the
        root logger already has handlers (which is the case in this notebook).
        """
        log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)

        # Drop handlers installed by an earlier optimizer instance so messages
        # are not written to stale log files or duplicated on the console
        for handler in list(root_logger.handlers):
            if getattr(handler, '_from_model_optimizer', False):
                root_logger.removeHandler(handler)
                handler.close()

        new_handlers = [
            logging.FileHandler(os.path.join(self.output_dir, 'optimization.log'))
        ]
        # FileHandler subclasses StreamHandler, hence the explicit exclusion
        has_console = any(
            isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
            for h in root_logger.handlers
        )
        if not has_console:
            new_handlers.append(logging.StreamHandler())

        for handler in new_handlers:
            handler.setFormatter(log_format)
            handler._from_model_optimizer = True
            root_logger.addHandler(handler)

    def setup_output_dir(self):
        """Create the timestamped results directory and store it in ``self.output_dir``"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.output_dir = f'model_optimization_{timestamp}'
        os.makedirs(self.output_dir, exist_ok=True)

    def create_param_distributions(self) -> Dict:
        """
        Create parameter distributions for random search.

        ``C`` is sampled log-uniformly from [1e-1, 3e2]; ``class_weight`` is
        sampled from {'balanced', None}.
        """
        param_distributions = {
            'C': loguniform(1e-1, 3e2),  # Wide range for C
            'class_weight': ['balanced', None]  # Balanced vs. unweighted classes
        }

        logging.info("Created parameter distributions for random search")
        return param_distributions

    def run_random_search(self, model_class, X_train: np.ndarray, y_train: np.ndarray,
                         X_test: np.ndarray, y_test: np.ndarray, model_name: str) -> Tuple[RandomizedSearchCV, pd.DataFrame]:
        """
        Run random search with cross-validation for model optimization.

        Args:
            model_class: ``LinearSVC`` or ``LogisticRegression`` (the class itself)
            X_train, y_train: data the search is fitted on
            X_test, y_test: accepted for interface compatibility; not used here
            model_name: name used for logging and for the results sub-directory

        Returns:
            The fitted ``RandomizedSearchCV`` and its ``cv_results_`` as a DataFrame
        """
        logging.info(f"Starting random search for {model_name}")

        # Create base model with appropriate parameters
        if model_class is LinearSVC:
            base_model = model_class(random_state=self.random_state, max_iter=2000)
        else:  # LogisticRegression
            base_model = model_class(random_state=self.random_state, max_iter=2000,
                                   solver='lbfgs', penalty='l2')

        # Create parameter distributions
        param_distributions = self.create_param_distributions()

        # Setup cross-validation
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=self.random_state)

        # Initialize random search.
        # 'f1_macro' is scikit-learn's built-in macro-averaged F1 scorer; the
        # previous module-level name `macro_f1_scorer` was never defined.
        random_search = RandomizedSearchCV(
            estimator=base_model,
            param_distributions=param_distributions,
            n_iter=self.n_iter,
            cv=cv,
            scoring='f1_macro',
            n_jobs=-1,
            random_state=self.random_state,
            verbose=2,
            return_train_score=True
        )

        # Fit random search
        random_search.fit(X_train, y_train)

        # Create results DataFrame
        results_df = pd.DataFrame(random_search.cv_results_)

        # Save results
        self.save_results(random_search, results_df, model_name)

        logging.info(f"Completed random search for {model_name}")
        return random_search, results_df

    def save_results(self, random_search: RandomizedSearchCV, results_df: pd.DataFrame, model_name: str):
        """
        Save random search results to files with consistent metrics.

        Writes 'best_params.json', 'random_search_results.csv' and
        'performance_summary.txt' to ``<output_dir>/<model_name>``.
        """
        model_dir = os.path.join(self.output_dir, model_name)
        os.makedirs(model_dir, exist_ok=True)

        # Get best parameters and scores
        best_params = random_search.best_params_
        best_cv_score = random_search.best_score_

        # Find the row with best CV score for consistency check
        best_idx = results_df['mean_test_score'].idxmax()
        best_row = results_df.loc[best_idx]

        # Create comprehensive summary
        summary = {
            'best_parameters': best_params,
            'cross_validation_performance': {
                'best_score': best_cv_score,
                'std_score': best_row['std_test_score'],
                'train_score': best_row['mean_train_score'],
                'train_std': best_row['std_train_score']
            }
        }

        # Save summary as JSON
        with open(os.path.join(model_dir, 'best_params.json'), 'w') as f:
            json.dump(summary, f, indent=4)

        # Save full results DataFrame
        results_df.to_csv(os.path.join(model_dir, 'random_search_results.csv'))

        # Save readable summary
        with open(os.path.join(model_dir, 'performance_summary.txt'), 'w') as f:
            f.write("Best Model Configuration\n")
            f.write("=" * 50 + "\n\n")
            f.write("Parameters:\n")
            for param, value in best_params.items():
                f.write(f"{param}: {value}\n")
            f.write("\nCross-validation Performance:\n")
            f.write(f"Best CV Score (Macro F1): {best_cv_score:.4f} ± {best_row['std_test_score']:.4f}\n")
            f.write(f"CV Training Score: {best_row['mean_train_score']:.4f} ± {best_row['std_train_score']:.4f}\n")

        logging.info(f"Saved optimization results for {model_name} to {model_dir}")


    def evaluate_model(self, model, X: np.ndarray, y: np.ndarray, model_name: str) -> Dict:
        """
        Evaluate model performance with multiple metrics.

        Returns:
            Dict with 'weighted_f1', 'macro_f1', 'balanced_accuracy',
            'confusion_matrix' and '<class>_precision' / '<class>_recall' /
            '<class>_f1' entries. Class names are matched by position to the
            sorted labels that occur in ``y`` / the predictions.
        """
        # Get predictions
        y_pred = model.predict(X)

        # Calculate metrics
        metrics = {
            'weighted_f1': f1_score(y, y_pred, average='weighted'),
            'macro_f1': f1_score(y, y_pred, average='macro'),
            'balanced_accuracy': balanced_accuracy_score(y, y_pred),
            'confusion_matrix': confusion_matrix(y, y_pred)
        }

        # Calculate per-class metrics (undefined ratios are reported as 0)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y, y_pred, zero_division=0
        )

        # A length mismatch used to raise IndexError (or silently drop classes);
        # report it and store the metrics for the positions that do line up
        if len(precision) != len(self.class_names):
            logging.warning(
                f"{model_name}: {len(self.class_names)} class names given but "
                f"{len(precision)} classes present; per-class metrics may be misaligned"
            )

        # Add per-class metrics
        for class_name, class_precision, class_recall, class_f1 in zip(
                self.class_names, precision, recall, f1):
            metrics[f'{class_name}_precision'] = class_precision
            metrics[f'{class_name}_recall'] = class_recall
            metrics[f'{class_name}_f1'] = class_f1

        # Log results
        logging.info(f"\nResults for {model_name}:")
        logging.info(f"Weighted F1: {metrics['weighted_f1']:.4f}")
        logging.info(f"Macro F1: {metrics['macro_f1']:.4f}")
        logging.info(f"Balanced Accuracy: {metrics['balanced_accuracy']:.4f}")

        return metrics

    def plot_results(self, results_df: pd.DataFrame, model_name: str):
        """
        Create clear visualization of random search results with verified metrics.

        Draws two stacked panels: CV/train score vs. C, and the train-CV gap
        vs. C. The figure is neither saved nor closed here; it stays the
        current matplotlib figure for the caller to display or save.
        """
        plt.figure(figsize=(15, 12))

        # Sort results by C parameter for smooth plotting
        results_df = results_df.sort_values('param_C')

        # Plot 1: Main Performance Plot
        plt.subplot(211)

        # Plot mean CV scores with error bands
        plt.semilogx(results_df['param_C'],
                     results_df['mean_test_score'],
                     'b-',
                     label='Cross-validation Score',
                     linewidth=2)
        plt.fill_between(results_df['param_C'],
                        results_df['mean_test_score'] - results_df['std_test_score'],
                        results_df['mean_test_score'] + results_df['std_test_score'],
                        alpha=0.2,
                        color='b')

        plt.semilogx(results_df['param_C'],
                     results_df['mean_train_score'],
                     'r-',
                     label='Training Score',
                     linewidth=2)
        plt.fill_between(results_df['param_C'],
                        results_df['mean_train_score'] - results_df['std_train_score'],
                        results_df['mean_train_score'] + results_df['std_train_score'],
                        alpha=0.2,
                        color='r')

        # Highlight best performing point
        best_idx = results_df['mean_test_score'].idxmax()
        best_C = results_df.loc[best_idx, 'param_C']
        best_score = results_df.loc[best_idx, 'mean_test_score']
        best_score_std = results_df.loc[best_idx, 'std_test_score']

        plt.plot(best_C, best_score, 'k*', markersize=15,
                label=f'Best C = {best_C:.2e}')

        plt.xlabel('C Parameter (log scale)')
        plt.ylabel('Macro F1 Score')
        plt.title(f'{model_name}: Impact of C Parameter on Model Performance')
        plt.legend()
        plt.grid(True)

        # Plot 2: Overfitting Analysis
        plt.subplot(212)

        # Calculate train-test gap
        train_test_gap = results_df['mean_train_score'] - results_df['mean_test_score']

        plt.semilogx(results_df['param_C'], train_test_gap, 'g-',
                     label='Train-CV Gap', linewidth=2)
        plt.fill_between(results_df['param_C'],
                        train_test_gap - results_df['std_test_score'],
                        train_test_gap + results_df['std_test_score'],
                        alpha=0.2,
                        color='g')

        plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
        plt.xlabel('C Parameter (log scale)')
        plt.ylabel('Train-CV Score Gap')
        plt.title('Overfitting Analysis: Train-CV Score Gap vs C')
        plt.grid(True)

        # Add text box with verified metrics (placed on the lower panel)
        textstr = '\n'.join([
            f'Best Configuration:',
            f'C = {best_C:.2e}',
            f'CV Score = {best_score:.4f} ± {best_score_std:.4f}',
            f'Train Score = {results_df.loc[best_idx, "mean_train_score"]:.4f}'
        ])

        plt.text(0.02, 0.98, textstr,
                transform=plt.gca().transAxes,
                bbox=dict(facecolor='white', alpha=0.8),
                verticalalignment='top')



class MultiSeedModelOptimizer(ModelOptimizer):
    """
    Extended ModelOptimizer class to handle multiple random seeds.

    Repeats the random search once per seed, stores per-seed results below
    ``<output_dir>/seed_<seed>`` and writes summaries/plots across seeds.
    """

    def __init__(self, base_seed: int = 42, n_seeds: int = 10, n_iter: int = 30,
                 class_names: List[str] = None):
        # Draw the seeds from a generator seeded with base_seed so the same
        # base_seed always yields the same seeds (the global np.random state
        # used previously ignored base_seed)
        rng = np.random.RandomState(base_seed)
        self.seeds = rng.choice(np.arange(1, 101), size=n_seeds, replace=False)
        self.n_seeds = n_seeds

        # Initialize with first seed
        super().__init__(random_state=self.seeds[0], n_iter=n_iter,
                        class_names=class_names)

        # Modify output directory to indicate multiple seeds
        self.output_dir = f'{self.output_dir}_multiseed'
        os.makedirs(self.output_dir, exist_ok=True)

        # Save seeds information
        with open(os.path.join(self.output_dir, 'random_seeds.json'), 'w') as f:
            json.dump({'base_seed': base_seed, 'generated_seeds': self.seeds.tolist()}, f)

    def run_multi_seed_optimization(self, model_class, X_train: np.ndarray,
                                  y_train: np.ndarray, X_test: np.ndarray,
                                  y_test: np.ndarray, model_name: str) -> Dict:
        """
        Run random search optimization across multiple seeds.

        Returns:
            Dict of the form ``{seed: {model_name: {...}}}`` where the inner
            dict holds 'best_model', 'best_params', 'cv_score',
            'train_metrics', 'test_metrics' and 'results_df'. Seeds whose run
            raised an exception are logged and omitted.
        """
        all_seed_results = {}

        for seed_idx, seed in enumerate(self.seeds):
            logging.info(f"\nRunning optimization for {model_name} with seed {seed} "
                        f"({seed_idx + 1}/{self.n_seeds})")

            # Update random state
            self.random_state = seed

            # Use existing seed directory if it exists, create if it doesn't
            seed_dir = os.path.join(self.output_dir, f'seed_{seed}')
            os.makedirs(seed_dir, exist_ok=True)

            try:
                # Run random search for this seed
                random_search, results_df = self.run_random_search(
                    model_class, X_train, y_train, X_test, y_test,
                    model_name  # Removed the seed suffix from model name
                )

                # Get best model for this seed
                best_model = random_search.best_estimator_

                # Evaluate best model
                train_metrics = self.evaluate_model(
                    best_model, X_train, y_train, f"{model_name}_train"
                )
                test_metrics = self.evaluate_model(
                    best_model, X_test, y_test, f"{model_name}_test"
                )

                # Store results for this seed
                if seed not in all_seed_results:
                    all_seed_results[seed] = {}

                all_seed_results[seed][model_name] = {
                    'best_model': best_model,
                    'best_params': random_search.best_params_,
                    'cv_score': random_search.best_score_,
                    'train_metrics': train_metrics,
                    'test_metrics': test_metrics,
                    'results_df': results_df
                }

                # Save results for this model within the seed directory
                self.save_seed_results(
                    all_seed_results[seed][model_name],
                    seed_dir,
                    model_name
                )

            except Exception as e:
                # Best effort: a failing seed must not abort the remaining seeds
                logging.error(f"Error during {model_name} optimization with seed {seed}: {str(e)}")
                continue

        # Generate and save summary across seeds
        self.generate_seed_summary(all_seed_results, model_name)

        return all_seed_results

    def save_seed_results(self, results: Dict, seed_dir: str, model_name: str):
        """
        Save results for a specific model within a seed directory.

        Writes '<model_name>_metrics.json' and '<model_name>_results.csv'.
        """
        # Save metrics
        metrics_summary = {
            'best_params': results['best_params'],
            'cv_score': results['cv_score'],
            'train_metrics': results['train_metrics'],
            'test_metrics': results['test_metrics']
        }

        # Save within the seed directory with model-specific names
        # (NumpyEncoder handles the numpy arrays/scalars inside the metrics)
        with open(os.path.join(seed_dir, f'{model_name}_metrics.json'), 'w') as f:
            json.dump(metrics_summary, f, indent=4, cls=NumpyEncoder)

        # Save results DataFrame
        results['results_df'].to_csv(
            os.path.join(seed_dir, f'{model_name}_results.csv')
        )

    def plot_averaged_validation_curves(self, all_results: Dict[str, Dict]):
        """
        Create a comprehensive plot showing averaged validation curves across all seeds
        for both SVM and Logistic Regression models.

        Args:
            all_results: ``{model_name: per_seed_results}`` where
                ``per_seed_results`` is what ``run_multi_seed_optimization``
                returns (``{seed: {model_name: {...}}}``). A flattened
                ``{seed: {...}}`` mapping is accepted as well.

        The plot is saved as 'averaged_validation_curves.png' in the output
        directory.
        """
        plt.figure(figsize=(15, 8))

        # Define colors and styles
        colors = {
            'SVM': 'blue',
            'LogisticRegression': 'red'
        }

        # Process each model's results
        for model_name in ['SVM', 'LogisticRegression']:
            if model_name not in all_results:
                logging.warning(f"No results for {model_name}; skipping its validation curve")
                continue

            logging.info(f"Processing validation curves for {model_name}")
            model_results = all_results[model_name]

            # Initialize containers to store scores for each C value
            c_values = set()
            train_scores_dict = {}
            val_scores_dict = {}

            # Collect all unique C values and corresponding scores across seeds
            for seed_results in model_results.values():
                # Per-seed results are nested under the model name; reading
                # 'results_df' directly from the seed level raised KeyError
                per_model = seed_results.get(model_name, seed_results)
                results_df = per_model['results_df']

                # Get all C values from this seed
                for idx, row in results_df.iterrows():
                    c = row['param_C']
                    c_values.add(c)

                    # Initialize lists for this C value if not exists
                    if c not in train_scores_dict:
                        train_scores_dict[c] = []
                        val_scores_dict[c] = []

                    # Append scores
                    train_scores_dict[c].append(row['mean_train_score'])
                    val_scores_dict[c].append(row['mean_test_score'])

            if not c_values:
                logging.warning(f"No successful runs for {model_name}; skipping its validation curve")
                continue

            # Convert to sorted list
            c_values = sorted(list(c_values))

            # Calculate means and stds
            train_means = []
            train_stds = []
            val_means = []
            val_stds = []

            for c in c_values:
                train_means.append(np.mean(train_scores_dict[c]))
                train_stds.append(np.std(train_scores_dict[c]))
                val_means.append(np.mean(val_scores_dict[c]))
                val_stds.append(np.std(val_scores_dict[c]))

            # Convert to numpy arrays
            train_means = np.array(train_means)
            train_stds = np.array(train_stds)
            val_means = np.array(val_means)
            val_stds = np.array(val_stds)

            # Plot training scores with dashed lines
            plt.semilogx(c_values, train_means, '--',
                        color=colors[model_name],
                        label=f'{model_name} Training',
                        alpha=0.8)
            plt.fill_between(c_values,
                            train_means - train_stds,
                            train_means + train_stds,
                            color=colors[model_name],
                            alpha=0.1)

            # Plot validation scores with solid lines
            plt.semilogx(c_values, val_means, '-',
                        color=colors[model_name],
                        label=f'{model_name} Validation',
                        alpha=0.8)
            plt.fill_between(c_values,
                            val_means - val_stds,
                            val_means + val_stds,
                            color=colors[model_name],
                            alpha=0.1)

        plt.grid(True, which="both", ls="-", alpha=0.2)
        plt.xlabel('C Parameter (log scale)')
        plt.ylabel('Score')
        plt.title('Validation Curves: Averaged Across Seeds\n'
                  'Solid: Validation, Dashed: Training')
        plt.legend(loc='lower right')

        # Add text box with number of seeds
        plt.text(0.02, 0.98, f'Averaged across {self.n_seeds} seeds',
                 transform=plt.gca().transAxes,
                 bbox=dict(facecolor='white', alpha=0.8),
                 verticalalignment='top')

        plt.tight_layout()

        # Save the plot
        plot_path = os.path.join(self.output_dir, 'averaged_validation_curves.png')
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        logging.info(f"Saved validation curves plot to {plot_path}")
        plt.close()

    def generate_seed_summary(self, all_results: Dict, model_name: str):
        """
        Generate summary statistics across all seeds.

        Writes '<model_name>_seed_summary.json' with mean/std/min/max of the
        best CV score, the test score and the best C, then draws the seed
        comparison plot. Nothing is written when no seed produced results.
        """
        # Collect metrics across seeds
        cv_scores = []
        test_scores = []
        c_values = []

        # Access the correct level of the dictionary
        for seed_results in all_results.values():
            model_results = seed_results.get(model_name)  # Get model-specific results
            if model_results is None:
                continue
            cv_scores.append(model_results['cv_score'])
            # NOTE(review): the CV score is macro F1 while the test score taken
            # here is weighted F1 -- confirm this mix of metrics is intended
            test_scores.append(model_results['test_metrics']['weighted_f1'])
            c_values.append(model_results['best_params']['C'])

        # Every seed failed: min/max of an empty list would raise
        if not cv_scores:
            logging.warning(f"No successful runs for {model_name}; skipping seed summary")
            return

        # Calculate summary statistics
        summary = {
            'cv_score': {
                'mean': np.mean(cv_scores),
                'std': np.std(cv_scores),
                'min': np.min(cv_scores),
                'max': np.max(cv_scores)
            },
            'test_score': {
                'mean': np.mean(test_scores),
                'std': np.std(test_scores),
                'min': np.min(test_scores),
                'max': np.max(test_scores)
            },
            'c_value': {
                'mean': np.mean(c_values),
                'std': np.std(c_values),
                'min': np.min(c_values),
                'max': np.max(c_values)
            }
        }

        # Save summary
        with open(os.path.join(self.output_dir, f'{model_name}_seed_summary.json'), 'w') as f:
            json.dump(summary, f, indent=4)

        # Create visualization of results across seeds
        self.plot_seed_comparison(cv_scores, test_scores, c_values, model_name)

    def plot_seed_comparison(self, cv_scores: List[float], test_scores: List[float],
                           c_values: List[float], model_name: str):
        """
        Create visualization comparing results across seeds.

        Left panel: CV score vs. test score per seed with a y = x reference
        line. Right panel: histogram of log10 of the best C values. Saved as
        '<model_name>_seed_comparison.png' in the output directory.
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        # Plot 1: CV vs Test Scores
        ax1.scatter(cv_scores, test_scores, alpha=0.6)
        ax1.plot([min(cv_scores), max(cv_scores)], [min(cv_scores), max(cv_scores)],
                 'k--', alpha=0.5)
        ax1.set_xlabel('CV Score')
        ax1.set_ylabel('Test Score')
        ax1.set_title('CV vs Test Score Comparison')

        # Plot 2: C Value Distribution
        ax2.hist(np.log10(c_values), bins=10)
        ax2.set_xlabel('log10(C)')
        ax2.set_ylabel('Count')
        ax2.set_title('Distribution of Best C Values')

        plt.suptitle(f'{model_name}: Results Across {self.n_seeds} Seeds')
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, f'{model_name}_seed_comparison.png'))
        plt.close()

def run_multi_seed_optimization(data_dir: str, class_names: List[str] = None,
                              base_seed: int = 42, n_seeds: int = 10) -> Dict:
    """Run the random-search optimization pipeline across multiple seeds.

    Loads the features with ``DataLoader``, makes a single train/test split
    with ``FishClassifier.prepare_data``, then lets a
    ``MultiSeedModelOptimizer`` tune ``LinearSVC`` ('SVM') and
    ``LogisticRegression`` on that split. Finally the optimizer's output
    directory is archived as ``<output_dir>.zip``.

    NOTE(review): a later cell of this notebook defines another function with
    the same name; once that cell has run, it replaces this one.

    Args:
        data_dir: Directory handed to ``DataLoader``.
        class_names: Class names handed to the optimizer. When None, the
            sorted unique labels found in the data are used.
        base_seed: Forwarded to ``MultiSeedModelOptimizer``.
        n_seeds: Forwarded to ``MultiSeedModelOptimizer``.

    Returns:
        Dict mapping 'SVM' and 'LogisticRegression' to whatever
        ``optimizer.run_multi_seed_optimization`` returned for that model.

    Raises:
        Exception: Any error raised by a pipeline step (including archiving)
            is logged and re-raised unchanged.
    """
    # Imported locally so this cell does not depend on a later cell's imports.
    import shutil

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    logging.info("Starting multi-seed optimization pipeline")

    try:
        # Load and prepare data
        data_loader = DataLoader(data_dir)
        features, labels = data_loader.load_npz_files()

        # Get unique class names if not provided
        if class_names is None:
            class_names = np.unique(labels).tolist()

        # Initialize classifier for data preparation
        classifier = FishClassifier()
        X_train, X_test, y_train, y_test = classifier.prepare_data(features, labels)

        # Initialize multi-seed optimizer
        optimizer = MultiSeedModelOptimizer(
            base_seed=base_seed,
            n_seeds=n_seeds,
            n_iter=30,
            class_names=class_names
        )

        # Dictionary to store results
        all_results = {}

        # Run optimization for both models
        models = {
            'SVM': LinearSVC,
            'LogisticRegression': LogisticRegression
        }

        for model_name, model_class in models.items():
            logging.info(f"\nStarting multi-seed optimization for {model_name}")

            # Run multi-seed optimization
            model_results = optimizer.run_multi_seed_optimization(
                model_class, X_train, y_train, X_test, y_test, model_name
            )

            all_results[model_name] = model_results

        # Generate averaged validation curves plot
        logging.info("Generating averaged validation curves across seeds...")
        optimizer.plot_averaged_validation_curves(all_results)

        # Save and zip results. make_archive replaces the former
        # `os.system('zip -r ...')` call: it needs no external `zip` binary,
        # is safe for paths containing spaces or shell metacharacters, and
        # raises on failure instead of letting success be logged regardless.
        # root_dir/base_dir keep the folder as the archive's top-level entry,
        # matching what `zip -r <dir>.zip <dir>` produced.
        output_dir = os.path.normpath(optimizer.output_dir)
        shutil.make_archive(
            base_name=output_dir,
            format='zip',
            root_dir=os.path.dirname(output_dir) or os.curdir,
            base_dir=os.path.basename(output_dir)
        )
        logging.info(f"\nResults saved to {output_dir}.zip")

        return all_results

    except Exception as e:
        logging.error(f"Error in optimization pipeline: {str(e)}")
        raise

In [None]:
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import os
import logging
from datetime import datetime
import json
from google.colab import drive

# Seed NumPy's global RNG before anything else runs
np.random.seed(42)

# Input location and the class names handed to the pipeline
data_dir = "/path/to/Feature Extraction/ViT-SO400M-14-SigLIP-mean-frames"
class_names = ['Bleikja', 'Lax', 'Urridi']

# Timestamped INFO-level logging
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# (printed heading, key in the summary JSON) for the report below
summary_sections = (
    ("CV Scores", 'cv_score'),
    ("Test Scores", 'test_score'),
    ("C Values", 'c_value'),
)

# Run the optimization, then print each model's saved summary
try:
    print("Starting multi-seed optimization...")

    results = run_multi_seed_optimization(
        data_dir=data_dir,
        class_names=class_names,
        base_seed=42,
        n_seeds=10
    )

    print("\nOptimization completed successfully!")

    for model_name in ['SVM', 'LogisticRegression']:
        print(f"\nSummary for {model_name}:")
        summary_file = f"model_optimization_multiseed/{model_name}_seed_summary.json"

        # Nothing more to report when the summary file was not produced
        if not os.path.exists(summary_file):
            continue

        with open(summary_file, 'r') as f:
            summary = json.load(f)

        for heading, key in summary_sections:
            stats = summary[key]
            print(f"\n{heading}:")
            print(f"Mean: {stats['mean']:.4f} ± {stats['std']:.4f}")
            print(f"Range: [{stats['min']:.4f}, {stats['max']:.4f}]")

except Exception as e:
    print(f"Error during optimization: {str(e)}")
    raise

# Show the saved figures inline (files that are missing are skipped)
try:
    output_dir = "model_optimization_multiseed"

    # (image file, figure size, title), in display order
    gallery = [
        (os.path.join(output_dir, 'averaged_validation_curves.png'),
         (15, 8),
         'Averaged Validation Curves'),
    ]
    for model_name in ['SVM', 'LogisticRegression']:
        gallery.append(
            (os.path.join(output_dir, f'{model_name}_seed_comparison.png'),
             (15, 6),
             f'{model_name} Seed Comparison')
        )

    for image_path, figure_size, title in gallery:
        if not os.path.exists(image_path):
            continue
        img = plt.imread(image_path)
        plt.figure(figsize=figure_size)
        plt.imshow(img)
        plt.axis('off')
        plt.title(title)
        plt.show()

except Exception as e:
    print(f"Error displaying results: {str(e)}")
    raise

In [None]:
import shutil
import os

def zip_folder(folder_path, output_zip_path):
    """
    Create a zip file from a folder in Google Colab.

    The archive contains the folder itself as its single top-level entry.

    Args:
        folder_path (str): Path to the folder you want to zip. May be
            relative and may end with a path separator.
        output_zip_path (str): Path where you want to save the zip file.
            A trailing '.zip' is optional; it is added when missing.

    Raises:
        ValueError: If folder_path does not exist.
    """
    # Make sure the folder exists
    if not os.path.exists(folder_path):
        raise ValueError(f"Folder {folder_path} does not exist")

    # Normalise first: a trailing separator would make basename() return ''
    # and a bare relative name would make dirname() return '', either of
    # which archives the wrong thing (or fails) in make_archive.
    folder_path = os.path.abspath(folder_path)

    # make_archive appends '.zip' itself, so drop only a trailing extension.
    # (str.replace would also strip '.zip' from directory names in the path.)
    if output_zip_path.endswith('.zip'):
        base_name = output_zip_path[:-len('.zip')]
    else:
        base_name = output_zip_path

    # Create the zip file
    shutil.make_archive(
        base_name=base_name,
        format='zip',
        root_dir=os.path.dirname(folder_path),
        base_dir=os.path.basename(folder_path)
    )

In [None]:
# Example usage: replace both placeholder paths with your own before running
folder_to_zip = '/path/to/model_optimization_20241211_120612_multiseed'  # folder to archive
output_zip = '/path/to/model_optimization_20241211_120612_multiseed.zip'  # archive to create

zip_folder(folder_path=folder_to_zip, output_zip_path=output_zip)


# Full implementation

In [None]:
# Custom scorer for macro-averaged F1 (average='macro', not 'weighted').
# NOTE(review): optimize_single_seed below passes scoring='f1_macro' directly,
# so this scorer looks unused in the cells shown here; confirm before removing.
macro_f1_scorer = make_scorer(f1_score, average='macro')

class DataLoader:
    """Load per-sample feature files (``*.npz``) from one directory.

    Each file is expected to hold a ``features`` array and a scalar
    ``fish_species`` entry, which are the two keys read below.
    """
    def __init__(self, data_dir: str):
        """Remember the directory that will be scanned for ``*.npz`` files."""
        self.data_dir = data_dir

    def load_npz_files(self) -> Tuple[np.ndarray, np.ndarray]:
        """Load every ``*.npz`` file in ``data_dir`` with a progress bar.

        Files are read in sorted path order so that the row order of the
        returned arrays (and therefore every seeded split made from them)
        does not depend on the filesystem's listing order. Files that fail
        to load are logged and skipped.

        Returns:
            ``(features_array, labels_array)``: the stacked ``features``
            entries and the matching ``fish_species`` values as strings.
            Both are empty when no file could be loaded.
        """
        npz_files = sorted(glob.glob(os.path.join(self.data_dir, "*.npz")))
        if not npz_files:
            logging.warning(f"No .npz files found in {self.data_dir}")

        features_list = []
        labels_list = []

        # Set up progress bar
        pbar = tqdm(npz_files, desc="Loading data", unit="file")

        for npz_file in pbar:
            try:
                # The context manager closes the underlying zip handle; without
                # it one file handle per sample stays open until garbage
                # collection.
                # NOTE(review): allow_pickle=True can execute arbitrary code
                # when loading untrusted files - only use with trusted data.
                with np.load(npz_file, allow_pickle=True) as data:
                    features = data['features']
                    fish_species = str(data['fish_species'].item())

                if features is not None:
                    features_list.append(features)
                    labels_list.append(fish_species)
            except Exception as e:
                logging.error(f"Error processing {npz_file}: {str(e)}")
                continue

        features_array = np.array(features_list)
        labels_array = np.array(labels_list)

        # Log data distribution
        unique_labels, counts = np.unique(labels_array, return_counts=True)
        print("\nFeature matrix shape:", features_array.shape)
        print("Label vector shape:", labels_array.shape)
        print("\nClass distribution:")
        for label, count in zip(unique_labels, counts):
            print(f"{label}: {count} samples")

        return features_array, labels_array

class FishClassifier:
    """Label encoding, train/test splitting and metric computation helpers."""

    def __init__(self, random_state: int = 42):
        """Store the split seed and create the label encoder."""
        self.random_state = random_state
        self.le = LabelEncoder()

    def prepare_data(self, features: np.ndarray, labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Encode the labels and make a stratified 80/20 train/test split.

        Returns:
            The ``train_test_split`` result:
            ``(X_train, X_test, y_train, y_test)`` with encoded labels.
        """
        encoded_labels = self.le.fit_transform(labels)
        split = train_test_split(
            features,
            encoded_labels,
            test_size=0.2,
            random_state=self.random_state,
            stratify=encoded_labels,
        )
        return split

    @staticmethod
    def evaluate_model(model, X: np.ndarray, y: np.ndarray, class_names: list) -> Dict:
        """Predict on ``X`` and collect overall and per-class metrics.

        Returns:
            Dict with 'balanced_accuracy', 'macro_f1', 'weighted_f1' and
            'confusion_matrix', followed by '<class>_precision',
            '<class>_recall' and '<class>_f1' for each entry of
            ``class_names``. Entry ``i`` of ``class_names`` is paired with
            position ``i`` of the per-class arrays.
        """
        predictions = model.predict(X)

        # Overall scores
        metrics = {}
        metrics['balanced_accuracy'] = balanced_accuracy_score(y, predictions)
        metrics['macro_f1'] = f1_score(y, predictions, average='macro')
        metrics['weighted_f1'] = f1_score(y, predictions, average='weighted')
        metrics['confusion_matrix'] = confusion_matrix(y, predictions)

        # Per-class scores (the fourth returned array, support, is not used)
        per_class = precision_recall_fscore_support(y, predictions)
        precision, recall, f1 = per_class[0], per_class[1], per_class[2]

        for position, class_name in enumerate(class_names):
            metrics.update({
                f'{class_name}_precision': precision[position],
                f'{class_name}_recall': recall[position],
                f'{class_name}_f1': f1[position],
            })

        return metrics

def optimize_single_seed(X, y, model_type, seed, param_range, class_names, n_splits=5, n_iter=20):
    """Tune C for one model type under one seed and evaluate on held-out data.

    The data is first split 80/20 (stratified, seeded with ``seed``). The
    randomized search then sees the training part only, so the held-out
    test samples play no role in choosing C and the reported test metrics
    are not biased by model selection.

    Args:
        X: Feature matrix.
        y: Labels aligned with ``X``.
        model_type: "SVM" selects ``LinearSVC``; any other value selects
            ``LogisticRegression``.
        seed: Seed for NumPy's global RNG, the split, the CV folds, the
            estimator and the search.
        param_range: Distribution (or list) the search samples C from.
        class_names: Names used for the per-class metric keys.
        n_splits: Number of stratified CV folds.
        n_iter: Number of parameter settings sampled by the search.

    Returns:
        Dict with keys 'model' (best estimator, fitted on the training
        split), 'best_params', 'cv_score' (best mean CV macro-F1 on the
        training split), 'train_metrics', 'test_metrics' and
        'all_cv_results' (``cv_results_`` as a DataFrame).
    """
    # Set random states
    np.random.seed(seed)

    # Hold out the test set BEFORE tuning; stratify to match
    # FishClassifier.prepare_data and keep class proportions in both parts.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=seed, stratify=y
    )

    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)

    # Initialize model
    if model_type == "SVM":
        base_model = LinearSVC(random_state=seed, max_iter=2000)
    else:
        base_model = LogisticRegression(random_state=seed, max_iter=2000)

    # Setup random search scored by macro-averaged F1
    random_search = RandomizedSearchCV(
        estimator=base_model,
        param_distributions={'C': param_range, 'class_weight': ['balanced']},
        n_iter=n_iter,
        cv=cv,
        scoring='f1_macro',
        n_jobs=-1,
        random_state=seed,
        verbose=0,
        return_train_score=True
    )

    # The bar only marks start and completion; the search gives no callbacks
    with tqdm(total=n_iter, desc=f"Optimizing {model_type} (seed {seed})") as pbar:
        random_search.fit(X_train, y_train)
        pbar.update(n_iter)  # Update after completion

    # With the default refit=True the best estimator has already been refit
    # on the whole training split, so no extra fit is needed here.
    final_model = random_search.best_estimator_

    # Calculate metrics
    train_metrics = FishClassifier.evaluate_model(final_model, X_train, y_train, class_names)
    test_metrics = FishClassifier.evaluate_model(final_model, X_test, y_test, class_names)

    print(f"\nResults for {model_type}, seed {seed}:")
    print(f"CV Score: {random_search.best_score_:.4f}")
    print(f"Test Score: {test_metrics['macro_f1']:.4f}")

    return {
        'model': final_model,
        'best_params': random_search.best_params_,
        'cv_score': random_search.best_score_,
        'train_metrics': train_metrics,
        'test_metrics': test_metrics,
        'all_cv_results': pd.DataFrame(random_search.cv_results_)
    }

def run_multi_seed_optimization(data_dir: str, class_names: List[str] = None,
                              base_seed: int = 42, n_seeds: int = 10) -> Dict:
    """Main optimization pipeline with progress tracking.

    Loads the features, runs ``optimize_single_seed`` for 'SVM' and
    'LogisticRegression' under each seed, writes per-seed metrics (JSON)
    and CV results (CSV) into a timestamped output directory, produces the
    summary plots and finally zips that directory.

    Seeds: the first ``n_seeds`` entries of the fixed list
    ``[4, 15, 29, 30, 32, 37, 38, 65, 88, 91]`` are used, so the default
    ``n_seeds=10`` runs exactly that list. When more than ten seeds are
    requested, the extra ones are drawn from a generator seeded with
    ``base_seed``.

    Args:
        data_dir: Directory containing the ``*.npz`` feature files.
        class_names: Class names for per-class metrics. When None, the
            sorted unique labels found in the data are used.
        base_seed: Seeds NumPy's global RNG and the generator used for any
            seeds beyond the fixed list.
        n_seeds: Number of seeds to run (at least 1).

    Returns:
        ``{'SVM': {seed: result}, 'LogisticRegression': {seed: result}}``
        where each result is the dict returned by ``optimize_single_seed``.

    Raises:
        ValueError: If ``n_seeds`` is smaller than 1.
        Exception: Any other pipeline error is logged and re-raised.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    print("Starting multi-seed optimization...")

    try:
        if n_seeds < 1:
            raise ValueError(f"n_seeds must be at least 1, got {n_seeds}")

        # Load data
        data_loader = DataLoader(data_dir)
        features, labels = data_loader.load_npz_files()

        if class_names is None:
            class_names = np.unique(labels).tolist()

        # Initialize structures
        param_range = loguniform(1e-1, 3e2)
        all_results = {'SVM': {}, 'LogisticRegression': {}}

        # Select seeds. The fixed list is kept so the default call runs the
        # same seeds as before; n_seeds now actually controls how many run.
        np.random.seed(base_seed)
        fixed_seeds = [4, 15, 29, 30, 32, 37, 38, 65, 88, 91]
        seeds = fixed_seeds[:n_seeds]
        if len(seeds) < n_seeds:
            extra_rng = np.random.RandomState(base_seed)
            while len(seeds) < n_seeds:
                candidate = int(extra_rng.randint(0, 2**31 - 1))
                if candidate not in seeds:
                    seeds.append(candidate)

        # Setup output directory
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_dir = f'model_optimization_{timestamp}_multiseed'
        os.makedirs(output_dir, exist_ok=True)

        # Run optimization for each model type
        for model_type in ['SVM', 'LogisticRegression']:
            print(f"\nOptimizing {model_type} models across {len(seeds)} seeds...")

            # Process seeds sequentially for better progress tracking
            for seed in seeds:
                result = optimize_single_seed(features, labels, model_type, seed, param_range, class_names)
                all_results[model_type][seed] = result

                # Save results
                seed_dir = os.path.join(output_dir, f'seed_{seed}')
                os.makedirs(seed_dir, exist_ok=True)

                # Save metrics (the fitted model itself is not written out)
                metrics_path = os.path.join(seed_dir, f'{model_type}_metrics.json')
                metrics = {
                    'best_params': result['best_params'],
                    'cv_score': result['cv_score'],
                    'train_metrics': result['train_metrics'],
                    'test_metrics': result['test_metrics']
                }
                with open(metrics_path, 'w') as f:
                    # NumpyEncoder converts NumPy scalars/arrays to plain JSON
                    json.dump(metrics, f, cls=NumpyEncoder)

                # Save CV results
                result['all_cv_results'].to_csv(os.path.join(seed_dir, f'{model_type}_results.csv'))

        # Generate and save summary plots
        plot_validation_curves(all_results, output_dir)
        generate_seed_summary(all_results, output_dir, class_names)

        # Zip results (archive holds the directory's contents at its root)
        shutil.make_archive(output_dir, 'zip', output_dir)
        print(f"\nResults saved to {output_dir}.zip")

        return all_results

    except Exception as e:
        logging.error(f"Error in optimization pipeline: {str(e)}")
        raise

class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that also understands NumPy scalars and arrays."""

    def default(self, obj):
        """Convert NumPy values to plain Python; defer everything else.

        Arrays become (nested) lists, NumPy integer/floating/bool scalars
        become the matching Python scalar. Any other type is passed to the
        base class, which raises ``TypeError``.
        """
        numpy_scalar_types = (np.integer, np.floating, np.bool_)

        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, numpy_scalar_types):
            return obj.item()
        return super().default(obj)

def plot_validation_curves(all_results: Dict, output_dir: str):
    """Plot training/validation score against C, aggregated over seeds.

    For every model, the union of all C values tried by any seed forms the
    x axis. At each C the mean and standard deviation are taken over the
    seeds that actually evaluated that C. Seeds that never tried a given C
    are excluded from its statistics (they are stored as NaN) rather than
    being counted as a score of 0, which would drag the mean towards zero
    and inflate the std band.

    Args:
        all_results: ``{model_name: {seed: result}}`` where each result has
            an 'all_cv_results' DataFrame with the columns 'param_C',
            'mean_train_score' and 'mean_test_score'.
        output_dir: Directory that receives 'averaged_validation_curves.png'.
    """
    plt.figure(figsize=(15, 8))
    colors = {'SVM': 'blue', 'LogisticRegression': 'red'}

    for model_name, results in all_results.items():
        # Nothing to draw for a model without any seed results
        if not results:
            continue

        # Collect all unique C values across seeds
        all_c_values = set()
        for seed_results in results.values():
            all_c_values.update(seed_results['all_cv_results']['param_C'])

        c_values = sorted(all_c_values)
        n_c_values = len(c_values)

        # NaN marks "this seed did not evaluate this C"
        train_scores = np.full((len(results), n_c_values), np.nan)
        val_scores = np.full((len(results), n_c_values), np.nan)

        # Fill score arrays (first matching row per seed and C)
        for i, seed_results in enumerate(results.values()):
            df = seed_results['all_cv_results']
            for j, c in enumerate(c_values):
                mask = df['param_C'] == c
                if mask.any():
                    train_scores[i, j] = df.loc[mask, 'mean_train_score'].iloc[0]
                    val_scores[i, j] = df.loc[mask, 'mean_test_score'].iloc[0]

        # Statistics over the seeds that have a value; every column has at
        # least one because each C came from some seed's results.
        train_mean = np.nanmean(train_scores, axis=0)
        train_std = np.nanstd(train_scores, axis=0)
        val_mean = np.nanmean(val_scores, axis=0)
        val_std = np.nanstd(val_scores, axis=0)

        # Unknown model names fall back to matplotlib's default colours
        color = colors.get(model_name)

        # Plot training scores
        plt.semilogx(c_values, train_mean, '--', color=color,
                     alpha=0.8, label=f'{model_name} Training')
        plt.fill_between(c_values,
                        train_mean - train_std,
                        train_mean + train_std,
                        color=color, alpha=0.1)

        # Plot validation scores
        plt.semilogx(c_values, val_mean, '-', color=color,
                     alpha=0.8, label=f'{model_name} Validation')
        plt.fill_between(c_values,
                        val_mean - val_std,
                        val_mean + val_std,
                        color=color, alpha=0.2)

    plt.grid(True)
    plt.xlabel('C Parameter (log scale)')
    plt.ylabel('Score')
    plt.title('Validation Curves: Averaged Across Seeds\nSolid: Validation, Dashed: Training')
    plt.legend(loc='lower right')
    plt.savefig(os.path.join(output_dir, 'averaged_validation_curves.png'))
    plt.close()

def generate_seed_summary(all_results: Dict, output_dir: str, class_names: List[str],
                          test_metric: str = 'macro_f1'):
    """Write per-model summary statistics (JSON) and a comparison plot (PNG).

    For each model the CV score, the chosen test metric and the selected C
    are collected over all seeds and summarised as mean/std/min/max in
    '<model>_seed_summary.json'. '<model>_seed_comparison.png' shows CV
    score against test score and a histogram of log10(C).

    Args:
        all_results: ``{model_name: {seed: result}}`` where each result has
            'cv_score', 'test_metrics' and 'best_params' (with key 'C').
        output_dir: Directory that receives the JSON and PNG files.
        class_names: Not used by this function; kept for the call signature.
        test_metric: Key of ``test_metrics`` reported as the test score.
            Defaults to 'macro_f1' so the test score is the same metric as
            the CV score (the search is scored with macro F1), which the
            CV-vs-test scatter and its identity line rely on. Pass
            'weighted_f1' to report weighted F1 instead.
    """
    def _stats(values):
        # Plain floats so the summary is JSON serialisable
        return {
            'mean': float(np.mean(values)),
            'std': float(np.std(values)),
            'min': float(np.min(values)),
            'max': float(np.max(values))
        }

    for model_name, results in all_results.items():
        # A model without any seed results has nothing to summarise
        if not results:
            continue

        # Collect metrics
        cv_scores = []
        test_scores = []
        c_values = []

        for seed_results in results.values():
            cv_scores.append(seed_results['cv_score'])
            test_scores.append(seed_results['test_metrics'][test_metric])
            c_values.append(seed_results['best_params']['C'])

        # Calculate summary statistics
        summary = {
            'cv_score': _stats(cv_scores),
            'test_score': _stats(test_scores),
            'c_value': _stats(c_values)
        }

        # Save summary
        with open(os.path.join(output_dir, f'{model_name}_seed_summary.json'), 'w') as f:
            json.dump(summary, f, indent=4)

        # Generate comparison plot
        plt.figure(figsize=(15, 6))

        # Left: one point per seed, dashed identity line over the CV range
        plt.subplot(121)
        plt.scatter(cv_scores, test_scores, alpha=0.6)
        plt.plot([min(cv_scores), max(cv_scores)], [min(cv_scores), max(cv_scores)],
                 'k--', alpha=0.5)
        plt.xlabel('CV Score')
        plt.ylabel('Test Score')
        plt.title('CV vs Test Score Comparison')

        # Right: spread of the selected C values on a log scale
        plt.subplot(122)
        plt.hist(np.log10(c_values), bins=10)
        plt.xlabel('log10(C)')
        plt.ylabel('Count')
        plt.title('Distribution of Best C Values')

        plt.suptitle(f'{model_name}: Results Across Seeds')
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f'{model_name}_seed_comparison.png'))
        plt.close()

# Entry point: run the full pipeline with the default seed settings
if __name__ == "__main__":
    # NOTE(review): the setup cell at the top of the notebook unzips the
    # features into /content/ResNet-50 - confirm this path points at them.
    data_dir = "/content/local_features"
    class_names = ['Bleikja', 'Lax', 'Urriði']

    try:
        results = run_multi_seed_optimization(
            data_dir,
            class_names,
            base_seed=42,
            n_seeds=10,
        )
    except Exception as e:
        # Show a short message in the cell output, then keep the traceback
        print(f"Error during optimization: {str(e)}")
        raise

# ZIP Folder

Please enter the correct path to your results folder. It should have the following format:

```
/content/model_optimization_{date}_{time}_multiseed
```

In [None]:
import shutil
import os

def zip_folder(folder_path, output_zip_path):
    """
    Create a zip file from a folder in Google Colab.

    The archive contains the folder itself as its single top-level entry.

    Args:
        folder_path (str): Path to the folder you want to zip. May be
            relative and may end with a path separator.
        output_zip_path (str): Path where you want to save the zip file.
            A trailing '.zip' is optional; it is added when missing.

    Raises:
        ValueError: If folder_path does not exist.
    """
    # Make sure the folder exists
    if not os.path.exists(folder_path):
        raise ValueError(f"Folder {folder_path} does not exist")

    # Normalise first: a trailing separator would make basename() return ''
    # and a bare relative name would make dirname() return '', either of
    # which archives the wrong thing (or fails) in make_archive.
    folder_path = os.path.abspath(folder_path)

    # make_archive appends '.zip' itself, so drop only a trailing extension.
    # (str.replace would also strip '.zip' from directory names in the path.)
    if output_zip_path.endswith('.zip'):
        base_name = output_zip_path[:-len('.zip')]
    else:
        base_name = output_zip_path

    # Create the zip file
    shutil.make_archive(
        base_name=base_name,
        format='zip',
        root_dir=os.path.dirname(folder_path),
        base_dir=os.path.basename(folder_path)
    )

In [None]:
# Example usage: replace both placeholder paths with your own before running
folder_to_zip = '/path/to/model_optimization_20250202_115215_multiseed'  # folder to archive
output_zip = '/path/to/model_optimization_20250202_115215_multiseed.zip'  # archive to create

zip_folder(folder_path=folder_to_zip, output_zip_path=output_zip)