## *Prep*

In [21]:
import seaborn as sn
import matplotlib.pyplot as plt
import numpy as np

# The following five lines ensure that we reload the preprocessing functions 
# everytime this cell is called
import importlib
import helper_files.preprocessing as preprocFuncts
import helper_files.util as util
import parameters as MyParams
importlib.reload(preprocFuncts)
importlib.reload(util)
importlib.reload(MyParams)

# MODEL PARAMETERS

# DERIVATION PARAMETERS
WINDOW_LEN = MyParams.WINDOW_LEN # measured in IQ samples
OVERLAP = MyParams.OVERLAP
NUM_FEATURES = MyParams.NUM_FEATURES

# DATA PARAMETERS
TRAINING_DATA_DIR = MyParams.training_data_dir
EVAL_DATA_DIR = MyParams.eval_data_dir
NUM_TRAINING_FILES = MyParams.NUM_TRAINING_FILES # how many files in the saved numpy data for training
NUM_EVALUATION_FILES = MyParams.NUM_EVALUATION_FILES # how many files in the saved numpy data for evaluation
MAX_FILES = MyParams.MAX_FILES # if not using the saved numpy data, this is the max files to intake
USE_SAVED_DATA = MyParams.USE_SAVED_DATA # True = used the saved .npy file data instead of re-deriving the features again
SAVE_METRICS_TO_FILE = MyParams.SAVE_METRICS_TO_FILE
TRAINING_DATASET = MyParams.TRAINING_DATASET
EVAL_DATASET = MyParams.EVAL_DATASET

FEATURES_TO_USE = MyParams.FEATURES_TO_USE

## *Loading training dataset*

In [22]:
importlib.reload(preprocFuncts)
importlib.reload(util)
importlib.reload(MyParams)

print("Pulling from directory: ", TRAINING_DATA_DIR)

training_derived_samples, training_labels = preprocFuncts.preprocessFiles(
    TRAINING_DATA_DIR, 
    postfix=f"train_{NUM_FEATURES}ftrs_{NUM_TRAINING_FILES}files_{WINDOW_LEN}win_{'0' + str(int(OVERLAP * 100))}over{"_"+TRAINING_DATASET if TRAINING_DATASET != "" else ""}", 
    features_to_use=FEATURES_TO_USE,
    window_len=WINDOW_LEN,
    freq_bin_width=12,
    overlap=OVERLAP,
    saved_data=USE_SAVED_DATA, 
    max_files=MAX_FILES,
)

if (training_derived_samples.size <= 0):
    raise UserWarning("Array is empty") 

Pulling from directory:  /home/uav-cyberlab-rfml/RFML/test-dataset/test_dji_20mhz_comms_training
Converted 62,500,000 IQ samples to numpy array. Shape: (62500000,)
Converted 1 files
Extracting features now!
The shape of patches: (60, 2, 16, 64)
The shape of time_starts: (60,)
The shape of freq_starts: (2,)
The time_start: 0
The freq_start: 0
The time_start: 1024
The freq_start: 1024


AttributeError: 'int' object has no attribute 'size'

## *Preprocessing training dataset*

The following cell removes labels that should not be used---like annotations that were not labeled, or the 'Burst' label, or the Ruko F11 pro labels because it does not appear in the evaluation dataset. 

The cell also applies an encoder to the labels to use for fitting and predicting.

In [None]:
print("\n==BEFORE BALANCING========")
util.display(training_derived_samples, training_labels)

training_derived_samples, training_labels = preprocFuncts.balanceByMedian(training_derived_samples, training_labels, unlabeled_downsampling=70_000)

print("\n==AFTER BALANCING========")
util.display(training_derived_samples, training_labels)



X_train = training_derived_samples
y_train = training_labels

print(f"Number of samples: {X_train.shape[0]:,}")
print(f"Number of labels: {len(y_train):,}")

remove_labels = [
    'Burst',
    '',
    'Ruko_F11_pro_UL',
    'HS100_Downlink',
]

# Build mask: True = keep, False = remove
mask = ~np.isin(y_train, remove_labels)

# Apply mask
X_train = X_train[mask]
y_train = y_train[mask]
y_train_strings = y_train

print(f"\nAfter removing unnecessary labels:")
print(f"Number of samples: {X_train.shape[0]:,}")
print(f"Number of labels: {len(y_train):,}")

## *Loading evaluation dataset*

In [None]:
importlib.reload(preprocFuncts)
importlib.reload(util)
importlib.reload(MyParams)

# carlos' Mac
# data_dir = '/Users/carlos_1/Documents/GitHub/RFML-Code/RFML_Combined_Dataset_2025/RFML_Drone_Dataset_2025/old_drone_full_annotated_dataset/RFML_Old_Drone_Eval_data/*'
# uav-cyberlab-rfml laptop
data_dir = f'/home/uav-cyberlab-rfml/RFML/test_{EVAL_DATASET}_eval'
print("Pulling from directory: ", data_dir)


test_derived_samples, test_labels = preprocFuncts.preprocessFiles(
    data_dir, 
    postfix=f"eval_{NUM_FEATURES}ftrs_{NUM_EVALUATION_FILES}files_{WINDOW_LEN}win_{'0' + str(int(OVERLAP * 100))}over{"_"+EVAL_DATASET if EVAL_DATASET != "" else ""}", 
    features_to_use=FEATURES_TO_USE,
    window_len=WINDOW_LEN,
    overlap=OVERLAP,
    saved_data=USE_SAVED_DATA, 
    max_files=MAX_FILES,
)

## *Preprocessing evaluation dataset*
First I need to remove labels that were in the evaluation set but not in the training set.

In [None]:
print("\n==BEFORE BALANCING========")
util.display(test_derived_samples, test_labels)

test_derived_samples, test_labels = preprocFuncts.balanceByMedian(test_derived_samples, test_labels, unlabeled_downsampling=7_000)
print("\n==AFTER BALANCING========")
util.display(test_derived_samples, training_labels)


X_test = test_derived_samples
y_test = test_labels


print(f"Number of samples: {X_test.shape[0]:,}")
print(f"Number of labels: {y_test.shape[0]:,}")

# remove labels that are in the y_test but not in y_train
remove_labels = np.setdiff1d(y_test, y_train_strings).tolist()
print(remove_labels)

# Build mask: True = keep, False = remove
mask = ~np.isin(y_test, remove_labels)

# Apply mask
X_test = X_test[mask]
y_test = y_test[mask]

print(f"\nAfter removing unnecessary labels:")
print(f"Number of samples: {X_test.shape[0]:,}")
print(f"Number of labels: {y_test.shape[0]:,}")

y_strings = y_test # store the string version of the labels before they get encoded

## *Fitting and prediction*

In [None]:
# testing for proper overlap between testing and training

train_classes = set(np.unique(y_train))
test_classes  = set(np.unique(y_test))

print("Train classes:", len(train_classes))
print("Test classes :", len(test_classes))
print("Overlap      :", len(train_classes & test_classes))
print("Only-in-test :", sorted(test_classes - train_classes)[:20])
print("Only-in-train:", sorted(train_classes - test_classes)[:20])

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import PowerTransformer


# Ensure arrays
X_train = np.asarray(X_train)
y_train = np.asarray(y_train)

# 1. Scale features (important for GNB)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
power_transformer = PowerTransformer(method="yeo-johnson")
X_train_scaled = power_transformer.fit_transform(X_train_scaled)

# 2. Train GNB
gnb = GaussianNB(var_smoothing=1e-9)
gnb.fit(X_train_scaled, y_train)
print(f"Fit ✔︎")

y_pred = None
if X_test is not None and y_test is not None:
    X_test = np.asarray(X_test)
    y_test = np.asarray(y_test)

    X_test_scaled = scaler.transform(X_test)
    X_test_scaled = power_transformer.transform(X_test_scaled)
    y_pred = gnb.predict(X_test_scaled)
    print(f"Predicted ✔︎")

    acc = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {acc:.2f}%")
else: 
    print("Check the testing data")

## *Metrics*

In [None]:
from sklearn.metrics import classification_report, accuracy_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

importlib.reload(preprocFuncts)
importlib.reload(util)

base_name='gnb'
model="Gaussian Naive-Bayes"

accuracy = accuracy_score(y_test, y_pred)
perc_accuracy = accuracy * 100
cr = classification_report(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)


util.printMetrics(
    base_name=base_name + "_metrics", 
    model=model, 
    perc_accuracy=perc_accuracy, 
    notes=f"",
    cr=cr,
    cm=cm,
    labels=y_strings,
    max_files=MAX_FILES,
    window_size=WINDOW_LEN,
    overlap=OVERLAP,
    num_features=NUM_FEATURES,
    num_training_files=NUM_TRAINING_FILES,
    num_evaluation_files=NUM_EVALUATION_FILES,
    training_dataset=TRAINING_DATASET,
    eval_dataset=EVAL_DATASET,
)


if SAVE_METRICS_TO_FILE:
    util.saveMetricsToFile(
        base_name=base_name + "_metrics", 
        model=model, 
        perc_accuracy=perc_accuracy, 
        notes="Using the fixed length sample size",
        cr=cr,
        cm=cm,
        labels=y_strings,
        max_files=MAX_FILES,
        window_size=WINDOW_LEN,
        overlap=OVERLAP,
        num_features = NUM_FEATURES,
        num_training_files = NUM_TRAINING_FILES,
        num_evaluation_files = NUM_EVALUATION_FILES,
        training_dataset=TRAINING_DATASET,
        eval_dataset=EVAL_DATASET,
    )

In [None]:
# Create a heatmap of the confusion matrix

plt.figure(figsize=(7, 7))
sn.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(y_strings), yticklabels=np.unique(y_strings))

# Labels and title
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title(f'Confusion Matrix Heatmap for {model} Predictions')

if SAVE_METRICS_TO_FILE:
    plt.savefig(f"./metrics/{base_name}_cm_{(perc_accuracy * 100):.0f}.png", bbox_inches='tight', dpi=200)
plt.show()

The following cell prints Cohen's Kappa and Matthew's Correlation Coefficient
* Cohen's Kappa (κ) is a statistic measuring agreement between two categorical raters (or one rater at two times) beyond what's expected by chance, correcting for random agreement, and is used for inter-rater reliability in fields like machine learning. It ranges from -1 (total disagreement) to +1 (perfect agreement), with 0 meaning agreement is purely random.

* The Matthews Correlation Coefficient (MCC) is a robust metric in machine learning for evaluating binary/multiclass classification, measuring correlation between actual and predicted classes, ranging from -1 (perfect inverse) to +1 (perfect prediction), with 0 being random; it's especially valuable for imbalanced datasets as it uses all four confusion matrix values (TP, TN, FP, FN) for a balanced performance score, unlike simpler metrics that can be misleading.

In [None]:
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import matthews_corrcoef

print("Cohen's Kappa:", cohen_kappa_score(y_test, y_pred))
print("MCC:", matthews_corrcoef(y_test, y_pred))
