In [1]:
import os
import pandas as pd
from sklearn.metrics import accuracy_score, roc_curve, recall_score
import numpy as np
from EXE_scanner import GBDTMalwareClassifier

# Setup EXE-scanner

In [2]:
# Pre-trained main classifier: a GBDT model trained on the EMBER dataset,
# with its decision threshold set on our validation set
main_classifier = GBDTMalwareClassifier()
main_classifier.load(
    model_path='data/models/main_clf-GBDT-model.txt',
    roc_curve_path='data/models/main_clf-GBDT-roc_curve.csv',
)

In [3]:
# Use provided model to predict data points that are not fixed at 1
def calculate_remaining_predictions(done_predictions, X, model):
    """
    Complete a partially decided prediction vector with a second model.

    Entries of `done_predictions` equal to 1 are treated as final and copied
    through unchanged. Every other entry is re-scored by `model`, and its hard
    label is obtained by comparing that score with `model.threshold`.

    Parameters:
    - done_predictions: Sequence of per-sample values; 1 marks an already fixed prediction.
    - X: pandas DataFrame with one feature row per entry of `done_predictions`
      (rows are selected positionally).
    - model: Object exposing `predict_proba(array)` and a `threshold` attribute.

    Returns:
    - y_pred_combined: int array of hard labels.
    - y_pred_proba_combined: float array of scores (1.0 for the fixed entries).
    """
    done = np.asarray(done_predictions)

    # Fixed entries keep the value 1 in both outputs, so start from all-ones
    y_pred_combined = np.ones(len(done), dtype=int)
    y_pred_proba_combined = np.ones(len(done), dtype=float)

    # Positions whose prediction is not fixed at 1 and must be checked by `model`
    pending = np.flatnonzero(done != 1)
    if pending.size:  # skip the model call entirely when nothing is left to score
        # Score all pending rows in a single batch instead of one call per row
        scores = np.asarray(model.predict_proba(X.iloc[pending].to_numpy()), dtype=float).reshape(-1)
        y_pred_proba_combined[pending] = scores
        y_pred_combined[pending] = (scores > model.threshold).astype(int)

    return y_pred_combined, y_pred_proba_combined

## Load pre-trained EXE-scanner

In [4]:
# Pre-trained EXE-scanner: model file plus its stored ROC curve
EXE_scanner = GBDTMalwareClassifier()
EXE_scanner.load(
    model_path="data/models/EXE_scanner-model.txt",
    roc_curve_path="data/models/EXE_scanner-roc_curve.csv",
)

## or (alternative to loading the pre-trained EXE-scanner above)

## Custom setup of EXE-scanner

In [5]:
# Train and validation datasets (see README for details).
# The CSVs have a two-row header, so columns are grouped under "features" and "label".
df_train = pd.read_csv('data/train.csv', header=[0, 1])
X_train, y_train = df_train["features"], df_train["label"]["y"]

df_val = pd.read_csv('data/val.csv', header=[0, 1])
X_val, y_val = df_val["features"], df_val["label"]["y"]

In [None]:
# Fit a fresh EXE-scanner; both the training and the validation split are handed to train()
EXE_scanner = GBDTMalwareClassifier()
EXE_scanner.train(
    X_train, y_train,
    X_val, y_val,
)

In [None]:
# Setup EXE-scanner's threshold together with the pre-trained main classifier (trained on the EMBER dataset with threshold set on our validation set)
# Note that the threshold of the main classifier is not updated, only the threshold of the EXE-scanner is calculated so that the combined model has the target FPR
TARGET_FPR = 0.01

# Predict validation set with the main classifier
y_val_pred_clf = main_classifier.predict_proba(X_val)

# Fix predictions to 1 if they are above the main classifier's threshold (vectorized; scores at or below it are kept as-is)
y_val_pred_set_ones = np.where(np.asarray(y_val_pred_clf) <= main_classifier.threshold, y_val_pred_clf, 1.0)

# Calculate remaining predictions, i.e. predictions that were not fixed in the previous step
# Only the combined scores are used below; the hard labels returned here still rely on the
# EXE-scanner's threshold as it was before the update
y_val_pred_combined, y_val_pred_combined_proba = calculate_remaining_predictions(y_val_pred_set_ones, X_val, EXE_scanner)
assert len(y_val_pred_combined) == len(y_val), "combined predictions and validation labels differ in length"

# Calculate threshold for EXE-scanner from the ROC curve of the combined scores
fpr, tpr, thresholds = roc_curve(y_val, y_val_pred_combined_proba)
roc_data = pd.DataFrame({"fpr": fpr, "tpr": tpr, "thresholds": thresholds})
EXE_scanner.update_threshold(roc_data, TARGET_FPR)

# Save EXE-scanner model
EXE_scanner.save("data/models/custom_EXE_scanner-model.txt")

# Evaluation

In [8]:
# Test dataset, read with the same two-row header as the other splits
df_test = pd.read_csv('data/test.csv', header=[0, 1])
X_test, y_test = df_test["features"], df_test["label"]["y"]

In [None]:
# Predict test set with the main classifier
# predict_proba is used (as for the validation set) because the values are compared against the classifier's threshold below
y_test_pred_clf = main_classifier.predict_proba(X_test)

# Fix predictions to 1 if they are above the main classifier's threshold (vectorized; scores at or below it are kept as-is)
y_test_pred_set_ones = np.where(np.asarray(y_test_pred_clf) <= main_classifier.threshold, y_test_pred_clf, 1.0)

# Calculate remaining predictions, i.e. predictions that were not fixed in the previous step
# The second return value holds the combined scores (1 for predictions fixed by the main classifier)
y_test_pred_combined, y_test_pred_combined_combined = calculate_remaining_predictions(y_test_pred_set_ones, X_test, EXE_scanner)
assert len(y_test_pred_combined) == len(y_test), "combined predictions and test labels differ in length"
y_pred = y_test_pred_combined

In [10]:
# Accuracy, true-positive rate and false-positive rate of the combined predictions
ACC = accuracy_score(y_test, y_pred)
TPR = recall_score(y_test, y_pred)
# Recall of class 0 is the true-negative rate; the FPR is its complement
FPR = 1 - recall_score(y_test, y_pred, pos_label=0)

# Report every metric as a percentage rounded to two decimals
for metric_label, metric_value in (("ACC:", ACC), ("TPR:", TPR), ("FPR:", FPR)):
    print(metric_label, round(metric_value * 100, 2))

ACC: 99.02
TPR: 99.07
FPR: 1.1


# Real-world executable files

In [11]:
class WrapperEXEscanner:
    """
    A wrapper class for the use of EXE scanner with standalone malware classifier.

    The main classifier is consulted first; only samples it does not flag as
    malicious are passed on to the EXE scanner.

    Parameters:
    - main_classifier: The main classifier model.
    - EXE_scanner: The EXE scanner model.
    - main_classifier_threshold: The threshold value for the main classifier. (optional;
      when omitted, the `threshold` attribute of `main_classifier` is used)

    Methods:
    - predict_sample(bytez, return_score=False): Predicts the label for a given sample.

    """

    def __init__(self, main_classifier, EXE_scanner, main_classifier_threshold=None):
        self.main_classifier = main_classifier
        self.EXE_scanner = EXE_scanner
        self.main_classifier_threshold = main_classifier_threshold

    def _resolve_main_threshold(self):
        """Return the main classifier's decision threshold, preferring the explicit override."""
        threshold = self.main_classifier_threshold
        if threshold is None:
            # No explicit override: fall back to the threshold stored on the classifier itself
            threshold = getattr(self.main_classifier, "threshold", None)
        if threshold is None:
            raise ValueError("Main classifier's threshold must be set in order to predict a sample.")
        return threshold

    def predict_sample(self, bytez, return_score=False):
        """
        Predicts the label for a given sample.

        Parameters:
        - bytez: The input sample in byte format.
        - return_score: Whether to return the prediction score. (default: False)

        Returns:
        - predicted_label: The predicted label for the sample (1 = malicious, 0 otherwise).
        - score: The prediction score (if return_score is True). This is the main
          classifier's score when it flags the sample, otherwise the EXE scanner's score.

        Raises:
        - ValueError: If no threshold is available for the main classifier.

        """
        # The threshold is needed for every prediction, not only when the score is returned
        main_threshold = self._resolve_main_threshold()

        score = self.main_classifier.predict_proba(bytez)
        is_malicious = score > main_threshold
        if not is_malicious:
            # Main classifier did not flag the sample, let the EXE scanner decide
            # NOTE(review): the main classifier is queried via predict_proba() while the
            # EXE scanner is queried via get_score() - confirm both accept raw bytes
            score = self.EXE_scanner.get_score(bytez)
            is_malicious = score > self.EXE_scanner.threshold

        predicted_label = int(is_malicious)
        if return_score:
            return predicted_label, score
        return predicted_label

In [12]:
# Load both pre-trained models from disk
main_classifier = GBDTMalwareClassifier()
main_classifier.load(
    model_path='data/models/main_clf-GBDT-model.txt',
    roc_curve_path='data/models/main_clf-GBDT-roc_curve.csv',
)

EXE_scanner = GBDTMalwareClassifier()
EXE_scanner.load(
    model_path="data/models/EXE_scanner-model.txt",
    roc_curve_path="data/models/EXE_scanner-roc_curve.csv",
)

# Combine them; the main classifier's own threshold is passed explicitly
wrapper = WrapperEXEscanner(
    main_classifier,
    EXE_scanner,
    main_classifier_threshold=main_classifier.threshold,
)

In [13]:
# Requires placing executables inside "samples" folder in the root of this repository
SAMPLE_FOLDER = "samples"
for file in sorted(os.listdir(SAMPLE_FOLDER)):  # sorted for a stable output order
    sample_path = os.path.join(SAMPLE_FOLDER, file)
    if not os.path.isfile(sample_path):
        continue  # os.listdir also returns sub-directories, which cannot be opened as files
    with open(sample_path, "rb") as f:
        bytez = f.read()
    # WrapperEXEscanner only defines predict_sample(); ask it for the score as well
    _, score = wrapper.predict_sample(bytez, return_score=True)
    print(f"{file}: {score}")