In [1]:
#import
import os
import numpy as np
import tensorflow as tf
import sklearn
import glob

import matplotlib.pyplot as plt
from PIL import Image
from skimage.transform import resize

from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.optimizers import Adam, RMSprop

from sklearn.metrics import *

from stego_detector_extended import *

In [2]:
#Utils

#convert to binary
def messageToBinary(message):
    """Render ``message`` as zero-padded 8-bit binary text.

    The exact type of ``message`` selects the output form:

    * ``str`` -- one string made of the 8-bit code of every character;
    * ``bytes`` or ``np.ndarray`` -- a list with one 8-bit string per element;
    * ``int`` or ``np.uint8`` -- a single 8-bit string.

    ``"08b"`` is a minimum width, so values needing more than 8 bits give a
    longer string rather than being truncated.

    Raises:
        TypeError: for any other type. The type is compared exactly, so
            subclasses of the accepted types are rejected as well.
    """
    kind = type(message)
    if kind == str:
        return ''.join(format(ord(ch), "08b") for ch in message)
    if kind in (bytes, np.ndarray):
        return [format(item, "08b") for item in message]
    if kind in (int, np.uint8):
        return format(message, "08b")
    raise TypeError("Input type not supported")
  
#extract k lsb for each channel
def extract_k_lsb_features(data, k=4):
    """Build one flat vector of the ``k`` least-significant bits per image.

    Pixels are visited in row-major order. For each pixel the bits are
    emitted plane by plane starting from the least-significant one:
    ``R0, G0, B0, R1, G1, B1, ...`` where ``Xi`` is bit ``i`` of channel
    ``X``. An ``H x W`` image therefore yields ``H * W * 3 * k`` values,
    each 0 or 1.

    The bits are taken with vectorised shift/mask operations instead of
    formatting every channel value as a binary string; the values and their
    order are the same, but there is no Python-level loop over pixels.

    Args:
        data: iterable of images, each convertible to an integer array of
            shape ``(height, width, 3)`` holding non-negative 8-bit values.
        k: number of low-order bits to keep per channel (at most 8).

    Returns:
        ``np.ndarray`` with one integer feature row per image. It is
        two-dimensional when all images share the same shape.

    Raises:
        ValueError: if ``k`` is larger than 8 or an image is not
            ``(height, width, 3)``.
    """
    if k > 8:
        raise ValueError("k must not exceed 8 (channels are 8-bit), got %r" % (k,))

    # Column of shift amounts 0..k-1; the trailing axis of length 1 lets it
    # broadcast against the channel axis of each image.
    shifts = np.arange(k)[:, np.newaxis]

    features = []
    for img in data:
        pixels = np.asarray(img)
        if pixels.ndim != 3 or pixels.shape[-1] != 3:
            raise ValueError(
                "each image must have shape (height, width, 3), got %r" % (pixels.shape,)
            )
        # (H, W, 1, 3) >> (k, 1) -> (H, W, k, 3). Flattening in C order
        # keeps the pixel -> bit plane -> channel ordering described above.
        bit_planes = (pixels[..., np.newaxis, :] >> shifts) & 1
        # astype(int) keeps the platform default integer dtype that an
        # array built from Python ints would have.
        features.append(bit_planes.reshape(-1).astype(int))

    return np.array(features)

# load images in the image_path
def load_images(image_path):
    """Load every ``*.png`` file directly inside ``image_path`` as an RGB array.

    Files are read in sorted path order, so the result is deterministic for
    a given directory. The directory is not searched recursively.

    Each file is opened in a ``with`` block so its handle is released as
    soon as the pixels have been copied into the array, rather than being
    left for Pillow or the garbage collector to close.

    Args:
        image_path: directory containing the PNG files.

    Returns:
        list of ``np.ndarray``, one per image, each obtained from the image
        converted to RGB. Empty when the directory holds no ``*.png`` file.
    """
    images = []
    for f_name in sorted(glob.glob(os.path.join(image_path, '*.png'))):
        with Image.open(f_name) as img_file:
            # convert() returns a new in-memory image, so the array stays
            # valid after the file is closed.
            images.append(np.asarray(img_file.convert('RGB')))
    return images

#stack a sequence of images into a single array
def convert_np_array(vector):
    """Stack the items of ``vector`` into a single NumPy array.

    The items are first collected into a list, so any iterable is accepted;
    the list is then handed to ``np.array``.
    """
    return np.array(list(vector))

#load data
def load_data(main_data_folder, usage_folder_name, legit_folder_name, stego_folder_names):
    """Load one dataset split: legit images first, then each stego folder.

    Images are read from ``<main_data_folder>/<usage_folder_name>/<folder>``
    for the legit folder and then for every stego folder, in the order
    given. The image counts and the shape of the combined array are printed.

    Args:
        main_data_folder: root directory of the dataset.
        usage_folder_name: name of the split sub-directory.
        legit_folder_name: sub-folder holding the legit images.
        stego_folder_names: iterable of sub-folders holding stego images.

    Returns:
        Tuple ``(data, num_legit_images, num_stego_images_for_class)``:
        the concatenated image array (legit block first), the number of
        legit images, and an array with the image count of each stego folder.
    """
    split_dir = os.path.join(main_data_folder, usage_folder_name)

    def _read_folder(folder_name):
        # Every image found in one class folder, stacked into an array.
        return convert_np_array(load_images(os.path.join(split_dir, folder_name)))

    legit_images = _read_folder(legit_folder_name)
    stego_batches = [_read_folder(folder_name) for folder_name in stego_folder_names]
    stego_counts = [batch.shape[0] for batch in stego_batches]

    num_legit_images = legit_images.shape[0]
    print("#legit images", num_legit_images)
    print("#stego images", stego_counts)

    merged = np.concatenate([legit_images] + stego_batches)
    print("data shape: ", merged.shape)

    print("done")

    return merged, num_legit_images, np.array(stego_counts)

#create target variable
def create_target(legits, stegos):
    """Return the ``int8`` label vector: ``legits`` zeros, then ``stegos`` ones."""
    negatives = np.full(legits, 0, dtype=np.int8)
    positives = np.full(stegos, 1, dtype=np.int8)
    return np.concatenate((negatives, positives))

In [3]:
#Parameters ---------------------------

# Root folder containing one sub-directory per split. Set the
# STEGO_DATA_FOLDER environment variable to run on another machine; the
# original location is kept as the fallback so existing setups still work.
data_folder = os.environ.get(
    "STEGO_DATA_FOLDER",
    "/Users/Massimo-Icar/Desktop/stegomalware/full_dataset/dataset",
)
legit_folder = 'legit'  # sub-folder with the legit images
stego_folders = ['LSB_stego_php','LSB_stego_url']  # sub-folders with the stego images
cwd = './'
model_space = 'output/models-separate'  # checkpoint directory, relative to cwd
seed = 230782
k_lsb = 3  # number of least-significant bits extracted per colour channel

# Apply the seed. It used to be defined but never passed to any random
# number generator, so runs were not repeatable.
np.random.seed(seed)
tf.random.set_seed(seed)
#------------------------------------

## Stego Malware Detection task

In [4]:
# Read the "training" split: raw images plus the legit / per-stego-folder counts.
training_set, num_training_legit, num_training_stego_for_class = load_data(
    main_data_folder=data_folder,
    usage_folder_name="training",
    legit_folder_name=legit_folder,
    stego_folder_names=stego_folders,
)
num_training_stego = num_training_stego_for_class.sum()
print("#training shape, neg, pos: ", training_set.shape, num_training_legit, num_training_stego)

# From here on `training_set` holds the k-LSB feature vectors, not the raw images.
# Labels follow the load order: zeros for the legit block, ones for all stego images.
training_set = extract_k_lsb_features(data=training_set, k=k_lsb)
y_train = create_target(legits=num_training_legit, stegos=num_training_stego)
print("training set ready")

#legit images 29999
#stego images [29999, 119996]
data shape:  (179994, 32, 32, 3)
done
#training shape, neg, pos:  (179994, 32, 32, 3) 29999 149995
training set ready


In [6]:
# Read the "validation" split: raw images plus the legit / per-stego-folder counts.
validation, num_val_legit, num_val_stego_for_class = load_data(
    main_data_folder=data_folder,
    usage_folder_name="validation",
    legit_folder_name=legit_folder,
    stego_folder_names=stego_folders,
)
num_val_stego = num_val_stego_for_class.sum()
print("#validation shape, neg, pos: ", validation.shape, num_val_legit, num_val_stego)

# From here on `validation` holds the k-LSB feature vectors, not the raw images.
# Labels follow the load order: zeros for the legit block, ones for all stego images.
validation = extract_k_lsb_features(data=validation, k=k_lsb)
y_val = create_target(legits=num_val_legit, stegos=num_val_stego)
print("validation set ready")

#legit images 15000
#stego images [15000, 60000]
data shape:  (90000, 32, 32, 3)
done
#validation shape, neg, pos:  (90000, 32, 32, 3) 15000 75000
validation set ready


In [5]:
# Build the detector; the constructor is given the length of one feature vector.
detector = StegoDetectorExtended(training_set.shape[1])

# Training configuration: binary cross-entropy loss, RMSprop at 1e-3, accuracy reported.
detector.compile(
    optimizer=RMSprop(learning_rate=1e-3),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Checkpoint callback: writes weights only, and only when val_loss reaches a new minimum.
model_path = os.path.join(cwd, model_space, 'best_model_stego_detection')
print("best model path: ", model_path)
check = ModelCheckpoint(
    model_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=0,
)

best model path:  ./output/models-separate/best_model_stego_detection


2022-02-22 12:18:24.478453: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [8]:
# Train for 20 epochs in mini-batches of 256. The validation data is passed
# so that the `check` callback can monitor val_loss.
detector.fit(
    training_set,
    y_train,
    validation_data=(validation, y_val),
    epochs=20,
    batch_size=256,
    callbacks=[check],
    verbose=1,
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x164c98c90>

In [6]:
#reload best model
# Restore the weights stored at `model_path`, the location the ModelCheckpoint
# callback writes to whenever val_loss improves. Needs `detector` and
# `model_path` from the classifier-initialisation cell.
detector.load_weights(model_path)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x15fa19d10>

In [10]:
# Read the "test" split: raw images plus the legit / per-stego-folder counts.
test, num_test_legit, num_test_stego_for_class = load_data(
    main_data_folder=data_folder,
    usage_folder_name="test",
    legit_folder_name=legit_folder,
    stego_folder_names=stego_folders,
)
num_test_stego = num_test_stego_for_class.sum()
print("#test shape, neg, pos: ", test.shape, num_test_legit, num_test_stego)

# From here on `test` holds the k-LSB feature vectors, not the raw images.
# Labels follow the load order: zeros for the legit block, ones for all stego images.
test = extract_k_lsb_features(data=test, k=k_lsb)
y_test = create_target(legits=num_test_legit, stegos=num_test_stego)
print("test set ready")

#legit images 15001
#stego images [15001, 60004]
data shape:  (90006, 32, 32, 3)
done
#test shape, neg, pos:  (90006, 32, 32, 3) 15001 75005
test set ready


In [11]:
# Continuous scores from the detector for every test sample.
y_pred_score = detector.predict(test)

# Hard labels: each score rounded to the nearest integer.
y_pred_label = np.round(y_pred_score, decimals=0)

# Metrics computed from the hard labels.
report_map = classification_report(y_test, y_pred_label, output_dict=True)
acc_score = accuracy_score(y_test, y_pred_label)

# Metrics computed from the raw scores: ROC AUC and area under the precision-recall curve.
auc_score = roc_auc_score(y_test, y_pred_score)
pr1, rec1, thr1 = precision_recall_curve(y_test, y_pred_score)
auc_score_pr = auc(rec1, pr1)

# One semicolon-separated row: accuracy, macro-averaged precision / recall / F1, then the two AUCs.
result = ";".join(
    str(value)
    for value in (
        acc_score,
        *(report_map['macro avg'][key] for key in ('precision', 'recall', 'f1-score')),
        auc_score,
        auc_score_pr,
    )
)

print("acc;prec;rec;f1;auc;auc-pr")
print(result)

acc;prec;rec;f1;auc;auc-pr
1.0;1.0;1.0;1.0;1.0;1.0


In [7]:
# Read the "test_unseen" split: raw images plus the legit / per-stego-folder counts.
test_unseen, num_test_unseen_legit, num_test_unseen_stego_for_class = load_data(
    main_data_folder=data_folder,
    usage_folder_name="test_unseen",
    legit_folder_name=legit_folder,
    stego_folder_names=stego_folders,
)
num_test_unseen_stego = num_test_unseen_stego_for_class.sum()
print("#test unseen shape, neg, pos: ", test_unseen.shape, num_test_unseen_legit, num_test_unseen_stego)

# From here on `test_unseen` holds the k-LSB feature vectors, not the raw images.
# Labels follow the load order: zeros for the legit block, ones for all stego images.
test_unseen = extract_k_lsb_features(data=test_unseen, k=k_lsb)
y_test_unseen = create_target(legits=num_test_unseen_legit, stegos=num_test_unseen_stego)
print("test set unseen ready")

#legit images 15001
#stego images [15001, 60004]
data shape:  (90006, 32, 32, 3)
done
#test unseen shape, neg, pos:  (90006, 32, 32, 3) 15001 75005
test set unseen ready


In [8]:
# Continuous scores from the detector for every sample of the unseen test set.
y_pred_score = detector.predict(test_unseen)

# Hard labels: each score rounded to the nearest integer.
y_pred_label = np.round(y_pred_score, decimals=0)

# Metrics computed from the hard labels.
report_map = classification_report(y_test_unseen, y_pred_label, output_dict=True)
acc_score = accuracy_score(y_test_unseen, y_pred_label)

# Metrics computed from the raw scores: ROC AUC and area under the precision-recall curve.
auc_score = roc_auc_score(y_test_unseen, y_pred_score)
pr1, rec1, thr1 = precision_recall_curve(y_test_unseen, y_pred_score)
auc_score_pr = auc(rec1, pr1)

# One semicolon-separated row: accuracy, macro-averaged precision / recall / F1, then the two AUCs.
result = ";".join(
    str(value)
    for value in (
        acc_score,
        *(report_map['macro avg'][key] for key in ('precision', 'recall', 'f1-score')),
        auc_score,
        auc_score_pr,
    )
)

print("acc;prec;rec;f1;auc;auc-pr")
print(result)

acc;prec;rec;f1;auc;auc-pr
0.9991222807368397;0.9973806366047746;0.9994733684421039;0.9984234242382659;0.9999999893347554;0.9999999978668397


In [9]:
# Confusion matrix for the unseen test set; it is printed as text, not plotted.
# `y_pred_label` is whatever the most recently executed evaluation cell left
# behind, so run the unseen-test evaluation cell immediately before this one.
cm = confusion_matrix(y_test_unseen, y_pred_label)
print(cm)


[[15001     0]
 [   79 74926]]


In [13]:
#time to predict
# Runs inference with one sample per batch and the progress output enabled,
# presumably to read the per-step latency from that output; nothing is timed
# explicitly here. The returned scores are not stored, so the notebook echoes
# the array as the cell result.
detector.predict(test_unseen, batch_size=1, verbose=1)



array([[1.7922089e-14],
       [7.5051402e-20],
       [3.8525699e-21],
       ...,
       [1.0000000e+00],
       [1.0000000e+00],
       [1.0000000e+00]], dtype=float32)