## DexRay

In [1]:
import os
import json
import numpy as np
from datetime import datetime
from sklearn.feature_extraction import DictVectorizer

from tesseract import evaluation, temporal, metrics, spatial

Load our data from the numpy array files.

In [2]:
X = np.load("../reproduce-dexray/data/X.npy", allow_pickle=True)
y = np.load("../reproduce-dexray/data/y.npy", allow_pickle=True)
t = np.load("../reproduce-dexray/data/temp.npy", allow_pickle=True)

In [3]:
splits = temporal.time_aware_train_test_split(X, y, t, train_size=12, test_size=1, granularity='month')

Define the Keras classifier from DexRay:

In [4]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
from tensorflow.keras.models import Sequential

IMG_SIZE = 128

2024-11-30 15:01:07.308803: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-30 15:01:07.484214: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-30 15:01:07.487933: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [8]:
es_callback = tf.keras.callbacks.EarlyStopping(
    monitor="loss", patience=50, restore_best_weights=True
)

def fit_with_epochs(X_train, y_train):
    model.fit(
        X_train,
        y_train,
        shuffle=True,
        epochs=3, 
        callbacks=[es_callback],
        verbose=2,
    )

def predict_keras(X_test):
    probabilities = model.predict(X_test, verbose=0)
    return (probabilities > 0.5).astype(int).flatten()  # Convert to 1D array of labels

# Replicate the DexRay model
model_architecture = Sequential()
model_architecture.add(
    Conv1D(
        filters=64,
        kernel_size=12,
        activation="relu",
        input_shape=(IMG_SIZE * IMG_SIZE, 1),
    )
)
model_architecture.add(MaxPooling1D(pool_size=12))
model_architecture.add(Conv1D(filters=128, kernel_size=12, activation="relu"))
model_architecture.add(MaxPooling1D(pool_size=12))
model_architecture.add(Flatten())
model_architecture.add(Dense(64, activation="sigmoid"))
model_architecture.add(Dense(1, activation="sigmoid"))

model = keras.models.clone_model(model_architecture)
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.BinaryCrossentropy(),
)

Train the classifier: 

In [9]:
results_20 = evaluation.fit_predict_update(model, *splits, fit_function=fit_with_epochs, predict_function=predict_keras)


[A

Epoch 1/3
1323/1323 - 1580s - loss: 0.1671 - 1580s/epoch - 1s/step
Epoch 2/3
1323/1323 - 2089s - loss: 0.1101 - 2089s/epoch - 2s/step
Epoch 3/3
1323/1323 - 1192s - loss: 0.0941 - 1192s/epoch - 901ms/step



[A
[A
[A
[A
[A
[A

In [10]:
X_train, X_test, y_train, y_test, temp_train, temp_test = splits

tp_list = results_20['tp']
tn_list = results_20['tn']
fp_list = results_20['fp']
fn_list = results_20['fn']

# Sum the lists
total_tp = sum(tp_list)
total_tn = sum(tn_list)
total_fp = sum(fp_list)
total_fn = sum(fn_list)

# Calculate accuracy
accuracy = (total_tp + total_tn) / (total_tp + total_tn + total_fp + total_fn)
print(f"Overall Accuracy: {accuracy}")

Overall Accuracy: 0.9761470281543274


In [11]:
model = keras.models.clone_model(model_architecture)
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.BinaryCrossentropy(),
)

optimal_malware_training_ratio, aut, error_rate = spatial.find_optimal_train_ratio(
    model, 
    X_train, 
    y_train, 
    temp_train, 
    proper_train_size=8, # measured in units of `granularity`, taken from X_train, which has 12 months in total
    validation_size=1, # the remaining 4 months, validate 4 times with a validation window of 1 month
    granularity="month", 
    start_tr_rate=0.1, # % of malware in training to start with
    acceptable_errors=1-accuracy,
    end_tr_rate=0.5 # Stop at max 50% malware
)
print(f"Optimal malware training ratio: {optimal_malware_training_ratio}")


[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A
[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))

[A
[A



TypeError: cannot unpack non-iterable NoneType object

We can no retrain our classifier with [TODO]% of malware in the training data.

In [None]:
# Redo splitting
X_train, X_test, y_train, y_test, temp_train, temp_test = temporal.time_aware_train_test_split(X, y, t, train_size=8, test_size=1, granularity='month')

# Downsample so that there's 25% malware in the training data
train_idxs = spatial.downsample_to_rate(y_train, 0.25) # TODO: change here

X_train = X_train[train_idxs]
y_train = y_train[train_idxs]
temp_train = temp_train[train_idxs]

In [None]:
model = keras.models.clone_model(model_architecture)
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.BinaryCrossentropy(),
)

results_25 = evaluation.fit_predict_update(model, X_train, X_test, y_train, y_test, temp_train, temp_test)

In [None]:
print(f"New robustness over time (measured in AUT): {metrics.aut(results_25, 'f1')}")
metrics.print_metrics(results_25)

### Figure

In [None]:
# Reproduce the plot

pendleblue='#1f8fff'
pendleyellow='#ffa600'

plot(results_10['f1'], marker='x', color=pendleblue)
plot(results_25['f1'], marker='o', color='gray')
legend(['F1 10%', 'F1 25%'])
xlim([0, 23])
plt.xticks(ticks=range(1, 24, 3))
ylim([0, 1])
xlabel('Testing period (month)')
ylabel('F1 Score')
grid(axis = 'y')