In [1]:
# Import Libraries
import pandas as pd
import warnings
import lightgbm as lgbm
import catboost as catb
import sklearn.svm as svc
import sklearn.neural_network as mlpc
import sklearn.metrics as metrics
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

warnings.filterwarnings("ignore")

df = pd.read_csv('./Dataset/oliveira_labelled.csv')

API_LIST = "./Dataset/api_calls.txt"
DELIMITER = "NaN"
API_FILE = open(API_LIST,"r")
APIS = API_FILE.readline().split(',')
APIS.append(DELIMITER) #serves as a label for NaN values for Instance-based datasets
API_FILE.close()

#Inverse Label Encoding
def inverse_label(item:str):
    global APIS
    return item.map(lambda x: APIS[int(x)])

def list_to_str(ls:list):
    '''Convert list to a stringified version (comma delimited).'''
    output = ""
    for l in ls:
        output += str(l) + ","
    return output[0:len(output)-1]

def inject_patterns(inner_df:pd.DataFrame):
    '''Injects the API call patterns of each sample as its last column'''
    patterns = []
    print("Injecting API patterns...")
    for row in range(inner_df.shape[0]):
        patterns.append(list_to_str(inner_df.iloc[row,1:N_COMPONENTS+1].transpose().to_list()))
    inner_df['pattern'] = patterns
    return inner_df # DBSCAN requires only the numeric label encoded version of the API Calls

def ib_convert(input_df:pd.DataFrame):
    print("Transposing IB...")
    input_df.transpose()
    print("IB Transposed!")
    print("Removing duplicates...")
    print("Row:", end=" ")
    for r in range(input_df.shape[0]):
        row = input_df.iloc[r, 1:N_COMPONENTS+1].drop_duplicates(keep='first', inplace=False).to_list()
        input_df.iloc[r, 1:N_COMPONENTS+1] = row + ([307]*(N_COMPONENTS-len(row)))
        if r % N_COMPONENTS == 0:
            print(r, end=" ")
    print("\nDuplicates removed!")
    print("Retransposing IB (revert)...")
    input_df.transpose()
    print("IB Retransposed!")
    return input_df

# Remove falsely labelled malicious samples
df = df[df['type'] != '_']

# Remove specific malware types
# removables = ['ransomware', 'miner', 'virus', 'spyware', 'hacktool', 'dropper', 'worm']
# for r in removables:
#     df = df[df['type'] != r]

for i in range(N_COMPONENTS,100):
    remove = f"t_{i}"
    df.pop(remove)
display(df.head())

#Remove type column
type_col = df.pop('type')

#Removing hash column
hash_col = df.pop('hash')

#Re-arranging column positions
label_col = df.pop('malware')
df = pd.concat([label_col, df], axis=1)
df = pd.concat([df, hash_col], axis=1) # <=== This will be retained for the benefit of model evaluation.
df = pd.concat([df, type_col], axis=1) # <=== This will be retained for the benefit of model evaluation.

#df.iloc[:, 1:N_COMPONENTS+1] = df.iloc[:, 1:N_COMPONENTS+1].apply(inverse_label, axis=1, result_type='reduce')
#df = inject_patterns(df)

mal_df = df[df['malware'] == 1]
ben_df = df[df['malware'] == 0]

  from pandas.core import (


NameError: name 'N_COMPONENTS' is not defined

In [None]:
X = ben_df.iloc[:,1:] #Features
y = ben_df.iloc[:,0] #Labels
train_features, test_features, train_labels, test_labels = train_test_split(X, y, test_size=0.10, random_state=1, shuffle=True)

ben_train = pd.concat([train_labels,train_features], axis=1)
ben_test = pd.concat([test_labels,test_features], axis=1)

print("Benign for Training:", ben_train['type'].value_counts().sum())
print("Bening for Test: ", ben_test['type'].value_counts().sum())

In [None]:
X = mal_df.iloc[:,1:] #Features
y = mal_df.iloc[:,0] #Labels
train_features, test_features, train_labels, test_labels = train_test_split(X, y, test_size=0.08, random_state=1, shuffle=True)

mal_test = pd.concat([train_labels,train_features], axis=1)
mal_train = pd.concat([test_labels,test_features], axis=1)

print("Malicious for Training:", mal_train['type'].value_counts().sum())
print("Malicious for Testing:", mal_test['type'].value_counts().sum())

## mal_test + ben_test

To not undergo SMOTETonek

In [None]:
test_tb = pd.concat([mal_test, ben_test], axis=0, ignore_index=True)
print(test_tb.shape)
print(test_tb['malware'].value_counts())
display(test_tb.head())
display(test_tb[test_tb['malware']==0])

## mal_train + ben_train

To undergo *SMOTETomek*

In [None]:
train_tb = pd.concat([mal_train, ben_train], axis=0, ignore_index=True)
print(train_tb.shape)
print(train_tb['malware'].value_counts())

In [None]:
from imblearn.combine import SMOTETomek

smt = SMOTETomek(random_state=1, n_jobs=8, sampling_strategy=0.4)

X = train_tb.iloc[:,1:N_COMPONENTS+1]
y = train_tb.iloc[:,0]

X_res, y_res = smt.fit_resample(X, y)
train_tb = pd.concat([y_res,X_res], axis=1)
print(train_tb.shape)
print(train_tb['malware'].value_counts())
display(train_tb.head())
display(train_tb[train_tb['malware']==0])

## Creating IB versions of dataset

In [None]:
train_ib = train_tb.copy(deep=True)
test_ib = test_tb.copy(deep=True)

train_ib = ib_convert(train_ib).copy(deep=True)
print("\n")
test_ib = ib_convert(test_ib).copy(deep=True)

In [None]:
# train_ib.iloc[:,1:N_COMPONENTS+1] = train_ib.iloc[:,1:N_COMPONENTS+1].astype('str')
# train_ib.replace("nan", "NaN", inplace=True)
# test_ib.iloc[:,1:N_COMPONENTS+1] = test_ib.iloc[:,1:N_COMPONENTS+1].astype('str')
# test_ib.replace("nan", "NaN", inplace=True)
# display(train_ib.head())
# display(test_ib.head())

## Converting results to usable for models

Encoded APIs to Unencoded/String APIs

In [None]:
# train_tb_enc = train_tb.copy(deep=True)
# test_tb_enc = test_tb.copy(deep=True)
# train_ib_enc = train_ib.copy(deep=True)
# test_ib_enc = test_ib.copy(deep=True)

# train_tb_enc.iloc[:, 1:N_COMPONENTS+1] = train_tb.iloc[:, 1:N_COMPONENTS+1].apply(inverse_label, axis=1, result_type='reduce')
# test_tb_enc.iloc[:, 1:N_COMPONENTS+1] = test_tb.iloc[:, 1:N_COMPONENTS+1].apply(inverse_label, axis=1, result_type='reduce')
# train_ib_enc.iloc[:, 1:N_COMPONENTS+1] = train_ib.iloc[:, 1:N_COMPONENTS+1].apply(inverse_label, axis=1, result_type='reduce')
# test_ib_enc.iloc[:, 1:N_COMPONENTS+1] = test_ib.iloc[:, 1:N_COMPONENTS+1].apply(inverse_label, axis=1, result_type='reduce')

# display(train_tb_enc.head())
# display(test_tb_enc.head())
# display(train_ib_enc.head())
# display(test_ib_enc.head())

## Trying it out on LightGBM, CatBoost, and SVM

In [None]:
def get_indexes():
    indexes = []
    for i in range(N_COMPONENTS):
        indexes.append(f"t_{i}")
    return indexes

def train_test(train, test, model, model_str:str):
    X = train.iloc[:,1:N_COMPONENTS+1]
    y = train.iloc[:,0]
    X_test = test.iloc[:,1:N_COMPONENTS+1]
    y_test = test.iloc[:,0]
    print(f"Model: {model_str}")
    #MODEL ROBUSTNESS
    model.fit(X,y)
    y_pred = model.predict(X_test)
    #print("\nModel, Fold, Accuracy, Precision, F1-Score, Recall, ROC-AUC")
    #print(f"{model_str}, {'T'}, {metrics.accuracy_score(y_test, y_pred):.4f}, {metrics.average_precision_score(y_test, y_pred):.4f}, {metrics.f1_score(y_test, y_pred):.4f}, {metrics.recall_score(y_test, y_pred):.4f}, {metrics.roc_auc_score(y_test, y_pred):.4f}")
    print(f"Fold: T")
    print(classification_report(y_test, y_pred, digits=4))
    print(f"ROC-AUC: {metrics.roc_auc_score(y_test, y_pred):.4f}")
    #STRATIFIED K-FOLDS
    skf = StratifiedKFold(n_splits=5, random_state=1, shuffle=True)
    ctr = 0
    for train_idx, test_idx in skf.split(train.iloc[:,1:N_COMPONENTS+1], train.iloc[:,0]):
        X_train = train.iloc[train_idx, 1:N_COMPONENTS+1]
        y_train = train.iloc[train_idx, 0]
        model.fit(X_train, y_train)
        y_pred = model.predict(train.iloc[test_idx, 1:N_COMPONENTS+1])
        y_test = train.iloc[test_idx, 0]
        #print(f"{model_str}, {ctr}, {metrics.accuracy_score(y_test, y_pred):.4f}, {metrics.average_precision_score(y_test, y_pred):.4f}, {metrics.f1_score(y_test, y_pred):.4f}, {metrics.recall_score(y_test, y_pred):.4f}, {metrics.roc_auc_score(y_test, y_pred):.4f}")
        print(f"Fold: {ctr}")
        print(classification_report(y_test, y_pred, digits=4))
        ctr += 1
    print('-------------------------------------------------------')
    print("\n")

In [None]:
train_test(train_tb, test_tb, lgbm.LGBMClassifier(random_state=1, n_jobs=0, verbose=0), "LGBM")
train_test(train_tb, test_tb, catb.CatBoostClassifier(random_state=1, thread_count=-1, verbose=0, cat_features=get_indexes(), 
                                                    nan_mode='Min', custom_metric=['Logloss', 'AUC', 'Precision'], one_hot_max_size=256), "CATB")
train_test(train_tb, test_tb, svc.SVC(random_state=1, verbose=0), "SVM")
train_test(train_tb, test_tb, mlpc.MLPClassifier(random_state=1, verbose=0), "MLPC")

In [None]:
train_test(train_ib, test_ib, lgbm.LGBMClassifier(random_state=1, n_jobs=0,verbose=0), "LGBM")
train_test(train_ib, test_ib, catb.CatBoostClassifier(random_state=1, thread_count=-1, verbose=0, cat_features=get_indexes(), 
                                                    nan_mode='Min', custom_metric=['Logloss', 'AUC', 'Precision'], one_hot_max_size=256), "CATB")
train_test(train_ib, test_ib, svc.SVC(random_state=1,verbose=0), "SVM")
train_test(train_ib, test_ib, mlpc.MLPClassifier(random_state=1,verbose=0), "MLPC")