In [None]:
# Imports
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
from keras import Sequential, utils
from keras.layers import Dense
from sklearn.ensemble import IsolationForest
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

# Single seed reused by the stochastic components of this notebook
# (passed to Keras here and to IsolationForest further down).
RANDOM_STATE = 42
# Seed Keras so the autoencoder run is repeatable after a kernel restart.
# NOTE(review): per the Keras docs `set_random_seed` also seeds Python's
# `random` and NumPy — confirm for the installed Keras version.
utils.set_random_seed(RANDOM_STATE)

In [None]:
# Functions

def split_train_test(df, size, time_column, class_column='class'):
    """Split a labelled DataFrame into train and test parts, class by class.

    For every distinct value of ``class_column`` the last
    ``round(n_rows_of_class * size)`` rows (in the frame's current row order)
    go to the test part; all remaining rows form the training part.  Both
    parts are then sorted by ``time_column``, re-indexed from 0, and returned
    without ``time_column``.

    Unlike the previous implementation the input ``df`` is NOT modified, and
    the test part is assembled with a single ``pd.concat`` so column dtypes
    are preserved (concatenating onto an empty frame could turn them into
    ``object`` or emit a FutureWarning, depending on the pandas version).

    Parameters
    ----------
    df : pandas.DataFrame
        Data containing ``time_column`` and ``class_column``.
    size : float
        Fraction of each class moved to the test part.
    time_column : str
        Column used for the final ordering; dropped from both results.
    class_column : str, default 'class'
        Column holding the class labels.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        ``(train_df, test_df)``, both without ``time_column``.
    """
    # Collect the tail of every class; skip empty tails so concat never sees
    # an empty frame.
    test_parts = []
    for label in np.sort(pd.unique(df[class_column])):
        class_rows = df[df[class_column] == label]
        class_tail = class_rows.tail(round(len(class_rows) * size))
        if len(class_tail) > 0:
            test_parts.append(class_tail)

    # df.iloc[0:0] keeps the columns and dtypes when nothing was selected
    test_df = pd.concat(test_parts) if test_parts else df.iloc[0:0]

    # Rows are removed from the training part by index label (as before),
    # but on a new object so the caller's frame stays intact.
    train_df = df.drop(index=test_df.index)

    # Sort by time column and reset index
    train_df = train_df.sort_values(by=[time_column]).reset_index(drop=True)
    test_df = test_df.sort_values(by=[time_column]).reset_index(drop=True)
    # Return both parts excluding the time column
    return train_df.drop(columns=[time_column]), test_df.drop(columns=[time_column])



def split_x_y(df, columns_to_drop):
    """Separate a DataFrame into feature columns and the 'class' label column.

    ``columns_to_drop`` are removed first; the input frame itself is left
    untouched because ``drop`` returns a new object.

    Returns
    -------
    (pandas.DataFrame, pandas.Series)
        ``X`` with every remaining column except 'class', and ``y`` with the
        'class' column.
    """
    remaining = df.drop(columns=columns_to_drop)
    # Boolean mask over the column axis: True for every non-label column
    is_feature = remaining.columns != 'class'
    features = remaining.loc[:, is_feature]
    labels = remaining.loc[:, 'class']
    return features, labels



def load_data(dataset_path, file_name, size=0.2):
    """Read a CSV dataset and return train/test features and labels.

    The file at ``dataset_path + file_name`` (plain string concatenation, so
    ``dataset_path`` must already end with a path separator) is split with
    ``split_train_test`` on the 'timestamp' column, then each part is split
    into features and labels with ``split_x_y``.

    Parameters
    ----------
    dataset_path : str
        Directory prefix of the dataset.
    file_name : str
        CSV file name appended to ``dataset_path``.
    size : float, default 0.2
        Fraction of every class reserved for the test part.

    Returns
    -------
    tuple
        ``(X_train, y_train, X_test, y_test)``.
    """
    raw_df = pd.read_csv(dataset_path + file_name)
    train_part, test_part = split_train_test(raw_df, size, 'timestamp')

    # No extra columns are dropped for either part
    X_train, y_train = split_x_y(train_part, [])
    X_test, test_labels = split_x_y(test_part, [])

    # Only the test labels are rewritten: label 2 becomes 1 so that GPS
    # spoofing and jamming count as a single category. y_train keeps its
    # original labels.
    y_test = test_labels.replace(2, 1)
    return X_train, y_train, X_test, y_test



def split_train_data(X_train, y_train, train_fraction=0.90):
    """Split the training features into fit, validation and abnormal subsets.

    Only samples labelled 0 (normal) are used for fitting and validation: the
    first ``train_fraction`` of them (in row order) are returned for fitting,
    the remainder as a hold-out validation set.  Every sample whose label is
    not 0 is returned separately as abnormal data.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Training features.
    y_train : pandas.Series
        Training labels, row-aligned with ``X_train``.
    train_fraction : float, default 0.90
        Share of the normal samples kept for fitting; the remaining
        ``1 - train_fraction`` becomes the validation set.

    Returns
    -------
    tuple
        ``(train_data, val_data, abnormal_data)``.

    Raises
    ------
    ValueError
        If ``train_fraction`` is outside ``[0, 1]``.
    """
    if not 0.0 <= train_fraction <= 1.0:
        raise ValueError(
            "train_fraction must be within [0, 1], got {}".format(train_fraction))

    normal_data = X_train[y_train == 0]  # Only normal samples
    # Row position where the fitting part ends and the validation part starts
    split_at = int(normal_data.shape[0] * train_fraction)
    train_data = normal_data[:split_at]  # Leading part: used to fit the model
    val_data = normal_data[split_at:]  # Trailing part: hold-out validation set
    # Abnormal data from the train set (any label other than 0)
    abnormal_data = X_train[y_train != 0]
    return train_data, val_data, abnormal_data



def create_model(input_dim):
    """Build and compile the dense autoencoder.

    The network is a symmetric stack of fully connected ReLU layers
    (32-16-8-4-8-16-32 units) followed by a sigmoid output layer with
    ``input_dim`` units, compiled with the Adam optimizer and mean squared
    error loss.

    NOTE(review): the sigmoid output can only produce values in (0, 1), so
    the inputs are presumably scaled to that range — confirm against the
    dataset preparation.

    Parameters
    ----------
    input_dim : int
        Number of input features (also the size of the output layer).

    Returns
    -------
    keras.Sequential
        The compiled, untrained autoencoder.
    """
    # Encoder widths down to the 4-unit bottleneck, then the mirrored decoder
    hidden_units = (32, 16, 8, 4, 8, 16, 32)

    # The first layer additionally declares the input size
    layers = [Dense(units=hidden_units[0], activation='relu', input_dim=input_dim)]
    for width in hidden_units[1:]:
        layers.append(Dense(units=width, activation='relu'))
    # Output layer reconstructs one value per input feature
    layers.append(Dense(units=input_dim, activation='sigmoid'))

    autoencoder = Sequential(layers)
    autoencoder.compile(optimizer='adam', loss='mean_squared_error')
    return autoencoder



def fit(autoencoder, train_data, epochs=100, batch_size=32):
    """Train the autoencoder to reproduce ``train_data``.

    The same array is used as input and target, batches are shuffled and the
    progress output is suppressed.

    Parameters
    ----------
    autoencoder : model object
        Any object exposing a Keras-style ``fit(x, y, **kwargs)`` that returns
        an object with a ``history`` dict containing a "loss" list.
    train_data : array-like
        Samples used both as input and as reconstruction target.
    epochs : int, default 100
        Number of training epochs (previously hard-coded).
    batch_size : int, default 32
        Mini-batch size (previously hard-coded).

    Returns
    -------
    tuple
        ``(autoencoder, final_loss)`` where ``final_loss`` is the training
        loss recorded for the last epoch.
    """
    history = autoencoder.fit(train_data, train_data,
                              epochs=epochs,
                              batch_size=batch_size,
                              shuffle=True,
                              verbose=0)

    # Only the last epoch's loss is reported back to the caller
    return autoencoder, history.history["loss"][-1]



def calculate_reconstruction_loss(x, x_hat):
    """Return the per-sample Mean Absolute Error between ``x`` and ``x_hat``.

    The absolute element-wise difference is averaged along axis 1, giving one
    loss value per row.
    """
    residuals = x - x_hat
    absolute_errors = abs(residuals)
    return np.mean(absolute_errors, axis=1)



def distance_calculation(losses, normal, abnormal):
    """Label each loss by whichever reference value it lies closer to.

    A sample is marked 1 (abnormal) only when its loss is strictly farther
    from ``normal`` than from ``abnormal``; ties and everything else are
    marked 0 (normal).

    Parameters
    ----------
    losses : sequence of float
        One reconstruction loss per sample.
    normal, abnormal : float
        Reference loss values for the normal and the abnormal class.

    Returns
    -------
    numpy.ndarray
        Float array of 0.0 / 1.0 predictions, one per loss.
    """
    loss_values = np.asarray(losses)
    gap_to_normal = np.abs(loss_values - normal)
    gap_to_abnormal = np.abs(loss_values - abnormal)
    # Strict comparison: an equal distance to both references counts as normal
    return (gap_to_normal > gap_to_abnormal).astype(float)



def evaluate_learning(y_true, y_pred):
    """Compute binary classification metrics for hard predictions.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels.
    y_pred : array-like
        Predicted labels.

    Returns
    -------
    tuple
        ``(accuracy, recall, precision, f1, missrate, fallout, auc)`` where
        miss rate is ``fn / (fn + tp)`` and fall-out is ``fp / (fp + tn)``.
        The AUC is computed from the hard labels in ``y_pred``, not from
        continuous scores.
    """
    accuracy = accuracy_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    # The flattened matrix is unpacked as tn, fp, fn, tp, which requires a
    # 2x2 confusion matrix.
    cm_flat = confusion_matrix(y_true, y_pred).ravel()
    true_neg, false_pos, false_neg, true_pos = cm_flat
    actual_positives = false_neg + true_pos
    actual_negatives = false_pos + true_neg
    missrate = false_neg / actual_positives  # Share of positives missed
    fallout = false_pos / actual_negatives  # Share of negatives flagged

    auc = roc_auc_score(y_true, y_pred)
    return accuracy, recall, precision, f1, missrate, fallout, auc



def evaluate(model, val_data, abnormal_data, X_test, y_test):
    """Derive loss references from held-out data and score the test set.

    The mean reconstruction loss on ``val_data`` (normal samples) and on
    ``abnormal_data`` serve as two reference values.  Every test sample is
    labelled by the reference its own loss is closer to, and the resulting
    predictions are compared with ``y_test``.  Intermediate values and the
    final metrics are printed.

    Parameters
    ----------
    model : object
        Trained model exposing ``predict(data, verbose=0)``.
    val_data, abnormal_data : array-like
        Normal hold-out samples and abnormal samples used for the references.
    X_test, y_test : array-like
        Test features and binary labels.  NOTE(review): the per-class loss
        means printed at the end index the losses with ``y_test == 1`` /
        ``y_test == 0``, so the two must be index-compatible (presumably a
        DataFrame and a Series sharing one index — confirm).

    Returns
    -------
    tuple
        ``(threshold_normal, metrics_dict)``: the mean validation loss and a
        dict of metrics rounded to 5 decimals.
    """
    def _reconstruction_losses(samples):
        # Run the samples through the model and score the reconstruction
        reconstructed = model.predict(samples, verbose=0)
        return calculate_reconstruction_loss(samples, reconstructed)

    # Reference losses: hold-out normal data first, then abnormal data
    threshold_normal = np.mean(_reconstruction_losses(val_data))
    threshold_abnormal = np.mean(_reconstruction_losses(abnormal_data))
    # Show mean validation loss for normal and abnormal data (threshold)
    print("Mean Validation Loss (Normal): {} | (Abnormal): {}".format(
        threshold_normal, threshold_abnormal))

    # Per-sample losses on the test set
    losses = _reconstruction_losses(X_test)

    # Assign each test sample to the closer of the two references
    test_eval = distance_calculation(losses, threshold_normal, threshold_abnormal)

    # Pair every metric value with its name, rounding to 5 decimals
    metric_names = ("accuracy", "recall", "precision", "f1_score",
                    "missrate", "fallout", "auc")
    metric_values = evaluate_learning(y_test, test_eval)
    metrics_dict = {name: round(value, 5)
                    for name, value in zip(metric_names, metric_values)}

    # Show metrics on test data
    mean_abnormal_loss = np.mean(losses[y_test == 1])
    mean_normal_loss = np.mean(losses[y_test == 0])
    print("\nThreshold: {} \nMetrics: {} \n\nMean Abnormal Loss: {} \nMean Normal Loss: {}".format(
        threshold_normal, metrics_dict, mean_abnormal_loss, mean_normal_loss))

    return threshold_normal, metrics_dict



def show_plot_reconstrunction_error(model, test_data, threshold):
    """Scatter-plot the per-sample reconstruction error against a threshold.

    The error is the Mean Absolute Error per row.  The threshold passed in by
    this notebook is a mean of per-sample MAE values, so plotting MAE keeps
    the points and the threshold line in the same units (the previous version
    plotted MSE against that MAE threshold).

    Parameters
    ----------
    model : object
        Trained model exposing ``predict(data, verbose=0)``.
    test_data : pandas.DataFrame or numpy.ndarray
        Samples to reconstruct; its row index is used as the x axis.
    threshold : float
        Loss value drawn as a horizontal red line.

    Returns
    -------
    pandas.DataFrame
        One column, 'Reconstruction_error', holding the per-sample MAE.
    """
    test_predictions = model.predict(test_data, verbose=0)  # Predict test data
    # Mean Absolute Error per sample
    mae = np.mean(abs(test_data - test_predictions), axis=1)
    error_df = pd.DataFrame({'Reconstruction_error': mae})  # Build error DataFrame

    # Set the font size before the figure exists so it applies to all of its
    # text. This is still a global matplotlib setting, as before.
    plt.rcParams.update({'font.size': 14})
    fig, ax = plt.subplots(figsize=(12, 8))  # Plot settings
    ax.plot(error_df.index, error_df.Reconstruction_error, marker='o', ms=3.5, linestyle='')
    # axhline spans the full axes width regardless of the data limits
    ax.axhline(threshold, color="r", zorder=100, label='Threshold')
    ax.legend()
    ax.set_ylabel("Reconstruction Error", fontsize=16)
    ax.set_xlabel("Data Index", fontsize=16)
    plt.show()
    # Return error DataFrame
    return error_df

In [None]:
# Main

# Dataset directory. Set the UAVGPS_DATASET_PATH environment variable to point
# the notebook at another location; the original path is kept as the default.
# os.path.join(..., '') guarantees a trailing separator, which load_data
# relies on because it concatenates the directory and the file name.
dataset_path = os.path.join(
    os.environ.get('UAVGPS_DATASET_PATH',
                   '/home/leandro/remy-project/centralized/datasets/UAVGPSAttacks/'),
    '')

In [None]:
# Autoencoder (AE)
# End-to-end run: load data, train the autoencoder on normal samples only,
# score the test set and plot the reconstruction errors per class.

# Load the normalised dataset (default split: 80% train / 20% test per class)
X_train, y_train, X_test, y_test = load_data(dataset_path, 'data_norm.csv')
# Normal samples for fitting, normal hold-out for validation, and the
# abnormal training samples
train_data, val_data, abnormal_data = split_train_data(X_train, y_train)

input_dim = train_data.shape[1] # Number of feature columns
# Create autoencoder model
model = create_model(input_dim)
# Fit model; loss_train is the training loss of the final epoch
model, loss_train = fit(model, train_data)

# threshold_normal is the mean reconstruction loss on val_data;
# metrics_dict holds the rounded test-set metrics
threshold_normal, metrics_dict = evaluate(model, val_data, abnormal_data, X_test, y_test)

# Plot test samples labelled 0 (normal) against the normal threshold
print('\n\nReconstrunction Error Plot for Normal Data:\n')
_ = show_plot_reconstrunction_error(model, X_test[y_test == 0], threshold=threshold_normal)
# Plot test samples labelled 1 (abnormal) against the same threshold
print('\n\n\nReconstrunction Error Plot for Abnormal Data:\n')
_ = show_plot_reconstrunction_error(model, X_test[y_test == 1], threshold=threshold_normal)

In [None]:
# One-Class SVM (OC SVM) baseline
# Train on the normal (class 0) training samples only
model = OneClassSVM()
model.fit(X_train[y_train == 0])

# The detector marks anomalies with -1; recode them as 1 and everything else
# as 0 so the predictions follow the same convention as y_test
prediction = [int(flag == -1) for flag in model.predict(X_test)]
# ROC AUC computed from the hard 0/1 predictions
auc = roc_auc_score(y_test, prediction)

# Per-class report followed by the AUC
print(classification_report(y_test, prediction, digits=5))
print('\nAUC: ' + str(round(auc, 5)))

In [None]:
# Local Outlier Factor (LOF) baseline
# novelty=True allows predict() on unseen data; train on the normal
# (class 0) training samples only, passed as a plain array
model = LocalOutlierFactor(n_jobs=-1, novelty=True)
model.fit(X_train[y_train == 0].values)

# The detector marks anomalies with -1; recode them as 1 and everything else
# as 0 so the predictions follow the same convention as y_test
prediction = [int(flag == -1) for flag in model.predict(X_test.values)]
# ROC AUC computed from the hard 0/1 predictions
auc = roc_auc_score(y_test, prediction)

# Per-class report followed by the AUC
print(classification_report(y_test, prediction, digits=5))
print('\nAUC: ' + str(round(auc, 5)))

In [None]:
# Isolation Forest baseline
# Seeded with RANDOM_STATE; train on the normal (class 0) training samples
# only, passed as a plain array
model = IsolationForest(n_jobs=-1, random_state=RANDOM_STATE)
model.fit(X_train[y_train == 0].values)

# The detector marks anomalies with -1; recode them as 1 and everything else
# as 0 so the predictions follow the same convention as y_test
prediction = [int(flag == -1) for flag in model.predict(X_test.values)]
# ROC AUC computed from the hard 0/1 predictions
auc = roc_auc_score(y_test, prediction)

# Per-class report followed by the AUC
print(classification_report(y_test, prediction, digits=5))
print('\nAUC: ' + str(round(auc, 5)))