<h1><center>Binary Drone Classification</center></h1>

In [None]:
# Standard library imports
import os
import random
import time
import json
from dataclasses import dataclass
from pathlib import Path
from textwrap import indent
from typing import Dict, List, Union, Tuple
from datetime import datetime

# Data manipulation and analysis
import numpy as np
import pandas as pd
from tqdm import tqdm

# Machine Learning
import joblib
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

# Deep Learning
import tensorflow as tf
import tensorboard as tb
import cnn_on_cnn_tuner as kt
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import (
    Input,
    Conv2D,
    MaxPooling2D,
    Flatten,
    Dense,
    Dropout,
    BatchNormalization
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorboard.compat.tensorflow_stub.io.gfile import exists

# Image processing
import cv2
from PIL import Image

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator


In [None]:
from src.preprocessing import Processor
from src.cnn import ClassificationModel
from src.svm import SVM, resize
from src.grad_cam import GradCAM
from src.cnn_on_cnn import TransferCNN
from src.svm_on_cnn import FeatureExtractor

In [None]:
# Directory layout, resolved against the current working directory
BASE_PATH = Path.cwd()
DATA_PATH = BASE_PATH / 'data'
MODEL_PATH = BASE_PATH / 'models'

# Constants shared by the rest of the notebook
SEED = 42
LOG_LEVEL = 0

# Collects runtimes so they can be looked up later
runtime_dict = {}

In [None]:
# Suppress warnings and set global seed values
import os
import warnings

warnings.filterwarnings('ignore') # suppress warnings
# NOTE(review): tensorflow is already imported earlier in this file. TensorFlow
# presumably reads TF_CPP_MIN_LOG_LEVEL when it is imported, so setting it here
# may have no effect - confirm, and move this above the import if needed.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = str(LOG_LEVEL)

# Seed NumPy and TensorFlow; Python's built-in `random` module is not seeded here
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Create the model directory and its 'history' sub-directory if they don't exist
MODEL_PATH.mkdir(exist_ok=True)
MODEL_PATH.joinpath('history').mkdir(exist_ok=True)

<h3><center>Modules</center></h3>

In [None]:
import numpy as np
import pandas as pd
import random
from PIL import Image
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
from typing import Dict, Tuple, List

class Processor:
    """Load, crop, letterbox and normalize the real, generated and test image sets.

    Each dataset is described by an image directory and a CSV file whose rows
    provide ``filename``, ``type`` and the bounding box columns ``xmin``,
    ``ymin``, ``xmax`` and ``ymax``. Every annotated object is cropped to its
    bounding box, resized/padded to ``size`` and collected in
    ``self.data[<dataset>]['X']`` with its class id in ``['y']``. A ``mixed``
    dataset (real + generated) is derived, and all image arrays are finally
    converted to float32 and divided by 255.
    """

    CLASS_MAP = {'fixed-wing drone': 0, 'multi-rotor drone': 1}

    # Bounding-box columns in the order expected by ``PIL.Image.crop``.
    _BBOX_KEYS = ('xmin', 'ymin', 'xmax', 'ymax')

    def __init__(self,
                 real_img_dir: str,
                 real_csv_path: str,
                 generated_img_dir: str,
                 generated_csv_path: str,
                 test_img_dir: str,
                 test_csv_path: str,
                 size: Tuple[int, int] = (128, 128),
                 vis_samples: int = 3):
        """
        Process all three datasets immediately.

        :param real_img_dir: directory holding the real images
        :param real_csv_path: annotation CSV for the real images
        :param generated_img_dir: directory holding the generated images
        :param generated_csv_path: annotation CSV for the generated images
        :param test_img_dir: directory holding the test images
        :param test_csv_path: annotation CSV for the test images
        :param size: target (width, height) of every processed image
        :param vis_samples: maximum number of processed samples shown per dataset
        """
        self.size = size
        self.vis_samples = vis_samples
        self.data = {k: {'X': [], 'y': []} for k in ['real', 'generated', 'test', 'mixed']}

        # Process all datasets
        datasets = {
            'real': (real_img_dir, real_csv_path),
            'generated': (generated_img_dir, generated_csv_path),
            'test': (test_img_dir, test_csv_path)
        }

        for dtype, (img_dir, csv_path) in datasets.items():
            self._process_dataset(img_dir, csv_path, dtype)

        self._create_mixed_dataset()
        self._normalize_and_shuffle_data()

    def _process_dataset(self, img_dir: str, csv_path: str, dataset_type: str) -> None:
        """Process a single dataset of images and their annotations.

        Rows whose image file is missing are skipped silently; rows that fail
        to process are reported on stdout and skipped.
        """
        df = pd.read_csv(csv_path)
        img_dir = Path(img_dir)
        vis_indices = set(self._get_visualization_indices(df))
        samples_to_visualize = []

        for idx, row in tqdm(df.iterrows(), total=len(df), desc=f'Processing {dataset_type} images'):
            img_path = img_dir / row['filename']
            if not img_path.exists():
                continue

            try:
                # Resolve the label before touching the data lists so that an
                # unknown class can never leave X and y with different lengths.
                label = self.CLASS_MAP[row['type']]
                processed_image = self._process_single_image(img_path, row, dataset_type)
                pixels = np.array(processed_image)
            except Exception as e:  # best effort: report the row and carry on
                print(f"Error processing {img_path.name}: {e}")
                continue

            self.data[dataset_type]['X'].append(pixels)
            self.data[dataset_type]['y'].append(label)
            if idx in vis_indices:
                samples_to_visualize.append(processed_image)

        # Visualize all samples together after processing
        self._show_samples(samples_to_visualize, dataset_type)

    @staticmethod
    def _show_samples(samples: list, dataset_type: str) -> None:
        """Display the given processed images side by side (no-op when empty)."""
        if not samples:
            return

        # squeeze=False always yields a 2-D axes array, even for a single image
        fig, axes = plt.subplots(1, len(samples), figsize=(6 * len(samples), 6), squeeze=False)
        for ax, img in zip(axes[0], samples):
            ax.imshow(img)
            ax.set_title(f'{dataset_type.capitalize()} - Processed Image')
            ax.axis('off')

        plt.tight_layout()
        plt.show()
        plt.close()

    def _process_single_image(self, img_path: Path, row: pd.Series, dataset_type: str) -> "Image.Image":
        """Process a single image including loading, cropping, and resizing."""
        # The context manager releases the file handle; convert() returns an
        # independent in-memory copy that stays valid afterwards.
        with Image.open(img_path) as source:
            img = source.convert('RGB')
        bbox = self._get_bbox(img, row, dataset_type)
        crop = img.crop(bbox)
        return self._resize_and_pad(crop)

    def _get_bbox(self, img: "Image.Image", row: pd.Series, dataset_type: str) -> List[int]:
        """Return ``[xmin, ymin, xmax, ymax]`` for cropping.

        For the ``generated`` dataset the annotated values are scaled by the
        image width (x columns) or height (y columns); for every other dataset
        they are returned unchanged.
        """
        if dataset_type == 'generated':
            width, height = img.size
            # startswith() rather than ``'x' in k``: 'ymax' also contains an 'x'
            # and would otherwise be scaled by the width.
            return [int(row[k] * (width if k.startswith('x') else height))
                    for k in self._BBOX_KEYS]
        return [row[k] for k in self._BBOX_KEYS]

    def _resize_and_pad(self, img: "Image.Image") -> "Image.Image":
        """Resize and pad image to target size while maintaining aspect ratio.

        :raises ValueError: if the image has a zero-sized dimension
        """
        img_w, img_h = img.size
        if img_w <= 0 or img_h <= 0:
            raise ValueError(f"Cannot resize an empty image of size {img.size}")

        scale = min(self.size[0] / img_w, self.size[1] / img_h)
        # Clamp to one pixel so extreme aspect ratios never request a 0-px side
        new_size = tuple(max(1, int(dim * scale)) for dim in (img_w, img_h))

        resized = img.resize(new_size, Image.Resampling.LANCZOS)
        padded = Image.new('RGB', self.size, (0, 0, 0))
        paste_pos = tuple((t - n) // 2 for t, n in zip(self.size, new_size))
        padded.paste(resized, paste_pos)
        return padded

    def _get_visualization_indices(self, df: pd.DataFrame) -> List[int]:
        """Get random indices for visualization samples."""
        return random.sample(range(len(df)), min(self.vis_samples, len(df))) if len(df) > 0 else []

    def _visualize_sample(self, img: "Image.Image", dataset_type: str) -> None:
        """Visualize a processed image sample."""
        fig, ax = plt.subplots(figsize=(6, 6))
        ax.imshow(img)
        ax.set_title(f'{dataset_type.capitalize()} - Processed Image')
        ax.axis('off')
        plt.show()
        plt.close()

    def _create_mixed_dataset(self) -> None:
        """Create a shuffled mixed dataset from the real and generated images.

        Empty source datasets are ignored; if both are empty, ``mixed`` keeps
        its initial empty lists.
        """
        sources = [self.data[k] for k in ('real', 'generated') if len(self.data[k]['X']) > 0]
        if not sources:
            return

        X = np.concatenate([np.asarray(src['X']) for src in sources])
        y = np.concatenate([np.asarray(src['y']) for src in sources])

        indices = np.random.permutation(len(X))
        self.data['mixed']['X'], self.data['mixed']['y'] = X[indices], y[indices]

    def _normalize_and_shuffle_data(self) -> None:
        """Normalize pixel values and shuffle datasets."""
        for dtype in self.data:
            X = np.array(self.data[dtype]['X'], dtype='float32') / 255.0
            y = np.array(self.data[dtype]['y'])

            # 'mixed' was already shuffled when it was created
            if dtype != 'mixed':
                indices = np.random.permutation(len(X))
                X, y = X[indices], y[indices]

            self.data[dtype]['X'], self.data[dtype]['y'] = X, y

    def save_arrays(self, output_dir: str) -> None:
        """Save processed arrays to disk as ``X_<dataset>.npy`` / ``y_<dataset>.npy``."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        for dtype in self.data:
            for arr_type in ['X', 'y']:
                name = f'{arr_type}_{dtype}.npy'
                arr = self.data[dtype][arr_type]
                np.save(output_path / name, arr)
                print(f"{name:<16} | Shape: {arr.shape}")

In [None]:
import os
import numpy as np
import keras_tuner as kt
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from pathlib import Path
import time
import json
from datetime import datetime


class ModelArchitecture:
    """Tunable CNN builders that differ only in the number of conv stages.

    Every builder registers its hyperparameters under its own prefix
    (``cnn_01_``, ``cnn_02_``, ``cnn_03_``) and returns an uncompiled model
    ending in a single sigmoid unit.
    """

    @staticmethod
    def cnn_01(hp, input_shape, num_classes):
        """Two conv stages with 32 and 64 filters."""
        return ModelArchitecture._assemble(hp, input_shape, 'cnn_01_', (32, 64))

    @staticmethod
    def cnn_02(hp, input_shape, num_classes):
        """Three conv stages with 32, 64 and 128 filters."""
        return ModelArchitecture._assemble(hp, input_shape, 'cnn_02_', (32, 64, 128))

    @staticmethod
    def cnn_03(hp, input_shape, num_classes):
        """Four conv stages with 32, 64, 128 and 256 filters."""
        return ModelArchitecture._assemble(hp, input_shape, 'cnn_03_', (32, 64, 128, 256))

    @staticmethod
    def _assemble(hp, input_shape, prefix, stage_filters):
        """Build conv stages -> Flatten -> Dense(128) -> Dropout -> Dense(1, sigmoid)."""
        net_input = Input(shape=input_shape)
        params = ModelArchitecture._tune_common_hyperparameters(hp, prefix)
        weight_penalty = ModelArchitecture._tune_regularizer(hp, prefix)

        tensor = net_input

        for n_filters in stage_filters:
            conv = Conv2D(n_filters, (params['kernel_size'],) * 2, activation=params['activation'],
                          kernel_regularizer=weight_penalty)
            tensor = conv(tensor)
            if params['use_batch_norm']:
                tensor = BatchNormalization()(tensor)
            tensor = MaxPooling2D((2, 2))(tensor)

        tensor = Flatten()(tensor)
        tensor = Dense(128, activation=params['activation'], kernel_regularizer=weight_penalty)(tensor)
        tensor = Dropout(params['dropout_rate'])(tensor)

        net_output = Dense(1, activation='sigmoid', kernel_regularizer=weight_penalty)(tensor)

        return Model(inputs=net_input, outputs=net_output)

    @staticmethod
    def _tune_common_hyperparameters(hp, prefix):
        """Register the shared hyperparameters under ``prefix`` and return their values."""
        activation = hp.Choice(f'{prefix}activation', ['relu', 'elu', 'leaky_relu'])
        kernel_size = hp.Choice(f'{prefix}kernel_size', [3, 5])
        use_batch_norm = hp.Boolean(f'{prefix}use_batch_norm')
        dropout_rate = hp.Float(f'{prefix}dropout_rate', 0.20, 0.50, step=0.05)
        return {
            'activation': activation,
            'kernel_size': kernel_size,
            'use_batch_norm': use_batch_norm,
            'dropout_rate': dropout_rate,
        }

    @staticmethod
    def _tune_regularizer(hp, prefix=''):
        """Return an L2 regularizer whose factor is tuned log-uniformly in [1e-6, 1e-3]."""
        l2_factor = hp.Float(f'{prefix}l2_reg', 1e-6, 1e-3, sampling='log')
        return tf.keras.regularizers.l2(l2_factor)


class HyperModel(kt.HyperModel):
    """Keras Tuner hypermodel that wraps an architecture function and compiles its result."""

    def __init__(self, input_shape, num_classes, architecture_fn):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.architecture_fn = architecture_fn

    def build(self, hp):
        """Build the network for ``hp`` and compile it with a tuned Adam learning rate."""
        # The architecture registers its hyperparameters first, then the learning rate
        network = self.architecture_fn(hp, self.input_shape, self.num_classes)
        learning_rate = hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')
        network.compile(
            optimizer=Adam(learning_rate),
            loss='binary_crossentropy',
            metrics=['accuracy'],
        )
        return network


class ClassificationModel:
    """Tune, train, persist and query CNN classifiers on the generated/mixed datasets.

    The constructor loads ``X_<dataset>.npy`` / ``y_<dataset>.npy`` for the
    ``generated`` and ``mixed`` datasets from ``data_dir`` and selects ``mixed``
    as the active dataset. Tuners and trained models are kept in dictionaries
    keyed by a caller-chosen name.
    """

    def __init__(self,
                 data_dir: str,
                 model_dir: str,
                 num_classes: int=2, seed: int=42):
        """
        :param data_dir: directory containing the ``.npy`` arrays
        :param model_dir: directory for models, tuner state and training history
        :param num_classes: forwarded to the architecture builders
        :param seed: seed handed to the hyperparameter search
        """
        self.data_dir, self.model_dir, self.num_classes = data_dir, model_dir, num_classes
        self.seed = seed
        self.models, self.tuners = {}, {}
        os.makedirs(model_dir, exist_ok=True)

        for dataset in ['generated', 'mixed']:
            setattr(self, f'X_{dataset}', np.load(os.path.join(data_dir, f'X_{dataset}.npy')))
            setattr(self, f'y_{dataset}', np.load(os.path.join(data_dir, f'y_{dataset}.npy')).astype('float32'))

        self.X, self.y = self.X_mixed, self.y_mixed

    def set_dataset(self, dataset_type: str):
        """Select the active dataset.

        :raises ValueError: if ``dataset_type`` is not 'generated' or 'mixed'
        """
        if dataset_type not in ['generated', 'mixed']:
            raise ValueError("dataset_type must be one of: 'generated', 'mixed'")

        self.X = getattr(self, f'X_{dataset_type}')
        self.y = getattr(self, f'y_{dataset_type}')

    def create_tuner(self, name: str, architecture_type: str = 'cnn_01', max_trials: int = 5):
        """Create a random-search tuner for one architecture and register it under ``name``.

        :param name: key for the tuner (also used as the tuner project name)
        :param architecture_type: one of 'cnn_01', 'cnn_02', 'cnn_03'
        :param max_trials: number of hyperparameter combinations to try
        :raises ValueError: if ``architecture_type`` is unknown
        :return: self, to allow chaining
        """
        architectures = {f'cnn_0{i}': getattr(ModelArchitecture, f'cnn_0{i}') for i in range(1, 4)}

        if architecture_type not in architectures:
            raise ValueError(f"Architecture type must be one of {list(architectures.keys())}")

        # NOTE(review): this file defines two classes named HyperModel; if both
        # definitions are executed in the same namespace this call resolves to
        # the later one - confirm the intended class is in scope.
        self.tuners[name] = kt.RandomSearch(
            HyperModel(self.X.shape[1:], self.num_classes, architectures[architecture_type]),
            objective='val_accuracy', max_trials=max_trials,
            seed=self.seed,  # honour the constructor's seed so searches are repeatable
            directory=os.path.join(self.model_dir, 'tuning'), project_name=name)

        return self

    def tune_model(self, name: str, epochs=30, validation_split=0.2):
        """Run the search for tuner ``name``, then retrain a fresh model with the best hyperparameters.

        The trained model is stored in ``self.models[name]`` and its history and
        total runtime (search + final fit) are written via ``save_training_info``.

        :raises ValueError: if no tuner named ``name`` exists
        :return: self, to allow chaining
        """
        if name not in self.tuners:
            raise ValueError(f"Tuner '{name}' not found. Create it first with create_tuner()")

        start_time = time.time()

        self.tuners[name].search(
            self.X, self.y,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=[EarlyStopping(patience=5, restore_best_weights=True)]
        )

        best_hps = self.get_best_hyperparameters(name)

        model = self.tuners[name].hypermodel.build(best_hps)

        history = model.fit(
            self.X, self.y,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=[EarlyStopping(patience=5, restore_best_weights=True)]
        )

        runtime = time.time() - start_time

        self.models[name] = model

        self.save_training_info(name, history.history, runtime)

        return self

    def save_training_info(self, name: str, history: dict, runtime: float):
        """Write history, runtime and a timestamp to ``<model_dir>/history/<name>_training.json``."""
        info = {
            'model_name': name,
            'history': history,
            'runtime': runtime,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        history_path = os.path.join(self.model_dir, f'history/{name}_training.json')
        # The constructor only creates model_dir itself, so make sure the
        # 'history' sub-directory exists before writing.
        os.makedirs(os.path.dirname(history_path), exist_ok=True)
        with open(history_path, 'w') as f:
            json.dump(info, f, indent=4)

    def load_training_info(self, name: str) -> dict:
        """Read back the JSON written by ``save_training_info``.

        :raises ValueError: if no history file exists for ``name``
        """
        history_path = os.path.join(self.model_dir, f'history/{name}_training.json')
        try:
            with open(history_path, 'r') as f:
                return json.load(f)
        except FileNotFoundError as err:
            raise ValueError(f"Training history for model '{name}' not found") from err

    def get_best_hyperparameters(self, name: str):
        """Return the best hyperparameters found by tuner ``name``.

        :raises ValueError: if no tuner named ``name`` exists
        """
        if name not in self.tuners:
            raise ValueError(f"Tuner '{name}' not found")

        return self.tuners[name].get_best_hyperparameters(1)[0]

    def save_model(self, name: str):
        """Save model ``name`` to ``<model_dir>/<name>.keras``.

        :raises ValueError: if no model named ``name`` exists
        :return: self, to allow chaining
        """
        if name not in self.models:
            raise ValueError(f"Model '{name}' not found")

        model_path = os.path.join(self.model_dir, f'{name}.keras')
        self.models[name].save(model_path)

        return self

    def load_model(self, name: str):
        """Load ``<model_dir>/<name>.keras``, recompile it with a default Adam optimizer and return it."""
        model_path = os.path.join(self.model_dir, f'{name}.keras')
        self.models[name] = tf.keras.models.load_model(model_path)
        self.models[name].compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

        return self.models[name]

    def predict(self, X):
        """Return ``{model_name: 0/1 predictions}`` for every registered model (threshold 0.5).

        :raises ValueError: if no model has been trained or loaded
        """
        if not self.models:
            raise ValueError("No models available. Train or load a model first.")

        return {name: (model.predict(X) > 0.5).astype(int).flatten()
                for name, model in self.models.items()}

In [None]:
from typing import Tuple, Dict, Optional, Union, List
import time
from pathlib import Path
import numpy as np
import cv2
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from joblib import dump, load

class SVM:
    """PCA + SVC pipeline tuned with a 5-fold grid search.

    With ``is_transfer=False`` inputs are image batches that are resized to
    64x64 and flattened; with ``is_transfer=True`` inputs are only flattened
    (e.g. feature vectors that need no resizing).
    """

    def __init__(self, model_dir: str, seed: int = 42, is_transfer: bool=False) -> None:
        """
        :param model_dir: directory for saved models and training history
        :param seed: random state for PCA and SVC
        :param is_transfer: skip image resizing and only flatten the inputs
        """
        self.model_dir = Path(model_dir)
        self.is_transfer = is_transfer
        self.pipeline = Pipeline([
            ('pca', PCA(n_components=0.95, random_state=seed)),
            ('svm', SVC(random_state=seed))
        ])
        self.param_grid = {
            'svm__kernel': ['rbf', 'linear'],
            'svm__C': [0.1, 1, 10],
            'svm__gamma': ['scale', 'auto', 0.01, 0.1]
        }
        self.best_estimator: Optional[Pipeline] = None
        self.history: Optional[Dict] = None

    def _prepare_features(self, X: np.ndarray) -> np.ndarray:
        """Turn a batch into a 2-D (samples, features) matrix for the pipeline."""
        if self.is_transfer:
            return X.reshape(X.shape[0], -1)
        resized = resize(X, (X.shape[0], 64, 64, X.shape[3]))
        return resized.reshape(resized.shape[0], -1)

    def train(self, X_train: np.ndarray, y_train: np.ndarray) -> Tuple[Pipeline, float]:
        """Grid-search the pipeline on the training data.

        :return: the best estimator and the elapsed time in seconds
        """
        start_time = time.time()
        X_train_prepared = self._prepare_features(X_train)

        grid = GridSearchCV(
            self.pipeline,
            self.param_grid,
            cv=5,
            scoring='accuracy',
            n_jobs=-1,
            verbose=2
        )

        grid.fit(X_train_prepared, y_train)
        self.best_estimator = grid.best_estimator_

        self.history = {
            'best_params': grid.best_params_,
            'cv_results': grid.cv_results_,
            'best_score': grid.best_score_
        }

        running_time = time.time() - start_time

        print(f"Best parameters: {grid.best_params_}")
        print(f"Training completed in {running_time:.2f} seconds")

        return self.best_estimator, running_time

    def save(self, name: str) -> None:
        """Save the trained model."""
        if self.best_estimator is None:
            print("No best estimator found. Train the model first.")
            return

        self.model_dir.mkdir(parents=True, exist_ok=True)
        dump(self.best_estimator, self.model_dir / f'{name}.joblib')

    def save_history(self, name: str) -> None:
        """Save the grid-search history to ``history/<name>_history.joblib``."""
        if not self.history:
            print("No training history found. Train the model first.")
            return

        history_path = self.model_dir / f'history/{name}_history.joblib'
        history_path.parent.mkdir(parents=True, exist_ok=True)
        dump(self.history, history_path)

    def test(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Evaluate the best estimator and return accuracy, precision, recall and F1.

        :raises ValueError: if no model has been trained or loaded
        """
        if self.best_estimator is None:
            raise ValueError("No model found. Train or load a model first.")

        # Same preparation as in train(), so transfer features are not resized
        X_test_prepared = self._prepare_features(X_test)

        y_pred = self.best_estimator.predict(X_test_prepared)

        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred),
            'recall': recall_score(y_test, y_pred),
            'f1_score': f1_score(y_test, y_pred)
        }

        return metrics

    def load(self, name: str) -> None:
        """Load a saved estimator from ``<model_dir>/<name>.joblib``.

        :raises FileNotFoundError: if the file does not exist
        """
        model_path = self.model_dir / f'{name}.joblib'
        if not model_path.exists():
            raise FileNotFoundError(f"No model found at {model_path}")

        # NOTE: joblib.load unpickles the file; only load files you trust.
        self.best_estimator = load(model_path)

    def load_history(self, name: str) -> Dict:
        """Load the history written by ``save_history``.

        Looks for ``history/<name>_history.joblib`` (the name ``save_history``
        writes) and falls back to the older ``history/<name>_training.joblib``.

        :raises FileNotFoundError: if neither file exists
        """
        history_path = self.model_dir / f'history/{name}_history.joblib'
        if not history_path.exists():
            legacy_path = self.model_dir / f'history/{name}_training.joblib'
            if not legacy_path.exists():
                raise FileNotFoundError(f"No history found at {history_path}")
            history_path = legacy_path

        return load(history_path)


def resize(array: np.ndarray, new_shape: Tuple[int, int, int, int]) -> np.ndarray:
    """Resize every image of a batch to a new height and width.

    :param array: image batch indexed as ``array[i]`` for ``i < batch_size``
    :param new_shape: ``(batch_size, new_height, new_width, channels)`` of the result
    :return: float64 array of shape ``new_shape``
    """
    batch_size, new_height, new_width, channels = new_shape
    resized_array = np.zeros((batch_size, new_height, new_width, channels))

    for i in range(batch_size):
        # INTER_AREA is the interpolation suited to downsampling
        frame = cv2.resize(array[i], (new_width, new_height), interpolation=cv2.INTER_AREA)
        # cv2.resize drops the channel axis of single-channel images; restore it
        # so the assignment also works when channels == 1.
        resized_array[i] = frame.reshape(new_height, new_width, channels)

    return resized_array

In [None]:
import tensorflow as tf
import numpy as np
from pathlib import Path

class FeatureExtractor:
    """Expose the second-to-last layer of a saved Keras model as a feature extractor."""

    def __init__(self, model_path: Path):
        """Load the model at ``model_path`` and build a sub-model ending at ``layers[-2]``."""
        trained_cnn = tf.keras.models.load_model(model_path)
        penultimate_layer = trained_cnn.layers[-2]

        self.cnn = trained_cnn
        self.extractor = tf.keras.Model(
            inputs=trained_cnn.input,
            outputs=penultimate_layer.output,
        )

    def extract_features(self, data: np.ndarray) -> np.ndarray:
        """Run ``data`` through the extractor and return its output."""
        return self.extractor.predict(data)

In [None]:
import os
import time
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import keras_tuner as kt  # Add this import


class HyperModel(kt.HyperModel):
    """Keras Tuner hypermodel that rebuilds a base model for transfer learning.

    All layers of ``base_model`` except the last are re-created under unique
    names, initialised with the base model's weights and optionally frozen; a
    new single-unit sigmoid layer is appended. Only the learning rate is tuned.
    """

    def __init__(self, input_shape, num_classes, base_model, freeze_layers=True):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.base_model = base_model
        self.freeze_layers = freeze_layers

    def build(self, hp):
        """Build and compile the transfer model for the hyperparameters ``hp``."""
        inputs = Input(shape=self.input_shape, name='transfer_input')
        x = inputs

        base_layers = self.base_model.layers[:-1]

        for i, layer in enumerate(base_layers):
            if isinstance(layer, tf.keras.layers.InputLayer):
                continue

            # Re-create the layer under a unique name
            config = layer.get_config()
            config['name'] = f'transfer_{layer.name}_{i}'
            new_layer = type(layer).from_config(config)
            x = new_layer(x)  # calling the layer creates its variables

            # from_config() yields freshly initialised weights; copy the trained
            # ones over, otherwise nothing is actually transferred.
            trained_weights = layer.get_weights()
            if trained_weights:
                new_layer.set_weights(trained_weights)

            # Freeze the copy rather than the base model's own layer, so the
            # base model is left untouched.
            if self.freeze_layers:
                new_layer.trainable = False

        # Add final dense layer
        outputs = Dense(1, activation='sigmoid', name='transfer_output')(x)
        model = Model(inputs=inputs, outputs=outputs)

        # Compile model with hyperparameters
        model.compile(
            optimizer=Adam(hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        return model


class TransferCNN:
    """Fine-tune a previously trained CNN on the real-image dataset.

    The constructor loads ``X_real.npy`` / ``y_real.npy`` from ``data_dir``.
    Typical use: ``load_base_model`` -> ``create_transfer_model`` (or
    ``hyperparameter_search``) -> ``train`` -> optionally ``unfreeze_layers``
    and ``train`` again -> ``save_model`` / ``save_training_info``.
    """

    def __init__(self,
                 data_dir: str,
                 model_dir: str,
                 num_classes=2):
        """
        :param data_dir: directory containing ``X_real.npy`` and ``y_real.npy``
        :param model_dir: directory holding the base model and receiving outputs
        :param num_classes: forwarded to the hypermodel
        """
        self.data_dir = data_dir
        self.model_dir = model_dir
        self.num_classes = num_classes
        self.base_model = None
        self.transfer_model = None
        self.training_history = {}
        self.training_times = {}

        self.X = np.load(os.path.join(data_dir, 'X_real.npy'))
        self.y = np.load(os.path.join(data_dir, 'y_real.npy'))
        self.y = self.y.astype('float32')

    def train(self, epochs=10, validation_split=0.2, batch_size=32, phase='initial'):
        """Fit the transfer model and record history and runtime under ``phase``.

        :raises ValueError: if the transfer model has not been created
        :return: the Keras ``History`` object
        """
        if self.transfer_model is None:
            raise ValueError("Transfer model not created. Call create_transfer_model first.")

        start_time = time.time()
        history = self.transfer_model.fit(
            self.X, self.y,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=[EarlyStopping(patience=3, restore_best_weights=True)]
        )
        end_time = time.time()

        # Store training time
        self.training_times[phase] = end_time - start_time

        # Store training history
        self.training_history[phase] = {
            key: history.history[key]
            for key in ('accuracy', 'loss', 'val_accuracy', 'val_loss')
        }

        return history

    def save_training_info(self, model_name: str):
        """Save training history and times to JSON files"""
        if not self.training_history or not self.training_times:
            print("No training information to save.")
            return self

        history_dir = os.path.join(self.model_dir, 'history')
        # Nothing else in this class creates the directory
        os.makedirs(history_dir, exist_ok=True)

        # Save training history
        history_path = os.path.join(self.model_dir, f'history/{model_name}_training.json')
        with open(history_path, 'w') as f:
            json.dump(self.training_history, f, indent=4)
        print(f"Training history saved to '{history_path}'")

        # Save training times
        times_path = os.path.join(self.model_dir, f'history/{model_name}_times.json')
        with open(times_path, 'w') as f:
            json.dump(self.training_times, f, indent=4)
        print(f"Training times saved to '{times_path}'")

        return self

    def load_base_model(self, model_name: str):
        """Load ``<model_dir>/<model_name>.keras`` as the base model."""
        model_path = os.path.join(self.model_dir, f'{model_name}.keras')
        self.base_model = tf.keras.models.load_model(model_path)
        return self

    def create_transfer_model(self, freeze_layers: bool = True):
        """Rebuild the base model (minus its last layer) with a new sigmoid head.

        The re-created layers receive the base model's trained weights and are
        frozen when ``freeze_layers`` is true. The model is compiled with Adam
        at a learning rate of 1e-4.

        :raises ValueError: if the base model has not been loaded
        :return: self, to allow chaining
        """
        if self.base_model is None:
            raise ValueError("Base model not loaded. Call load_base_model first.")

        input_shape = self.X.shape[1:]

        base_layers = self.base_model.layers[:-1]

        inputs = Input(shape=input_shape, name='transfer_input')
        x = inputs

        for i, layer in enumerate(base_layers):
            if isinstance(layer, tf.keras.layers.InputLayer):
                continue

            # Create a new layer with the same configuration but a unique name
            config = layer.get_config()
            config['name'] = f'transfer_{layer.name}_{i}'
            new_layer = type(layer).from_config(config)
            x = new_layer(x)  # calling the layer creates its variables

            # from_config() yields freshly initialised weights; copy the trained
            # ones over, otherwise nothing is actually transferred.
            trained_weights = layer.get_weights()
            if trained_weights:
                new_layer.set_weights(trained_weights)

            # Freeze the copy rather than the base model's own layer
            if freeze_layers:
                new_layer.trainable = False

        outputs = Dense(1, activation='sigmoid', name='transfer_output')(x)

        self.transfer_model = Model(inputs=inputs, outputs=outputs)

        self.transfer_model.compile(
            optimizer=Adam(learning_rate=1e-4),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        return self

    def unfreeze_layers(self, num_layers: int = 2):
        """Make the ``num_layers`` layers before the output layer trainable and recompile at lr 1e-5.

        :raises ValueError: if the transfer model has not been created
        :return: self, to allow chaining
        """
        if self.transfer_model is None:
            raise ValueError("Transfer model not created. Call create_transfer_model first.")

        for layer in self.transfer_model.layers[-num_layers - 1:-1]:
            layer.trainable = True

        # Recompile so the changed trainable flags take effect
        self.transfer_model.compile(
            optimizer=Adam(learning_rate=1e-5),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        return self

    def save_model(self, model_name: str):
        """Save the transfer model to ``<model_dir>/<model_name>.keras``.

        :raises ValueError: if there is no transfer model
        :return: self, to allow chaining
        """
        if self.transfer_model is None:
            raise ValueError("No transfer model to save. Create and train a model first.")

        model_path = os.path.join(self.model_dir, f'{model_name}.keras')
        self.transfer_model.save(model_path)
        print(f"Model saved to '{model_path}'")
        return self

    def predict(self, X):
        """Return flat 0/1 predictions for ``X`` (threshold 0.5).

        :raises ValueError: if there is no transfer model
        """
        if self.transfer_model is None:
            raise ValueError("No transfer model available. Create and train a model first.")

        pred = self.transfer_model.predict(X)
        return (pred > 0.5).astype(int).flatten()

    def hyperparameter_search(self, epochs=10, validation_split=0.2, max_trials=3):
        """
        Perform hyperparameter tuning using keras-tuner.

        The last ``validation_split`` fraction of the data is held out for
        validation. The model built from the best hyperparameters becomes
        ``self.transfer_model`` (it is built, not yet trained).

        :raises ValueError: if the base model has not been loaded, or if the
            split would leave the training or validation set empty
        :return: self, to allow chaining
        """
        if self.base_model is None:
            raise ValueError("Base model not loaded. Call load_base_model first.")

        input_shape = self.X.shape[1:]

        # Create tuner
        hypermodel = HyperModel(
            input_shape=input_shape,
            num_classes=self.num_classes,
            base_model=self.base_model
        )

        tuner = kt.RandomSearch(
            hypermodel,
            objective='val_accuracy',
            max_trials=max_trials,
            directory='models/cnn_on_cnn_tuner',
            project_name='transfer_tuning'
        )

        # Split data for validation
        val_size = int(len(self.X) * validation_split)
        if val_size < 1 or val_size >= len(self.X):
            raise ValueError(
                "validation_split must leave at least one training and one validation sample"
            )
        # Slice from a positive index: X[:-0] would be empty and X[-0:] everything
        split_at = len(self.X) - val_size
        train_x, val_x = self.X[:split_at], self.X[split_at:]
        train_y, val_y = self.y[:split_at], self.y[split_at:]

        # Perform the search
        tuner.search(
            train_x, train_y,
            epochs=epochs,
            validation_data=(val_x, val_y),
            callbacks=[EarlyStopping(patience=5, restore_best_weights=True)]
        )

        # Get the best hyperparameters
        best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

        # Build the model with the best hyperparameters
        self.transfer_model = tuner.hypermodel.build(best_hps)

        print("Best hyperparameters found:")
        print(f"Learning rate: {best_hps.get('learning_rate')}")

        return self

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2

class GradCAM:
    """Grad-CAM heatmaps for a Keras CNN operating on 128x128x3 images.

    Images are assumed to be RGB, which is what the PIL-based preprocessing in
    this file produces (``convert('RGB')``).
    """

    # Only inputs of this exact shape are accepted
    _EXPECTED_SHAPE = (128, 128, 3)

    def __init__(self, model, last_conv_layer_name=None):
        """
        :param model: Keras model to explain
        :param last_conv_layer_name: layer whose activations are visualised;
            when omitted, the last Conv2D/SeparableConv2D layer is used
        """
        self.model = model
        self.last_conv_layer_name = last_conv_layer_name
        if last_conv_layer_name is None:
            self.last_conv_layer_name = self._find_last_conv_layer()

    @classmethod
    def _validate_image(cls, img_array):
        """Raise ValueError unless ``img_array`` is a (128, 128, 3) numpy array."""
        if not isinstance(img_array, np.ndarray) or img_array.shape != cls._EXPECTED_SHAPE:
            raise ValueError("Input must be a numpy array with shape (128, 128, 3)")

    def compute_heatmap(self, img_array, pred_index=None):
        """Compute the Grad-CAM heatmap of one image.

        :param img_array: numpy array of shape (128, 128, 3)
        :param pred_index: output index to explain; defaults to the argmax of
            the prediction
        :return: 2-D numpy array scaled to [0, 1]; all zeros when no activation
            contributes positively
        :raises ValueError: if ``img_array`` has the wrong type or shape
        """
        self._validate_image(img_array)

        grad_model = tf.keras.models.Model(
            [self.model.inputs],
            [self.model.get_layer(self.last_conv_layer_name).output, self.model.output]
        )

        # Add batch dimension
        img_array = np.expand_dims(img_array, axis=0)

        with tf.GradientTape() as tape:
            conv_outputs, predictions = grad_model(img_array)
            if pred_index is None:
                pred_index = tf.argmax(predictions[0])
            class_channel = predictions[:, pred_index]

        # Channel importance = gradient of the score averaged over batch and space
        grads = tape.gradient(class_channel, conv_outputs)
        pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

        conv_outputs = conv_outputs[0]
        heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
        heatmap = tf.squeeze(heatmap)

        # Keep positive evidence only, then scale to [0, 1]. divide_no_nan
        # returns 0 instead of NaN when the whole map is non-positive.
        heatmap = tf.maximum(heatmap, 0)
        heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
        return heatmap.numpy()

    def display_heatmap(self, img_array, heatmap, alpha=0.4):
        """Show the image next to the image overlaid with the heatmap.

        :param img_array: RGB numpy array of shape (128, 128, 3); non-uint8
            input is multiplied by 255 and cast to uint8
        :param heatmap: 2-D heatmap with values in [0, 1]
        :param alpha: weight of the heatmap in the overlay
        :raises ValueError: if ``img_array`` has the wrong type or shape
        """
        # Ensure the input is in the correct format
        self._validate_image(img_array)

        # Resize heatmap to match input image dimensions
        heatmap = cv2.resize(heatmap, (128, 128))
        heatmap = np.uint8(255 * heatmap)
        heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
        # applyColorMap returns BGR; convert so it can be blended with the RGB image
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

        # Convert input array to uint8 if needed
        if img_array.dtype != np.uint8:
            img_array = (img_array * 255).astype(np.uint8)

        superimposed_img = heatmap * alpha + img_array
        superimposed_img = np.clip(superimposed_img, 0, 255).astype(np.uint8)

        plt.figure(figsize=(10, 5))

        # Display original image (already RGB, so no channel swap)
        plt.subplot(1, 2, 1)
        plt.imshow(img_array)
        plt.title('Original Image')
        plt.axis('off')

        # Display image with heatmap
        plt.subplot(1, 2, 2)
        plt.imshow(superimposed_img)
        plt.title('Image with Heatmap')
        plt.axis('off')

        plt.show()

    def analyze_image(self, img_array):
        """Compute the heatmap of ``img_array`` and display it."""
        heatmap = self.compute_heatmap(img_array)
        self.display_heatmap(img_array, heatmap)

    def _find_last_conv_layer(self):
        """Return the name of the last Conv2D/SeparableConv2D layer of the model.

        :raises ValueError: if the model has no such layer
        """
        for layer in reversed(self.model.layers):
            if isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.SeparableConv2D)):
                return layer.name
        raise ValueError("Could not find a convolutional layer in the model")

<h3><center>Functions</center></h3>

In [None]:
def plot_training_history(history):
    """
    Plot accuracy and loss curves for the training and validation phases.

    Two side-by-side panels are drawn: the left one shows training and validation
    accuracy per epoch, the right one training and validation loss. Epochs are
    numbered from 1 and the x axis is restricted to integer ticks.

    :param history: Mapping with the keys 'accuracy', 'val_accuracy', 'loss' and
        'val_loss'; each value holds one entry per epoch.
    :type history: dict
    :return: None
    """
    # Read every series up front so a missing key fails before any figure is created
    series = {key: history[key] for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')}
    epochs = range(1, len(series['accuracy']) + 1)

    plt.figure(figsize=(12, 4))
    for position, metric in enumerate(('accuracy', 'loss'), start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, series[metric], 'bo-', label=f'Training {metric}')
        plt.plot(epochs, series[f'val_{metric}'], 'ro-', label=f'Validation {metric}')
        plt.title(f'Training and validation {metric}')
        plt.xlabel('Epochs')
        plt.ylabel(metric.capitalize())
        plt.legend()
        # epochs are whole numbers, so fractional ticks would be meaningless
        plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.tight_layout()
    plt.show()

In [None]:
def plot_confusion_matrix(y_true, y_pred, class_names: list):
    """
    Generates and displays a row-normalized confusion matrix as a heatmap.

    Each row (true class) is divided by its total so the cells show the fraction of
    that class assigned to every predicted class. A class that never occurs in
    ``y_true`` has an all-zero row; that row is left at 0 rather than becoming NaN.

    :param y_true: True labels. An array or list containing the actual class labels.
    :type y_true: iterable
    :param y_pred: Predicted labels. An array or list containing the predicted class labels.
    :type y_pred: iterable
    :param class_names: Class names used for labeling the axes of the heatmap. The order
        of names corresponds to the indices in the confusion matrix.
    :type class_names: list
    :return: None. The function generates a plot but does not return any value.
    """
    cm = confusion_matrix(y_true, y_pred)

    # Normalize per true class; skip the division where the row total is zero
    # (label present only in y_pred) to avoid 0/0 -> NaN cells in the plot.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(cm, row_totals, out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')

    plt.show()

In [None]:
def get_performance_metrics(y_true, y_pred):
    """
    Compute per-class precision, recall and F1 plus the macro-averaged F1 for binary labels.

    :param y_true: The ground truth binary labels. Expected values are 0 or 1.
    :param y_pred: The predicted binary labels. Expected values are 0 or 1.
    :return: A dictionary with the keys
             - precision_pos, recall_pos, f1_score_pos: scores with label 1 as the positive class.
             - precision_neg, recall_neg, f1_score_neg: scores with label 0 as the positive class.
             - f1_macro: Macro-averaged F1-score over both classes.
    """
    scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1_score', f1_score),
    )

    # Positive-class scores first, then the same three with label 0 treated as positive
    metrics = {
        f'{metric}_{suffix}': scorer(y_true, y_pred, pos_label=label)
        for suffix, label in (('pos', 1), ('neg', 0))
        for metric, scorer in scorers
    }
    metrics['f1_macro'] = f1_score(y_true, y_pred, average='macro')
    return metrics

<h2>Exploratory Data Analysis</h2>

In [None]:
# import annotations
real = pd.read_csv(DATA_PATH / 'real.csv')
generated = pd.read_csv(DATA_PATH / 'generated.csv')
test = pd.read_csv(DATA_PATH / 'test.csv')

In [None]:
real.isnull().sum()

In [None]:
generated.isnull().sum()

In [None]:
test.isnull().sum()

In [None]:
generated.head()

In [None]:
real.head()

In [None]:
test.head()

In [None]:
generated.info()

In [None]:
real.info()

In [None]:
test.info()

In [None]:
# count drone type distribution
datasets = {'Real': real, 'Generated': generated, 'Test': test}
type_counts = {}

# one value_counts() Series per dataset, keyed by the dataset's display name
for name, dataset in datasets.items():
    type_counts[name] = dataset['type'].value_counts()

# rows = drone types, columns = datasets; a type missing from a dataset shows up as NaN
# in the combined frame, so it is filled with 0 before the integer cast
type_counts_df = pd.DataFrame(type_counts).fillna(0).astype(int)
type_counts_df['Overall'] = type_counts_df.sum(axis=1)  # total per type across all datasets
type_counts_df

In [None]:
# show random drone images
def show_random_images(directory, num_images=5):
    """
    Display a random selection of images from a specified directory. The function identifies
    image files in the directory by extension, selects a random subset up to the specified
    number, and renders each in its own matplotlib figure. The images are converted from
    BGR to RGB to ensure correct color representation. Files that OpenCV cannot decode are
    reported and skipped.

    :param directory: The directory path containing image files to be displayed.
    :type directory: str or os.PathLike
    :param num_images: The number of random images to display. Defaults to 5.
    :type num_images: int
    :return: None
    """
    all_files = os.listdir(directory)
    image_files = [f for f in all_files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
    random_images = random.sample(image_files, min(num_images, len(image_files)))
    for image_file in random_images:
        image_path = os.path.join(directory, image_file)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread returns None instead of raising when a file cannot be decoded;
            # passing that on to cvtColor would abort the whole preview.
            print(f'Skipping unreadable image: {image_file}')
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB for matplotlib

        plt.figure()
        plt.imshow(image)
        plt.title(image_file)
        plt.axis('off')
    plt.show()

In [None]:
show_random_images(DATA_PATH / 'generated', num_images=3)

In [None]:
show_random_images(DATA_PATH / 'real', num_images=3)

In [None]:
show_random_images(DATA_PATH / 'test', num_images=3)

In [None]:
# combine real images
combined_real = pd.concat([real, test], ignore_index=True)
combined_real.info()

In [None]:
# show distribution of width, height, and depth
plt.figure(figsize=(15, 5))

# one panel per image dimension; only the width panel overrides the default bin count
_dimension_panels = (
    ('width', 'Width', {'bins': 30, 'color': 'blue'}),
    ('height', 'Height', {'color': 'green'}),
    ('depth', 'Depth', {'color': 'red'}),
)
for _position, (_column, _label, _hist_kwargs) in enumerate(_dimension_panels, start=1):
    plt.subplot(1, 3, _position)
    plt.hist(combined_real[_column], alpha=0.7, **_hist_kwargs)
    plt.title(f'{_label} Distribution')
    plt.xlabel(_label)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# show unique values of width, height, and depth in real dataset
combined_real['size'] = list(zip(combined_real['width'], combined_real['height'], combined_real['depth']))
combined_real['size'].unique()

**Bounding Boxes:**

In [None]:
# center of each bounding box, normalized by the image size of the same row
combined_real['xcenter'] = combined_real['xmin'].add(combined_real['xmax']).div(2).div(combined_real['width'])
combined_real['ycenter'] = combined_real['ymin'].add(combined_real['ymax']).div(2).div(combined_real['height'])

plt.figure(figsize=(15, 5))

# left panel: x centers, right panel: y centers
_center_panels = (('xcenter', 'X', 'blue'), ('ycenter', 'Y', 'green'))
for _position, (_column, _axis_name, _color) in enumerate(_center_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.hist(combined_real[_column], bins=30, color=_color, alpha=0.7)
    plt.title(f'Normalized {_axis_name} Center Distribution')
    plt.xlabel(_axis_name)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# bounding box extent along each axis, in the units of the annotation columns
combined_real['bbox_width'] = combined_real['xmax'].sub(combined_real['xmin'])
combined_real['bbox_height'] = combined_real['ymax'].sub(combined_real['ymin'])

plt.figure(figsize=(15, 5))

# left panel: box widths, right panel: box heights
_size_panels = (('bbox_width', 'Width', 'blue'), ('bbox_height', 'Height', 'green'))
for _position, (_column, _label, _color) in enumerate(_size_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.hist(combined_real[_column], bins=30, color=_color, alpha=0.7)
    plt.title(f'Bounding Box {_label} Distribution')
    plt.xlabel(_label)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# Create normalized bounding box position heatmap
# 64 x 64 grid of counts over the normalized box centers computed above
heatmap, xedges, yedges = np.histogram2d(combined_real['xcenter'], combined_real['ycenter'], bins=[64, 64])

plt.figure(figsize=(10, 8))

# histogram2d puts x along the first axis; transpose so x runs horizontally in the plot
sns.heatmap(heatmap.T, cmap='viridis', cbar=True)
plt.title('Normalized Bounding Box Position Heatmap')
plt.xlabel('x center')
plt.ylabel('y center')
# the tick labels would only show bin indices, so they are hidden
plt.xticks([])
plt.yticks([])

plt.show()

In [None]:
# center of each bounding box in the generated set; unlike the real set the value is
# not divided by an image size here
generated['xcenter'] = generated['xmin'].add(generated['xmax']).div(2)
generated['ycenter'] = generated['ymin'].add(generated['ymax']).div(2)

plt.figure(figsize=(15, 5))

# left panel: x centers, right panel: y centers
_generated_center_panels = (('xcenter', 'X', 'blue'), ('ycenter', 'Y', 'green'))
for _position, (_column, _axis_name, _color) in enumerate(_generated_center_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.hist(generated[_column], bins=30, color=_color, alpha=0.7)
    plt.title(f'{_axis_name} Center Distribution')
    plt.xlabel(_axis_name)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# bounding box extent scaled by 320 (presumably the render resolution of the
# generated images -- verify against the generation settings)
generated['bbox_width'] = generated['xmax'].sub(generated['xmin']).mul(320)
generated['bbox_height'] = generated['ymax'].sub(generated['ymin']).mul(320)

plt.figure(figsize=(15, 5))

# left panel: box widths, right panel: box heights
_generated_size_panels = (('bbox_width', 'Width', 'blue'), ('bbox_height', 'Height', 'green'))
for _position, (_column, _label, _color) in enumerate(_generated_size_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.hist(generated[_column], bins=30, color=_color, alpha=0.7)
    plt.title(f'Bounding Box {_label} Distribution')
    plt.xlabel(_label)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# create normalized bounding box position heatmap for generated dataset
# 64 x 64 grid of counts over the box centers of the generated set
# NOTE(review): the centers are used as stored, which presumes the generated
# annotations are already normalized -- confirm against generated.csv
heatmap, xedges, yedges = np.histogram2d(generated['xcenter'], generated['ycenter'], bins=[64, 64])

plt.figure(figsize=(10, 8))
# histogram2d puts x along the first axis; transpose so x runs horizontally in the plot
sns.heatmap(heatmap.T, cmap='viridis', cbar=True)
plt.title('Normalized Bounding Box Position Heatmap')
plt.xlabel('x center')
plt.ylabel('y center')
# the tick labels would only show bin indices, so they are hidden
plt.xticks([])
plt.yticks([])
plt.show()

<h2>Data Preprocessing</h2>

The processed files are already included in `data.zip`, so this step can be skipped.

In [None]:
processor = Processor(
    real_img_dir=str(DATA_PATH / 'real'),
    real_csv_path=str(DATA_PATH / 'real.csv'),
    generated_img_dir=str(DATA_PATH / 'generated'),
    generated_csv_path=str(DATA_PATH / 'generated.csv'),
    test_img_dir=str(DATA_PATH / 'test'),
    test_csv_path=str(DATA_PATH / 'test.csv')
)

In [None]:
processor.save_arrays(output_dir=str(DATA_PATH))

<h2>Train</h2>

<h3>Convolutional Neural Networks</h3>

In [None]:
# initialize classifier
classifier = ClassificationModel(
    data_dir=str(DATA_PATH),
    model_dir=str(MODEL_PATH),
    num_classes=2
)

In [None]:
# tune and save every architecture on both training sets
for architecture, dataset in [
    (arch, data_name)
    for arch in ('cnn_01', 'cnn_02', 'cnn_03')
    for data_name in ('generated', 'mixed')
]:
    model_name = "_".join((architecture, dataset))
    classifier.set_dataset(dataset)
    classifier.create_tuner(model_name, architecture)
    classifier.tune_model(model_name, epochs=30)
    classifier.save_model(model_name)

In [None]:
# plot the stored training curves and print the runtime of every trained CNN
for architecture in ('cnn_01', 'cnn_02', 'cnn_03'):
    for dataset in ('generated', 'mixed'):
        name = "_".join((architecture, dataset))
        # history files live next to the models, one JSON per model
        with open(MODEL_PATH / 'history' / f'{name}_training.json', 'r') as file:
            data = json.load(file)
        print(f'\nModel: {name}')
        plot_training_history(data['history'])
        print(f"Runtime: {data['runtime']}s")

<h3>Support Vector Machines</h3>

In [None]:
# resize data for svm training
# Both training sets are resized to 64 x 64, keeping sample count and channel count.
X_generated = np.load(str(DATA_PATH / 'X_generated.npy'))
X_generated_svm = resize(X_generated, (X_generated.shape[0], 64, 64, X_generated.shape[3]))
# flattened copy (one row per image) used for the PCA cells below
X_generated_svm_pca = X_generated_svm.reshape(X_generated_svm.shape[0], -1)
X_mixed = np.load(str(DATA_PATH / 'X_mixed.npy'))
X_mixed_svm = resize(X_mixed, (X_mixed.shape[0], 64, 64, X_mixed.shape[3]))
# flattened copy (one row per image) used for the PCA cells below
X_mixed_svm_pca = X_mixed_svm.reshape(X_mixed_svm.shape[0], -1)
print(X_generated_svm_pca.shape)
print(X_mixed_svm_pca.shape)

In [None]:
# PCA generated
pca = PCA().fit(X_generated_svm_pca)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Explained Variance vs Number of Components')
plt.grid(True)
plt.show()

In [None]:
# PCA mixed
pca = PCA().fit(X_mixed_svm_pca)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Explained Variance vs Number of Components')
plt.grid(True)
plt.show()

In [None]:
# Train SVM generated
svm_model = SVM(model_dir=MODEL_PATH, seed=SEED)
best_model, training_time = svm_model.train(X_generated_svm, np.load(DATA_PATH / 'y_generated.npy'))
print(f"Training time: {training_time:.2f} seconds")
runtime_dict['SVM1'] = training_time
svm_model.save("svm_01")
svm_model.save_history("svm_01")

In [None]:
# Train SVM mixed
svm_model = SVM(model_dir=MODEL_PATH, seed=SEED)
best_model, training_time = svm_model.train(X_mixed_svm, np.load(DATA_PATH / 'y_mixed.npy'))
print(f"Training time: {training_time:.2f} seconds")
runtime_dict['SVM3'] = training_time
svm_model.save("svm_02")
svm_model.save_history("svm_02")

<h3>Transfer Models</h3>

<h4>CNN on CNN</h4>

In [None]:
# initialize transfer-model with paths and number of classes
transfer = TransferCNN(
    data_dir=DATA_PATH,
    model_dir=MODEL_PATH,
    num_classes=2
)

In [None]:
# load the pre-trained base model (generated)
transfer.load_base_model('cnn_03_generated')

In [None]:
# Execute hyperparameter tuning
transfer.hyperparameter_search(
    epochs=10,
    validation_split=0.2,
    max_trials=3
)

In [None]:
# train new layer
history = transfer.train(
    epochs=10,
    batch_size=32,
    validation_split=0.2,
    phase='initial'
)

In [None]:
# unfreeze two layers
transfer.unfreeze_layers(num_layers=2)

In [None]:
# fine-tune new and unfrozen layers
history = transfer.train(
    epochs=10,
    batch_size=32,
    validation_split=0.2,
    phase='fine_tuning'
)

In [None]:
# save model and training history
transfer.save_model('cnn_on_cnn')
transfer.save_training_info('cnn_on_cnn')

In [None]:
# load the stored training history of the transfer model
with open(MODEL_PATH / 'history' / 'cnn_on_cnn_training.json', 'r') as file:
    data = json.load(file)
# one pair of loss/accuracy graphs per training phase
for step in ('initial', 'fine_tuning'):
    print(f'\nModel: {step}')
    plot_training_history(data[step])

<h4>SVM on CNN</h4>

In [None]:
# extract features from base CNN and load real dataset
extractor = FeatureExtractor(str(MODEL_PATH / 'cnn_03_generated.keras'))
extracted_features = extractor.extract_features(np.load(DATA_PATH / 'X_real.npy'))
y_real = np.load(DATA_PATH / 'y_real.npy')

In [None]:
# PCA
pca = PCA().fit(extracted_features.reshape(extracted_features.shape[0], -1))
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Explained Variance vs Number of Components')
plt.grid(True)
plt.show()

In [None]:
# train SVM on CNN features
svm_model = SVM(model_dir=MODEL_PATH, seed=SEED, is_transfer=True)
best_model, training_time = svm_model.train(extracted_features, y_real)
print(f"Training time: {training_time:.2f} seconds")
runtime_dict['SVM2'] = training_time
svm_model.save("svm_on_cnn")
svm_model.save_history("svm_on_cnn")

<h2>Test</h2>

In [None]:
# Load models
# CNN1: cnn_03 trained on the generated set
# CNN2: transfer model saved as 'cnn_on_cnn'
# CNN3: cnn_02 trained on the mixed set
CNN1 = load_model(str(MODEL_PATH / 'cnn_03_generated.keras'))
CNN2 = load_model(str(MODEL_PATH / 'cnn_on_cnn.keras'))
CNN3 = load_model(str(MODEL_PATH / 'cnn_02_mixed.keras'))
# SVM1: svm_01 (generated set), SVM2: SVM trained on CNN features, SVM3: svm_02 (mixed set)
SVM1 = joblib.load(MODEL_PATH / 'svm_01.joblib')
SVM2 = joblib.load(MODEL_PATH / 'svm_on_cnn.joblib')
SVM3 = joblib.load(MODEL_PATH / 'svm_02.joblib')

In [None]:
# Load test set
X_test = np.load(DATA_PATH / 'X_test.npy')
y_test = np.load(DATA_PATH / 'y_test.npy')
X_resized = resize(X_test, (X_test.shape[0], 64, 64, X_test.shape[3]))
X_resized = X_resized.reshape(X_resized.shape[0], -1)

In [None]:
results = {} # initialize results dict

In [None]:
# test performance of the CNNs
for name, model in {'CNN1': CNN1, 'CNN2': CNN2, 'CNN3': CNN3}.items():
    start_time = time.time()
    # model outputs above 0.5 are labeled 1, everything else 0
    y_pred = (model.predict(X_test) > 0.5).astype(int).flatten()
    end_time = time.time()
    # wall-clock time of prediction plus thresholding on the full test set
    duration = end_time - start_time
    print(f"{name}: {duration:.10f} seconds")
    results[name] = get_performance_metrics(y_test, y_pred)

In [None]:
# test standard SVM performance
for name, model in {'SVM1': SVM1, 'SVM3': SVM3}.items():
    start_time = time.time()
    y_pred = model.predict(X_resized)
    end_time = time.time()
    duration = end_time - start_time
    print(f"{name}: {duration:.10f} seconds")
    results[name] = get_performance_metrics(y_test, y_pred)

In [None]:
# test SVM on CNN performance
extractor = FeatureExtractor(str(MODEL_PATH / 'cnn_03_generated.keras'))
# the timer covers feature extraction and the SVM prediction, but not loading the extractor
start_time = time.time()
extracted_features = extractor.extract_features(X_test)
y_pred = SVM2.predict(extracted_features)
end_time = time.time()
duration = end_time - start_time
print(f"SVM2: {duration:.10f} seconds")
results['SVM2'] = get_performance_metrics(y_test, y_pred)

In [None]:
results = pd.DataFrame.from_dict(results).T
results

In [None]:
# get running time for standard CNNs
# The loop variable is named history_file so that the open file handle below does not
# rebind the name that is being iterated over.
for name, history_file in {'CNN1': 'cnn_03_generated_training.json', 'CNN3': 'cnn_02_mixed_training.json'}.items():
    with open(MODEL_PATH / 'history' / history_file, 'r') as file:
        data = json.load(file)
    # 'runtime' is read from the top level, falling back to the nested history and then to 0
    runtime_dict[name] = data.get('runtime', data.get('history', {}).get('runtime', 0))
# get running time for CNN on CNN
with open(MODEL_PATH / 'history' / 'cnn_on_cnn_times.json', 'r') as file:
    data = json.load(file)
# add running times to runtime dict
# Both transfer models build on CNN1, so its training time is added to theirs.
runtime_dict['CNN2'] = data['initial'] + data['fine_tuning'] + runtime_dict['CNN1']
# NOTE: this line accumulates in place -- it needs runtime_dict['SVM2'] from the SVM-on-CNN
# training cell of the current session, and re-running this cell adds CNN1's time again.
runtime_dict['SVM2'] = runtime_dict['SVM2'] + runtime_dict['CNN1']

In [None]:
runtime_dict

<h3>Grad-CAM and Error Analysis</h3>

In [None]:
# function for applying Grad-CAM
def display_grad_cam_images(model, num_img: int=3):
    """
    Displays Grad-CAM images for the test images with the most extreme model outputs.

    The test set is loaded from ``X_test.npy``, the model predicts on all of it, and
    the ``num_img`` highest-scoring and ``num_img`` lowest-scoring images are each
    visualized with Grad-CAM, followed by a printout of the model output. When the two
    groups overlap (small test set), every image is shown only once. Nothing is loaded
    or displayed for ``num_img <= 0``.

    The printed label reads "Logit Val" although the value is whatever ``model.predict``
    returns (presumably a sigmoid probability, given the 0.5 threshold used elsewhere
    in this notebook -- confirm against the model's output layer).

    :param model: The trained model used for prediction and Grad-CAM analysis.
    :type model: Any
    :param num_img: The number of images to show for each of the two groups.
                    Defaults to 3.
    :type num_img: int, optional
    :return: None
    """
    if num_img <= 0:
        # a slice of [-0:] would select the whole array instead of nothing
        return

    img_array = np.load(str(DATA_PATH / 'X_test.npy'))
    predictions = model.predict(img_array)

    # sort once; the tail holds the highest outputs, the head the lowest
    ranked = np.argsort(predictions.flatten())
    selected = ranked[-num_img:].tolist() + ranked[:num_img].tolist()
    # drop repeats that occur when both slices overlap, keeping the display order
    selected = list(dict.fromkeys(selected))

    # the explainer depends only on the model, so build it once for all images
    grad_cam = GradCAM(model)
    for idx in selected:
        grad_cam.analyze_image(img_array[idx])
        print(f'Logit Val: {predictions[idx]}')

In [None]:
# function for applying Grad-CAM to misclassified images
def display_misclassified(model):
    """
    Displays a randomly selected misclassified test example with its Grad-CAM overlay.

    The model predicts on ``X_test.npy``; outputs above 0.5 are labeled 1 and compared
    with ``y_test.npy``. One mismatching image is picked at random, visualized with
    GradCAM, and its model output is printed. If every test image is classified
    correctly a short message is printed instead.

    :param model: The trained model used for making predictions and GradCAM analysis.
    :type model: Any suitable machine learning model with a `.predict` method.

    :return: None
    """
    img_array = np.load(str(DATA_PATH / 'X_test.npy'))
    y_true = np.load(str(DATA_PATH / 'y_test.npy'))
    predictions = model.predict(img_array)

    predicted_labels = (predictions > 0.5).astype(int).flatten()
    misclassified_indices = np.where(predicted_labels != y_true)[0]
    if misclassified_indices.size == 0:
        # random.choice raises on an empty sequence, so stop here
        print('No misclassified images found.')
        return

    random_index = random.choice(misclassified_indices)
    grad_cam = GradCAM(model)
    grad_cam.analyze_image(img_array[random_index])
    print(f'Logit Val: {predictions[random_index]}')

In [None]:
display_grad_cam_images(CNN1)

In [None]:
display_grad_cam_images(CNN2)

In [None]:
display_grad_cam_images(CNN3)

In [None]:
display_misclassified(CNN1)

In [None]:
display_misclassified(CNN2)

In [None]:
display_misclassified(CNN3)

<h3>Model Configurations</h3>

In [None]:
cnn1_config = CNN1.get_config()
print(json.dumps(cnn1_config, indent=4))

In [None]:
cnn2_config = CNN2.get_config()
print(json.dumps(cnn2_config, indent=4))

In [None]:
cnn3_config = CNN3.get_config()
print(json.dumps(cnn3_config, indent=4))

In [None]:
svm1_params = SVM1.get_params()
print(svm1_params)

In [None]:
svm2_params = SVM2.get_params()
print(svm2_params)

In [None]:
svm3_params = SVM3.get_params()
print(svm3_params)

**DISCLAIMER**

Data provided is already preprocessed!

The scripts that were used for preprocessing are shown below.

**Filtering**
```Python
#This Script removes duplicates and RGB outliers from the real_drones Dataset

import numpy as np
import imagehash
from PIL import Image
import os
import shutil

# ---- CONFIGURATION ----
src_dir      = "real_drones"               # ← this is your “source” folder
dup_dir      = "real_drones_duplicates"    # where perceptual‐duplicate files get moved
outlier_dir  = "real_drones_RGB_outliers"  # where RGB‐outlier files get moved
image_extensions = (".jpg", ".jpeg", ".png")

# shutil.move fails if the destination folder is missing, so create both up front
os.makedirs(dup_dir, exist_ok=True)
os.makedirs(outlier_dir, exist_ok=True)

# ---- 1) REMOVE DUPLICATES VIA PERCEPTUAL HASH ----
hash_map = {}  # phash -> first filename
phash_threshold = 0  # Hamming distance threshold (0 = exact phash match)
# sorted() makes the choice of which duplicate is kept independent of directory order
for fname in sorted(os.listdir(src_dir)):
    if not fname.lower().endswith(image_extensions):
        continue
    path = os.path.join(src_dir, fname)
    with Image.open(path) as img:
        ph = imagehash.phash(img)
    # check against existing hashes; the first hash within the threshold wins
    original_fname = next(
        (kept_fname for existing_ph, kept_fname in hash_map.items()
         if ph - existing_ph <= phash_threshold),
        None,
    )
    if original_fname is not None:
        # duplicate detected
        shutil.move(path, os.path.join(dup_dir, fname))
        print(f"Moved perceptual duplicate {fname} (matches {original_fname}) to {dup_dir}")
    else:
        hash_map[ph] = fname

# ---- RGB THRESHOLD FILTER (mean <5 or >250) ----
for fname in sorted(os.listdir(src_dir)):
    if not fname.lower().endswith(image_extensions):
        continue
    path = os.path.join(src_dir, fname)
    with Image.open(path) as img:
        arr = np.array(img)
    # The file is closed at this point; moving it while still open fails on Windows.
    if arr.ndim == 3 and arr.shape[2] >= 3:
        # compute mean for R, G, B
        mean_rgb = arr[:, :, :3].reshape(-1, 3).mean(axis=0)
        if (mean_rgb < 5).any() or (mean_rgb > 250).any():
            shutil.move(path, os.path.join(outlier_dir, fname))
            print(f"Moved RGB threshold outlier {fname} to {outlier_dir}")
    else:
        print(f"Skipping non-RGB image {fname}")
```

**Augmentation**
```Python
import os
import cv2
import pandas as pd
import albumentations as A
from tqdm import tqdm

# Load annotations
df = pd.read_csv('annotations_real.csv', sep=';')

# Albumentations transforms
# darker image with reduced blue channel
night_transform = A.Compose([
    A.RandomBrightnessContrast(brightness_limit=(-0.4, -0.3), contrast_limit=(-0.1, 0.5), p=1.0),
    A.RGBShift(r_shift_limit=0, g_shift_limit=0, b_shift_limit=(-40, -20), p=1.0)
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels']))

# local contrast enhancement
clahe_transform = A.Compose([
    A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=1.0)
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels']))

# Collect new rows
augmented_rows = []

# NOTE: the CSV is rewritten in place at the end, so running this script a second time
# also augments the images that were produced by the first run.
for _, row in tqdm(df.iterrows(), total=len(df)):
    img_path = os.path.join('real_drones', row['filename'])
    img = cv2.imread(img_path)
    if img is None:
        continue
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    bbox = [row['xmin'], row['ymin'], row['xmax'], row['ymax']]
    label = row['uav']

    # Split off the real extension so the suffix is inserted for every file type.
    # Replacing the literal '.jpg' left other names (e.g. '.png', '.JPG') unchanged,
    # which made the augmented image overwrite the original file.
    stem, ext = os.path.splitext(row['filename'])

    for transform, suffix in [(night_transform, '_night'), (clahe_transform, '_clahe')]:
        result = transform(image=img, bboxes=[bbox], class_labels=[label])
        if len(result['bboxes']) == 0:
            # the transform pipeline returned no box; there is nothing to annotate
            continue
        new_img = cv2.cvtColor(result['image'], cv2.COLOR_RGB2BGR)

        new_filename = f'{stem}{suffix}{ext}'
        cv2.imwrite(os.path.join('real_drones', new_filename), new_img)

        new_bbox = list(map(int, result['bboxes'][0]))
        new_row = row.copy()
        new_row['filename'] = new_filename
        new_row['xmin'], new_row['ymin'], new_row['xmax'], new_row['ymax'] = new_bbox
        augmented_rows.append(new_row)

# Append to CSV
df_aug = pd.DataFrame(augmented_rows)
df_final = pd.concat([df, df_aug], ignore_index=True)
df_final.to_csv('annotations_real.csv', sep=';', index=False)

print("Augmentation complete and CSV updated.")
```

**Blender Script**
```Python
import bpy
import random
import numpy as np
import mathutils
from pathlib import Path
from bpy_extras.object_utils import world_to_camera_view
import csv


class DatasetGenerator:
    """
    Renders images of randomly arranged scenes and records one CSV row per image.

    The scene itself is arranged by the ``scene_setter`` passed to the constructor;
    this class picks an HDRI, triggers the render, and writes the annotation file.
    """

    # sub-paths below the output directory
    PATHS = {
        'images_dir': 'images',
        'annotations_file': 'annotations.csv',
        'hdri_dir': 'hdris'
    }

    # file format and (square) resolution of the rendered images
    RENDER_SETTINGS = {
        'format': 'PNG',
        'resolution': 320
    }

    # header row of the annotation file
    CSV_HEADERS = [
        'filename', 'drone_type',
        'drone_position_x', 'drone_position_y', 'drone_position_z',
        'camera_rotation_x', 'camera_rotation_y', 'camera_rotation_z',
        'bbox_min_x', 'bbox_min_y', 'bbox_width', 'bbox_height',
        'hdri_name'
    ]

    def __init__(self, output_path: str, scene_setter):
        """
        Prepare the output folders, configure the renderer and collect the HDRI files.

        :param output_path: Root directory for images, annotations and HDRIs.
        :param scene_setter: Object that arranges the scene for every image.
        :raises ValueError: If the HDRI directory contains no usable file.
        """
        root = Path(output_path)
        self.output_path = root
        self.scene_setter = scene_setter
        self.image_dir = root / self.PATHS['images_dir']
        self.csv_path = root / self.PATHS['annotations_file']
        self.hdri_dir = root / self.PATHS['hdri_dir']
        self.image_dir.mkdir(parents=True, exist_ok=True)

        self.render = bpy.context.scene.render
        self.render.image_settings.file_format = self.RENDER_SETTINGS['format']
        resolution = self.RENDER_SETTINGS['resolution']
        self.render.resolution_x = resolution
        self.render.resolution_y = resolution

        self.hdri_files = self._load_hdri_files()

    def _load_hdri_files(self):
        """Collect all *.hdr, *.hdri and *.exr files (in that order) from the HDRI directory."""
        found = [
            hdri_file
            for pattern in ('*.hdr', '*.hdri', '*.exr')
            for hdri_file in self.hdri_dir.glob(pattern)
        ]
        if found:
            return found
        raise ValueError(f"No HDR/HDRI/EXR files found in {self.hdri_dir}")

    def _first_visible_drone(self):
        """Return the first viewport-visible object whose name contains a known drone name, else None."""
        for candidate in bpy.data.objects:
            if candidate.hide_viewport:
                continue
            if any(drone_name in candidate.name for drone_name in self.scene_setter.drones):
                return candidate
        return None

    def generate_dataset(self, num_images: int):
        """
        Render up to ``num_images`` images and write their annotations to the CSV file.

        An iteration is skipped without rendering when the scene cannot be set up, no
        visible drone is found, or no bounding box is returned; the file numbering
        follows the iteration index, so skipped iterations leave gaps.

        :param num_images: Number of render attempts.
        :raises ValueError: If no HDRI files are available.
        """
        if not self.hdri_files:
            raise ValueError("No HDRI files available for generation")

        with open(self.csv_path, 'w', newline='') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(self.CSV_HEADERS)

            for index in range(num_images):
                hdri_path = str(random.choice(self.hdri_files))
                if not self.scene_setter.set_random_scene(hdri_path):
                    continue

                drone = self._first_visible_drone()
                if not drone:
                    continue

                # refresh the view layer before the screen coverage is computed
                bpy.context.view_layer.update()
                bbox = self.scene_setter.calculate_object_screen_coverage(drone)
                if not bbox:
                    continue

                filename = f"img_{index:04d}.png"
                self.render.filepath = str(self.image_dir / filename)
                bpy.ops.render.render(write_still=True)

                annotation = [filename, drone.name]
                annotation.extend(drone.location)
                annotation.extend(self.scene_setter.camera.rotation_euler)
                annotation.extend(bbox)
                annotation.append(Path(hdri_path).stem)
                writer.writerow(annotation)


class SetScene:
    """Arrange the current Blender scene with one randomly placed drone.

    One of the configured drone objects is made the only visible one, an HDRI
    is installed as world background, and the drone is moved to random
    positions inside a dome-shaped shell until its projected bounding box
    covers enough of the camera frame. The camera is re-aimed at the drone
    for every candidate position.
    """

    # Shell in which drones are placed: radial distance range from the origin
    # and the lowest Z coordinate a drone may be given.
    DOME = {
        'outer_radius': 5.0,
        'inner_radius': 4.0,
        'min_height': 1
    }

    # Camera location and the track/up axes passed to ``to_track_quat`` when
    # aiming the camera at the drone.
    CAMERA_SETTINGS = {
        'position': (0, 0, 0),
        'track_axis': '-Z',
        'up_axis': 'Y'
    }

    # Rotation ranges in degrees (converted with ``np.radians`` when applied):
    # +/- ``angle_limit`` on the first two Euler components, 0..``full_rotation``
    # on the third.
    DRONE_ROTATION = {
        'angle_limit': 25,
        'full_rotation': 360
    }

    # Placement acceptance: minimum bounding-box area as a fraction of the
    # camera frame, and how many random placements to try per scene.
    VISIBILITY = {
        'min_area': 0.01,
        'max_attempts': 20
    }

    def __init__(self, drones: list = None):
        """Bind to the current scene and move its camera to the configured position.

        Args:
            drones: Names of the Blender objects that may act as the drone.
                Defaults to an empty list.
        """
        self.drones = drones or []
        self.scene = bpy.context.scene
        self.camera = self.scene.camera
        self.camera.location = mathutils.Vector(self.CAMERA_SETTINGS['position'])

    def set_random_scene(self, hdri_path: str):
        """Show one random drone, install the HDRI background and place the drone.

        Args:
            hdri_path: Path of the environment image to use as background.

        Returns:
            bool: True when the drone was placed with sufficient screen
            coverage. False when no drone names are configured, the chosen
            object does not exist, or no acceptable placement was found.
        """
        if not self.drones:
            # random.choice raises IndexError on an empty sequence; report it
            # like every other "scene could not be set up" case instead.
            return False

        drone_name = random.choice(self.drones)
        drone = bpy.data.objects.get(drone_name)
        if not drone:
            return False

        self._setup_scene_for_drone(drone_name)
        self.setup_hdri_background(hdri_path)
        return self.position_drone(drone)

    def setup_hdri_background(self, path):
        """Rebuild the world node tree as: environment texture -> background -> output.

        Args:
            path: Path of the environment image; converted with ``str`` before
                loading. An already loaded image is reused
                (``check_existing=True``).
        """
        self.scene.world.use_nodes = True
        nodes = self.scene.world.node_tree.nodes
        links = self.scene.world.node_tree.links
        # Start from an empty tree so nodes from a previous scene do not pile up.
        nodes.clear()

        env_tex = nodes.new('ShaderNodeTexEnvironment')
        env_tex.image = bpy.data.images.load(str(path), check_existing=True)
        bg_node = nodes.new('ShaderNodeBackground')
        output_node = nodes.new('ShaderNodeOutputWorld')

        links.new(env_tex.outputs['Color'], bg_node.inputs['Color'])
        links.new(bg_node.outputs['Background'], output_node.inputs['Surface'])

    def position_drone(self, obj):
        """Try random poses for ``obj`` until it covers enough of the frame.

        Each attempt assigns a random dome position and a random rotation to
        ``obj`` and then turns the camera towards it.

        Args:
            obj: Blender object to place.

        Returns:
            bool: True as soon as an attempt passes the visibility check,
            False after ``VISIBILITY['max_attempts']`` failed attempts.
        """
        limit = self.DRONE_ROTATION['angle_limit']
        full_turn = self.DRONE_ROTATION['full_rotation']

        for _ in range(self.VISIBILITY['max_attempts']):
            coords = self._generate_dome_coordinates(self.DOME)
            obj.location = mathutils.Vector(coords)
            obj.rotation_euler = mathutils.Euler((
                np.radians(random.uniform(-limit, limit)),
                np.radians(random.uniform(-limit, limit)),
                np.radians(random.uniform(0, full_turn)),
            ))

            # Aim the camera along the line from the camera to the drone.
            direction = obj.location - self.camera.location
            self.camera.rotation_euler = direction.to_track_quat(
                self.CAMERA_SETTINGS['track_axis'],
                self.CAMERA_SETTINGS['up_axis']
            ).to_euler()

            if self._is_drone_visible(obj):
                return True
        return False

    def calculate_object_screen_coverage(self, obj):
        """Project the bounding box of ``obj`` into the camera frame.

        The eight bounding-box corners of the evaluated object are moved to
        world space, projected with ``world_to_camera_view`` and clamped to
        the [0, 1] frame. The y axis is flipped (``1.0 - y``), which turns
        Blender's bottom-left origin into a top-left one.

        Args:
            obj: Blender object to measure.

        Returns:
            tuple | None: ``(min_x, min_y, width, height)`` in normalized
            frame coordinates, or None when the box has no corner in front of
            the camera or collapses to zero width/height after clamping.
        """
        depsgraph = bpy.context.evaluated_depsgraph_get()
        corners = [obj.matrix_world @ mathutils.Vector(corner)
                   for corner in obj.evaluated_get(depsgraph).bound_box]

        projected = [world_to_camera_view(self.scene, self.camera, corner)
                     for corner in corners]

        # world_to_camera_view reports points behind the camera with a
        # non-positive z; their x/y are meaningless for a bounding box, so an
        # object lying entirely behind the camera must not yield a box.
        if not any(point.z > 0.0 for point in projected):
            return None

        x_coords = [self._clamp_to_frame(point.x) for point in projected]
        y_coords = [self._clamp_to_frame(1.0 - point.y) for point in projected]

        min_x, max_x = min(x_coords), max(x_coords)
        min_y, max_y = min(y_coords), max(y_coords)

        width = max_x - min_x
        height = max_y - min_y

        if width <= 0 or height <= 0:
            return None

        return (min_x, min_y, width, height)

    def _setup_scene_for_drone(self, active_drone_name):
        """Hide every configured drone except ``active_drone_name``.

        Both render and viewport visibility are set; names without a matching
        object are ignored.
        """
        for name in self.drones:
            if obj := bpy.data.objects.get(name):
                obj.hide_render = obj.hide_viewport = (name != active_drone_name)

    def _is_drone_visible(self, obj):
        """Return True when the screen bounding box of ``obj`` is large enough.

        The view layer is updated first so the freshly assigned transforms are
        reflected in the measurement.
        """
        bpy.context.view_layer.update()
        if bbox := self.calculate_object_screen_coverage(obj):
            return bbox[2] * bbox[3] >= self.VISIBILITY['min_area']
        return False

    @staticmethod
    def _clamp_to_frame(value):
        """Clamp a normalized frame coordinate to the range [0.0, 1.0]."""
        return max(0.0, min(value, 1.0))

    @staticmethod
    def _generate_dome_coordinates(params):
        """Draw a random point in the upper half of a spherical shell.

        Args:
            params: Mapping with ``inner_radius``, ``outer_radius`` and
                ``min_height``.

        Returns:
            tuple: ``(x, y, z)`` where the azimuth is uniform in [0, 2*pi],
            the polar angle is uniform in [0, pi/2], the radius is uniform
            between the two radii, and z is raised to ``min_height`` when the
            sampled point would lie below it.
        """
        phi = random.uniform(0, 2 * np.pi)
        theta = random.uniform(0, np.pi / 2)
        r = random.uniform(params['inner_radius'], params['outer_radius'])

        return (r * np.sin(theta) * np.cos(phi),
                r * np.sin(theta) * np.sin(phi),
                max(r * np.cos(theta), params['min_height']))


# Usage example: configure the scene setter with the drone object names,
# point the generator at an output folder and run 10 generation attempts.
drone_object_names = ['Fixed_01', 'Fixed_02', 'Rotor_01', 'Rotor_02']
scene_setter = SetScene(drone_object_names)
dataset_generator = DatasetGenerator(
    scene_setter=scene_setter,
    output_path='/Users/oskardale/Desktop/DroneImages',
)
dataset_generator.generate_dataset(10)
```