Malware Opcode Family Classification 

Authored by David Luong

Reference: https://machinelearningmastery.com/sequence-classification-lstm-recurrent-neural-networks-python-keras/

In [111]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt # for plotting model loss

#Import svm model
from sklearn import svm
#Import random forest model
from sklearn.ensemble import RandomForestClassifier

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import MaxPooling1D
from tensorflow.keras.layers import Embedding
from tensorflow.keras.preprocessing import sequence

#Import knn model
from sklearn.neighbors import KNeighborsClassifier

# Final evaluation of the model
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.model_selection import KFold
from sklearn.metrics import plot_confusion_matrix, recall_score, precision_score
from sklearn.naive_bayes import GaussianNB

# fix random seed for reproducibility
tf.random.set_seed(7)

import os
import sys
import json

# Assign main directory to a variable
main_dir=os.path.dirname(sys.path[0])

Define Run Settings

In [112]:
# parameters
real_malware_list = ['OnLineGames', 'Renos', 'VBInject', 'WinWebSec', 'Zbot']

use_harshit = 0 # 1 to use Harshit's fake samples, other to use Albert's
select_malware = 'vae_dense_samples' # wgan_gp_samples, vae_cnn_samples, vae_dense_samples
select_vectorization = 'pos' # freq, pos
unique_opcodes = 1 # 0 = visualize_explore/opcodes, 1 = fake_tests/opdicts
max_sequence_length = 600
use_deep_classifiers = 1 # 1+ = use, 0 do not use 

# LSTM
embedding_vector_length = 32

Define Helper Functions

In [113]:
# function to get unique values
def unique(list1):
    # initialize a null list
    unique_list = []
    # traverse for all elements
    for x in list1:
        # check if exists in unique_list or not
        if x not in unique_list:
            unique_list.append(x)
    return unique_list

# Python code to count the number of occurrences
def countX(lst, x):
    count = 0
    for ele in lst:
        if (ele == x):
            count = count + 1
    return count

# opcodes are indexed by overall frequency
def get_opcode_freq(opcode_frequency):
    rank = 0
    prev_count = -1
    for item in sorted(opcode_frequency,reverse=True):
        # increment rank if current opcode has different frequency than previous opcode
        if prev_count != item[0]:
            rank+=1
        # assign frequency rank
        opcode_frequency[item[1]] = rank
        # save previous frequency
        prev_count = item[0]
    return opcode_frequency

# opcodes are indexed by unique opcode position (opcodes_into_list)
def get_opcode_pos(data_into_list,opcodes_into_list):
    opcode_position = []
    nx = 0
    for x in data_into_list:
        ny = 0
        for y in opcodes_into_list:
            if x == y:
                opcode_position.append(ny)
                break
            ny+=1
        nx+=1
        # index for unknown opcodes
        if len(opcodes_into_list) == ny:
            opcode_position.append(ny)
    return opcode_position

# get opcode list from opdict
def get_opcode_dict(malware_fam,file_path):
    my_opcodes = file_path + 'opdict' + malware_fam + '.json'
    with open(my_opcodes) as json_file:
        data = json.load(json_file)
    return list(data.keys())

Load Malware Files

In [114]:
# initialize variables
dataset_real = []
dataset_real_names = []
dataset_real_ind = []
dataset_fake = []
dataset_fake_names = []
dataset_fake_ind = []

max_top_opcodes = -1

for real_malware in real_malware_list:

    if real_malware == 'WinWebSec':
        fake_malware = 'wws'
        top_opcodes = 22
        malware_ind = 0
    elif real_malware == 'OnLineGames':
        fake_malware = 'olgames'
        top_opcodes = 22
        malware_ind = 1
    elif real_malware == 'Renos':
        fake_malware = 'renos'
        top_opcodes = 22
        malware_ind = 2
    elif real_malware == 'VBInject':
        fake_malware = 'vbinject'
        top_opcodes = 25
        malware_ind = 3
    elif real_malware == 'Zbot':
        fake_malware = 'zbot'
        top_opcodes = 20
        malware_ind = 4
    else:
        print('!!!Malware not found!!!')

    # find max top opcodes
    if max_top_opcodes < top_opcodes:
        max_top_opcodes = top_opcodes

    # opening list of unique opcodes
    if unique_opcodes == 0:
        my_fp = './code/visualize_explore/opcodes/'
        my_opcodes = open(my_fp + 'opcodes' + real_malware + '.txt','r')
        opcodes = my_opcodes.read()
        opcodes_into_list = opcodes.replace('\n', ' ').split(" ")
        opcodes_into_list.remove('')
        # close opcodes file
        my_opcodes.close()
    elif unique_opcodes == 1:
        my_fp = main_dir + '/code-20230116T073801Z-001/code/fake_tests/opdicts/'
        opcodes_into_list = get_opcode_dict(real_malware,my_fp)

    # real malware parameters
    my_filepath = "../malware_data/" + real_malware +'/'
    dir_list = os.listdir(my_filepath)

    # process real malware
    for fm in dir_list:
        # if f == 'VirusShare_07c88839c083ddf7ecb11e7bfde38ea8.txt': # debug
        print('Processing real malware' + my_filepath + fm)

        # opening the file in read mode
        my_file = open(my_filepath + fm, "r")

        # reading the file
        data = my_file.read()

        # replacing end of line('/n') with ' ' and
        # splitting the text it further when '.' is seen.
        data_into_list = data.replace('\n', ' ').split(" ")

        # remove '' from opcodes
        data_into_list.remove('')

        opcode_frequency = []
        if data_into_list:
                
            # create (opcode frequency, rank) tuple
            idx = 0
            for x in opcodes_into_list:
                count = countX(data_into_list, x)
                #print('{} has occurred {} times'.format(x,count))
                opcode_frequency.append((count,idx))
                idx+=1
            # print(sorted(opcode_frequency,reverse=True))

            # opcodes are indexed by overall frequency
            opcode_frequency = get_opcode_freq(opcode_frequency)

            # opcodes are indexed by opcodes_into_list position
            opcode_position = get_opcode_pos(data_into_list,opcodes_into_list)

            # add real malware to dataset
            dataset_real.append(opcode_position)
            dataset_real_names.append(fm)
            dataset_real_ind.append(malware_ind) # indicator for real malware family
        else:
            # skip processing if malware file is empty
            print('------------> is empty ... skipping')

        # close file
        my_file.close()

    # create fake malware dataset
    if use_harshit == 1:
        my_fake_filepath = './code/fake_tests/fakeSamples_small/' + fake_malware # Harshit
        dir_list_fake = os.listdir(my_fake_filepath)

        for fake in dir_list_fake:
            print('Processing Harshit\'s fake malware' + my_fake_filepath + '/' + fake)

            # open the file
            my_fake_opcodes = open(my_fake_filepath + '/' + fake, "r")

            # reading the file
            data = my_fake_opcodes.read()

            # replacing end of line('/n') with ' ' and
            # splitting the text it further when '.' is seen.
            data_into_list = data.replace('\n', ' ').split(" ")

            # remove '' from opcodes
            data_into_list.remove('')

            opcode_frequency = []
            if data_into_list:
                    
                # create (opcode frequency, rank) tuple
                idx = 0
                for x in opcodes_into_list:
                    count = countX(data_into_list, x)
                    #print('{} has occurred {} times'.format(x,count))
                    opcode_frequency.append((count,idx))
                    idx+=1
                # print(sorted(opcode_frequency,reverse=True))

                # opcodes are indexed by overall frequency
                opcode_frequency = get_opcode_freq(opcode_frequency)

                # add real malware to dataset
                if select_vectorization == 'freq':
                    dataset_fake.append(opcode_frequency)
                elif select_vectorization == 'pos':
                    opcode_list = []
                    for oc in data_into_list:
                        opcode_list.append(int(oc))
                    dataset_fake.append(opcode_list)

                dataset_fake_names.append(fake)
                dataset_fake_ind.append(1)

            # close opcodes file
            my_fake_opcodes.close()

    else: # self-generated malware
        
        # fake malware parameters
        my_fake_filepath = './code/fake_tests/' + select_malware + '/' + fake_malware + '/'
        dir_list = os.listdir(my_fake_filepath)

        # process fake malware
        for fm in dir_list:
            # if f == 'VirusShare_07c88839c083ddf7ecb11e7bfde38ea8.txt': # debug
            print('Processing fake malware' + my_fake_filepath + fm)

            # opening the file in read mode
            my_fake_file = open(my_fake_filepath + fm, "r")

            # reading the file
            data = my_fake_file.read()

            # replacing end of line('/n') with ' ' and
            # splitting the text it further when '.' is seen.
            data_into_list = data.replace('\n', ' ').split(" ")

            # remove '' from opcodes
            data_into_list.remove('')

            opcode_frequency = []
            if data_into_list:
                    
                # create (opcode frequency, rank) tuple
                idx = 0
                for x in opcodes_into_list:
                    count = countX(data_into_list, x)
                    #print('{} has occurred {} times'.format(x,count))
                    opcode_frequency.append((count,idx))
                    idx+=1
                # print(sorted(opcode_frequency,reverse=True))

                # opcodes are indexed by overall frequency
                opcode_frequency = get_opcode_freq(opcode_frequency)

                # opcodes are indexed by opcodes_into_list position
                opcode_position = get_opcode_pos(data_into_list,opcodes_into_list)

                # add fake malware to dataset
                if select_vectorization == 'freq':
                    dataset_fake.append(opcode_frequency)
                elif select_vectorization == 'pos':
                    dataset_fake.append(opcode_position)

                dataset_fake_names.append(fm)
                dataset_fake_ind.append(malware_ind) # indicator for malware family
            else:
                # skip processing if malware file is empty
                print('------------> is empty ... skipping')

            # close file
            my_fake_file.close()

# convert dataset to numpy arrays
X_real = np.array(dataset_real,dtype=object)
y_real = np.array(dataset_real_ind,dtype='int64')
X_fake = np.array(dataset_fake,dtype=object)
y_fake = np.array(dataset_fake_ind,dtype='int64')

# print
nSamples_real = len(X_real)
print('There are', nSamples_real, 'real malware files')
nSamples_fake = len(X_fake)
print('There are', nSamples_fake, 'fake malware files')

Processing real malware../malware_data/OnLineGames/VirusShare_013a88d3058686a0d649e11e631c01cb.txt
Processing real malware../malware_data/OnLineGames/VirusShare_038b35f52afedfb80d493b2786f8a34d.txt
Processing real malware../malware_data/OnLineGames/VirusShare_0403e161610d40dc1aa4a46d38ee9f97.txt
Processing real malware../malware_data/OnLineGames/VirusShare_04171a371492604e12b664788e91b7b1.txt
Processing real malware../malware_data/OnLineGames/VirusShare_04dbdcb8f9634289f70e49718a508090.txt
------------> is empty ... skipping
Processing real malware../malware_data/OnLineGames/VirusShare_0590ab7140216666b7ffa812d7dbec59.txt
------------> is empty ... skipping
Processing real malware../malware_data/OnLineGames/VirusShare_0613b3bda683c88e3232da4c4605fae8.txt
------------> is empty ... skipping
Processing real malware../malware_data/OnLineGames/VirusShare_0659968d923826d8b9755c81fd2adde5.txt
Processing real malware../malware_data/OnLineGames/VirusShare_07c88839c083ddf7ecb11e7bfde38ea8.txt
P

Define Training and Test Datasets

In [115]:
# train/test split
X_train = X_real
y_train = y_real
X_test = X_fake
y_test = y_fake

# truncate and pad input sequences
X_train = sequence.pad_sequences(X_train, maxlen=max_sequence_length)
X_test = sequence.pad_sequences(X_test, maxlen=max_sequence_length)

Define LSTM Classifier

In [116]:
# create LSTM model

if use_deep_classifiers:
    model = Sequential()
    model.add(Embedding(max_top_opcodes, embedding_vector_length, input_length=max_sequence_length))
    model.add(LSTM(10))
    model.add(Dense(len(real_malware_list), activation='softmax'))
    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    print(model.summary())
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=64)

    # Final evaluation of the model
    scores = model.evaluate(X_test, y_test, verbose=1)
    lstm_score = scores[1]*100
    print("LSTM Accuracy: %.2f%%" % (scores[1]*100))

Model: "sequential_42"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_42 (Embedding)    (None, 600, 32)           800       
                                                                 
 lstm_42 (LSTM)              (None, 10)                1720      
                                                                 
 dense_42 (Dense)            (None, 5)                 55        
                                                                 
Total params: 2,575
Trainable params: 2,575
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
LSTM Accuracy: 30.00%


Debug

In [117]:
# DEBUG

#predict_x=model.predict(X_test) 
#classes_x=np.argmax(predict_x,axis=1)

#print(predict_x)
#print(classes_x)
#print(y_test)

Define Bidirectional LSTM Classifier

In [118]:
# create the bidirectional LSTM model

if use_deep_classifiers:
    model = Sequential()
    model.add(Embedding(max_top_opcodes, embedding_vector_length, input_length=max_sequence_length))
    model.add(Bidirectional(LSTM(10, dropout=0.2, recurrent_dropout=0.2)))
    model.add(Dense(len(real_malware_list), activation='softmax'))
    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    print(model.summary())
    model.fit(X_train, y_train, epochs=10, batch_size=64)
    # Final evaluation of the model
    scores = model.evaluate(X_test, y_test, verbose=0)
    bidirec_lstm_score = scores[1]*100
    print("Accuracy: %.2f%%" % (scores[1]*100))

Model: "sequential_43"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_43 (Embedding)    (None, 600, 32)           800       
                                                                 
 bidirectional_4 (Bidirectio  (None, 20)               3440      
 nal)                                                            
                                                                 
 dense_43 (Dense)            (None, 5)                 105       
                                                                 
Total params: 4,345
Trainable params: 4,345
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Accuracy: 10.00%


Define CNN LSTM Classifier

In [119]:
# create the LSTM+CNN model

if use_deep_classifiers:
    model = Sequential()
    model.add(Embedding(max_top_opcodes, embedding_vector_length, input_length=max_sequence_length))
    model.add(Conv1D(filters=32, kernel_size=3, padding='same', activation='relu'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(LSTM(10))
    model.add(Dense(len(real_malware_list), activation='softmax'))
    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    print(model.summary())
    model.fit(X_train, y_train, epochs=10, batch_size=64)
    # Final evaluation of the model
    scores = model.evaluate(X_test, y_test, verbose=0)
    cnn_lstm_score = scores[1]*100
    print("Accuracy: %.2f%%" % (scores[1]*100))

Model: "sequential_44"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_44 (Embedding)    (None, 600, 32)           800       
                                                                 
 conv1d_3 (Conv1D)           (None, 600, 32)           3104      
                                                                 
 max_pooling1d_3 (MaxPooling  (None, 300, 32)          0         
 1D)                                                             
                                                                 
 lstm_44 (LSTM)              (None, 10)                1720      
                                                                 
 dense_44 (Dense)            (None, 5)                 55        
                                                                 
Total params: 5,679
Trainable params: 5,679
Non-trainable params: 0
___________________________________________________

Define Random Forest Classifier

In [120]:
#Create a Random Forest Classifier
model = RandomForestClassifier(max_depth=3, random_state=0)

#Train the model using the training sets
model.fit(X_train, y_train)

#Predict the response for test dataset
y_pred = model.predict(X_test)

# Final evaluation of the model
print("Random Forest Accuracy: %.2f%%" % (accuracy_score(y_test, y_pred)*100))
rf_score = accuracy_score(y_test, y_pred)*100

# Classification Report
y_pred = np.round(model.predict(X_test))
print(classification_report(y_test, y_pred, target_names=real_malware_list))

Random Forest Accuracy: 65.00%
              precision    recall  f1-score   support

 OnLineGames       1.00      0.60      0.75        10
       Renos       1.00      1.00      1.00        10
    VBInject       0.00      0.00      0.00         0
   WinWebSec       0.00      0.00      0.00        10
        Zbot       1.00      1.00      1.00        10

    accuracy                           0.65        40
   macro avg       0.60      0.52      0.55        40
weighted avg       0.75      0.65      0.69        40



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Define k-Nearest Neighbor Classifier

In [122]:
#Create a k-NN Classifier
model = KNeighborsClassifier(n_neighbors=2)

#Train the model using the training sets
model.fit(X_train, y_train)

#Predict the response for test dataset
y_pred = model.predict(X_test)

# Final evaluation of the model
print("k-NN Accuracy: %.2f%%" % (accuracy_score(y_test, y_pred)*100))
knn_score = accuracy_score(y_test, y_pred)*100

k-NN Accuracy: 95.00%


Display All Classification Results

In [123]:
print('Classification on ' + str(len(X_train)+len(X_test)) + ' ' + str(real_malware_list) + ' Malware Samples (' + str(len(X_train)) + ' Real and ' + str(len(X_test)) + ' Fake ' + select_malware + ')')
print('-------------------------------------------------')

print("Random Forest Score:", rf_score)
print("k-Nearest Neighbor Score:", knn_score)

if use_deep_classifiers:
    print("Standard LSTM Score:", lstm_score)
    print("Bidirectional LSTM Score:", bidirec_lstm_score)
    print("CNN LSTM Score:", cnn_lstm_score)

    list_scores = [rf_score, knn_score, lstm_score, bidirec_lstm_score, cnn_lstm_score]
else:
    list_scores = [rf_score, knn_score]
print("=================================================")
print("Min Score:",min(list_scores))
print("Average Score:",sum(list_scores)/len(list_scores))
print("Max Score:",max(list_scores))
print("=================================================")

Classification on 85 ['OnLineGames', 'Renos', 'VBInject', 'WinWebSec', 'Zbot'] Malware Samples (45 Real and 40 Fake vae_dense_samples)
-------------------------------------------------
Random Forest Score: 65.0
k-Nearest Neighbor Score: 95.0
Standard LSTM Score: 30.000001192092896
Bidirectional LSTM Score: 10.000000149011612
CNN LSTM Score: 10.000000149011612
Min Score: 10.000000149011612
Average Score: 42.000000298023224
Max Score: 95.0
