# Steps

- Import required libraries
- Load data
- Exploratory data analysis
- Scale data
- Sampling
- Train base models
- Select Features with SelectKBest
- Hyperparameter Tuning with Optuna
- Model Calibration
- Model Explanation with LIME

# Import Required Libraries

In [None]:
#!pip install -q pefile scikit-learn-intelex

In [None]:
import numpy as np
np.random.seed(42)

import pandas as pd
pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.float_format", lambda x: "%.2f" % x)
pd.options.mode.chained_assignment = None

import datetime
from collections import Counter
import os
import pickle
import io
import time
import itertools
import pefile

import seaborn as sns
import matplotlib.pyplot as plt
from mlxtend.plotting import plot_confusion_matrix
sns.set_style("whitegrid")

from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier, ExtraTreesClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier

import lime
import shap

import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

from sklearn.base import clone
from sklearn.preprocessing import RobustScaler, OrdinalEncoder, LabelEncoder, label_binarize
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import f1_score, precision_score, recall_score

from sklearn.feature_selection import f_classif
from sklearn.feature_selection import mutual_info_classif
from sklearn.feature_selection import SelectKBest
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN, BorderlineSMOTE

from sklearn.metrics import brier_score_loss
from sklearn.calibration import CalibratedClassifierCV, calibration_curve

if not os.path.exists(f'./outputs'):
    os.makedirs('./outputs')

PROJECT_ROOT_DIR = './outputs'
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, 'images')
if not os.path.exists(f'./outputs/images'):
    os.makedirs('./outputs/images')
    
def save_fig(title):
    path = os.path.join(IMAGES_PATH, title + '.png')
    plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)

from IPython.display import Markdown

def bold(string):
    display(Markdown("**" + string + "**"))

import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)

In [None]:
#from sklearnex import patch_sklearn
#patch_sklearn()

# Load Data

In [None]:
train_data = pd.read_csv("./data/dataset_malwares.csv")
test_data = pd.read_csv("./data/dataset_test.csv")
train_data.head()

In [None]:
def df_stats(data):
    bold(" SHAPE ".center(50, "#"))
    print("ROWS: {}".format(data.shape[0]))
    print("COLUMNS: {}".format(data.shape[1]))
    bold(" TYPES ".center(50, "#"))
    print(data.dtypes)
    bold(" MISSING VALUES ".center(50, "#"))
    print(data.isnull().sum())
    bold(" DUPLICATED VALUES ".center(50, "#"))
    print("NUMBER OF DUPLICATED VALUES: {}".format(data.duplicated().sum()))
    bold(" MEMORY USAGE ".center(50, "#"))
    buf = io.StringIO()
    data.info(buf=buf)
    info = buf.getvalue().split("\n")[-2].split(":")[1].strip()
    print("Memory Usage: {}".format(info))
    bold(" DESCRIBE ".center(50, "#"))
    display(data.describe().T)

In [None]:
df_stats(train_data)

In [None]:
train_data = train_data.dropna()

# EDA

In [None]:
def grab_cols(df, target):
    cat_cols = [col for col in df.columns if str(df[col].dtypes) in ["bool", "category", "object"] and col != target]
    bold(f"Categorical Variables ({len(cat_cols)})")
    print(cat_cols)
    
    num_cols = [col for col in df.columns if df[col].dtypes in [int, float] and col != target]
    bold(f"Numerical Variables ({len(num_cols)})")
    print(num_cols)
    
    numerical_but_categorical_cols = [col for col in num_cols if df[col].nunique() < 10 and col != target]
    bold(f"Numerical but Categorical Variables ({len(numerical_but_categorical_cols)})")
    print(numerical_but_categorical_cols)
    
    categorical_but_cardinal_cols = [col for col in cat_cols if df[col].nunique() > 20 and col != target]
    bold(f"Categorical but Cardinal Variables ({len(categorical_but_cardinal_cols)})")
    print(categorical_but_cardinal_cols)

    same_value_cols = [col for col in df.columns if df[col].nunique() == 1 and col != target]
    bold(f"Same Value Variables ({len(same_value_cols)})")
    print(same_value_cols)

    for col in same_value_cols:
        if col in cat_cols:
            cat_cols.remove(col)
        elif col in num_cols:
            num_cols.remove(col)
        
    return cat_cols, num_cols

In [None]:
categorical_variables, numerical_variables = grab_cols(train_data, "Malware")

In [None]:
def plot_num(df, columns):
    plt.figure(figsize=(len(columns) / 4, len(columns)))
    for i, column in enumerate(columns):
        plt.subplot(int(len(columns) / 2) + 1, 2, i + 1)
        sns.histplot(x=column, data=df, bins=30, kde=True)
        plt.axvline(df[column].mean(), color="r", linestyle="--", label="Mean")
        plt.axvline(df[column].median(), color="g", linestyle="-", label="Median")
        plt.grid()
        plt.title(f"{column} Distribution")
        plt.legend()
        plt.tight_layout()

    save_fig("plot_numerical_variables")
    plt.show()

In [None]:
plot_num(train_data, numerical_variables)

In [None]:
def plot_correlation_heatmap(df: pd.core.frame.DataFrame, title_name: str='Correlation Map') -> None:
    corr = df.corr(numeric_only=True)
    fig, axes = plt.subplots(figsize=(df.shape[1], df.shape[1]))
    mask = np.zeros_like(corr)
    mask[np.triu_indices_from(mask)] = True
    sns.heatmap(corr, mask=mask, linewidths=.5, cmap='viridis', annot=True, fmt='.2f')
    plt.title(title_name)
    save_fig('correlation')
    plt.show()

In [None]:
plot_correlation_heatmap(train_data, "Dataset Correlation")

# Preprocess

In [None]:
def load_models():
    return [LGBMClassifier(verbose=-100),
            XGBClassifier(),
            RandomForestClassifier(), 
            AdaBoostClassifier(), 
            GradientBoostingClassifier(), 
            DecisionTreeClassifier(), 
            ExtraTreesClassifier(),
            LogisticRegression(), 
            SGDClassifier(),
            CatBoostClassifier(verbose=0), 
            MLPClassifier(),
            GaussianNB(), 
            SVC(), 
            KNeighborsClassifier()]

In [None]:
def preprocess_and_train(df, drop_cols, target):
    temp_df = df.copy()
    temp_df = temp_df.drop(drop_cols, axis=1)
    y = temp_df[target]
    X = temp_df.drop([target], axis=1)
    
    if not os.path.exists(f'./outputs/process'):
        os.makedirs('./outputs/process')
    
    scores_df = pd.DataFrame(columns=["Model Name", "Selected Features", "Parameters", "Train Time", "Test Time", "Train Accuracy", "Test Accuracy", "F1", "Precision", "Recall"])
    samplers = ["No Sampler",
                SMOTE(random_state=42),
                RandomOverSampler(random_state=42),
                ADASYN(random_state=42),
                BorderlineSMOTE(k_neighbors=4, random_state=42)
               ]

    class_index = {}
    for c in np.unique(y):
        class_index[c] = np.where(y == c)[0]

    min_class_len = min(len(ind) for ind in class_index.values())
    test_index = []
    for i in class_index.values():
        test_index.extend(np.random.choice(i, size=min_class_len // 2, replace=False))

    train_index = np.setdiff1d(X.index.values, test_index)
    train_index = list(train_index)

    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y[train_index], y[test_index]

    rs = RobustScaler()
    X_train[numerical_variables] = rs.fit_transform(X_train[numerical_variables])
    X_test[numerical_variables] = rs.transform(X_test[numerical_variables])
    with open('./outputs/process/rs.pkl', 'wb') as f:
        pickle.dump(rs, f)
    
    class_names = temp_df[target].unique().tolist()

    for sampler in samplers:
        if sampler == "No Sampler":
            sampler_name = "No Sampler"
            X_train_resampled, y_train_resampled = X_train, y_train 

        else:
            sampler_name = str(sampler.__class__).split(".")[-1].replace("'>", "")
            X_train_resampled, y_train_resampled = sampler.fit_resample(X_train, y_train)
        
        models = load_models()
        for i, model in enumerate(models):
            train_model(i, temp_df.drop(target, axis=1), model, scores_df, X_train_resampled, X_test, y_train_resampled, y_test, class_names, sampler_name)

    scores_df.to_csv(f'./outputs/scores.csv')
    return scores_df

# Train

In [None]:
def plot_confusion_matrix(predicted, actuals, sub_classes, title='Confusion Matrix', cmap=plt.cm.Blues):
    confusion = confusion_matrix(predicted, actuals)

    plt.imshow(confusion, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(sub_classes))
    plt.xticks(tick_marks, sub_classes, rotation=90)
    plt.yticks(tick_marks, sub_classes)

    for i, j in itertools.product(range(confusion.shape[0]), range(confusion.shape[1])):
        plt.text(j, i, confusion[i, j], horizontalalignment="center", color="black")

    plt.rcParams["figure.figsize"] = (15, 5)
    plt.ylabel('Actuals')
    plt.xlabel('Predicted')

In [None]:
def train_model(i, temp_df, model, scores_df, X_train, X_test, y_train, y_test, class_names, sampler_name):
    if not os.path.exists(f'./outputs/models'):
        os.makedirs(f'./outputs/models')
    
    best_k = 10
    best_score = 0

    for k in range(10, X_train.shape[1] + 1):
        selector = SelectKBest(score_func=f_classif, k=k)
        X_train_selected = selector.fit_transform(X_train, y_train)
        X_test_selected = selector.transform(X_test)

        selected_model = clone(model)

        selected_model.fit(X_train_selected, y_train)
        y_pred = selected_model.predict(X_test_selected)
        accuracy = accuracy_score(y_test, y_pred)

        if accuracy > best_score:
            best_score = accuracy
            best_k = k

    selector = SelectKBest(score_func=f_classif, k=best_k)
    X_train = selector.fit_transform(X_train, y_train)
    X_test = selector.transform(X_test)
    selected = selector.get_support(indices=True)
    selected_features = ','.join(f'"{column_name}"' for column_name in temp_df.columns[selected])

    model_name = str(model.__class__).split(".")[-1].replace("'>", "").replace("Classifier", "")

    study = optuna.create_study(direction='maximize')

    if model_name == "LGBM":
        study.optimize(lambda trial: objective_lgbm(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "AdaBoost":
        study.optimize(lambda trial: objective_ada(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "DecisionTree":
        study.optimize(lambda trial: objective_dt(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "ExtraTree":
        study.optimize(lambda trial: objective_et(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "GradientBoosting":
        study.optimize(lambda trial: objective_gb(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "RandomForest":
        study.optimize(lambda trial: objective_rf(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "SGD":
        study.optimize(lambda trial: objective_sgd(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "XGB":
        study.optimize(lambda trial: objective_xgb(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "SVC":
        study.optimize(lambda trial: objective_svc(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    elif model_name == "KNeighbors":
        study.optimize(lambda trial: objective_knn(trial, X_train, X_test, y_train, y_test), n_trials=10)
        best_params = study.best_params
    else:
        best_params = {}

    model = model.set_params(**best_params)

    best_calibration_method = None
    best_brier = 1
    calibration_methods = ['sigmoid', 'isotonic']
    for calibration in calibration_methods:
        model_calibrate = clone(model)
        calibrated_classifier = CalibratedClassifierCV(model_calibrate, method=calibration, cv=None)
        calibrated_classifier.fit(X_train, y_train)
        if hasattr(calibrated_classifier, 'predict_proba'):
            prob_pos = calibrated_classifier.predict_proba(X_test)
        else:
            prob_pos = calibrated_classifier.decision_function(X_test)
            prob_pos = (prob_pos - prob_pos.min()) / (prob_pos.max() - prob_pos.min())

        y_test_binarized = label_binarize(y_test, classes=np.unique(y_test))
        brier_scores = [brier_score_loss(y_test_binarized[:, i], prob_pos[:, i]) for i in range(y_test_binarized.shape[1])]
        brier = np.mean(brier_scores)
        if brier < best_brier:
            best_calibration_method = calibration

    model_name = f'{model_name} + {sampler_name} + k={best_k} + {best_calibration_method}'
    calibrated_model = CalibratedClassifierCV(model, method=best_calibration_method, cv=None)

    start = time.time()
    calibrated_model.fit(X_train, y_train)
    end = time.time()
    train_time = end - start

    start = time.time()
    y_pred = calibrated_model.predict(X_test)
    end = time.time()
    test_time = end - start

    train_pred = calibrated_model.predict(X_train)
    train_acc = accuracy_score(y_train, train_pred)
    test_acc = accuracy_score(y_test, y_pred)

    f1 = f1_score(y_test, y_pred, average="weighted")
    precision = precision_score(y_test, y_pred, average="weighted")
    recall = precision_score(y_test, y_pred, average="weighted")

    #bold(f" {model_name} ".center(100, "#"))
    
    print(f"Model: {model_name} | Accuracy: {round(test_acc, 4) * 100}%")
    
    #bold("#" * 100)
    
    with open(f'./outputs/models/{model_name}.pkl', 'wb') as file:
        pickle.dump(calibrated_model, file)
    
    scores_df.loc[len(scores_df)] = [model_name, selected_features, best_params, train_time, test_time, train_acc, test_acc, f1, precision, recall]

# Hyperparameters

In [None]:
def objective_lgbm(trial, X_train, X_test, y_train, y_test):
    param = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'verbosity': -1,
        'boosting_type': 'gbdt',
        'num_leaves': trial.suggest_int('num_leaves', 2, 256),
        'learning_rate': trial.suggest_loguniform('learning_rate', 0.005, 0.5),
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'min_child_samples': trial.suggest_int('min_child_samples', 1, 100),
        'subsample': trial.suggest_uniform('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_uniform('colsample_bytree', 0.6, 1.0),
        'reg_alpha': trial.suggest_loguniform('reg_alpha', 1e-5, 10.0),
        'reg_lambda': trial.suggest_loguniform('reg_lambda', 1e-5, 10.0),
    }

    model = LGBMClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_xgb(trial, X_train, X_test, y_train, y_test):
    param = {
        'objective': 'binary:logistic',
        'eval_metric': 'error',
        'verbosity': 0,
        'booster': 'gbtree',
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'learning_rate': trial.suggest_loguniform('learning_rate', 0.005, 0.5),
        'subsample': trial.suggest_uniform('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_uniform('colsample_bytree', 0.6, 1.0),
        'reg_alpha': trial.suggest_loguniform('reg_alpha', 1e-5, 10.0),
        'reg_lambda': trial.suggest_loguniform('reg_lambda', 1e-5, 10.0),
        'min_child_weight': trial.suggest_loguniform('min_child_weight', 1e-5, 10.0),
    }

    model = XGBClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_rf(trial, X_train, X_test, y_train, y_test):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 200),
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_uniform('max_features', 0.6, 1.0)
    }

    model = RandomForestClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_ada(trial, X_train, X_test, y_train, y_test):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 200),
        'learning_rate': trial.suggest_loguniform('learning_rate', 0.01, 1.0),
    }

    model = AdaBoostClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_gb(trial, X_train, X_test, y_train, y_test):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 200),
        'learning_rate': trial.suggest_loguniform('learning_rate', 0.01, 1.0),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'subsample': trial.suggest_uniform('subsample', 0.6, 1.0),
    }

    model = GradientBoostingClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_dt(trial, X_train, X_test, y_train, y_test):
    param = {
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_uniform('max_features', 0.6, 1.0)
    }

    model = DecisionTreeClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_et(trial, X_train, X_test, y_train, y_test):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 200),
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_uniform('max_features', 0.6, 1.0)
    }

    model = ExtraTreesClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_sgd(trial, X_train, X_test, y_train, y_test):
    param = {
        'alpha': trial.suggest_loguniform('alpha', 1e-5, 1e-1),
        'loss': trial.suggest_categorical('loss', ['hinge', 'log_loss', 'modified_huber', 'squared_hinge']),
        'penalty': trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet'])
    }

    model = SGDClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_svc(trial, X_train, X_test, y_train, y_test):
    param = {
        'kernel': trial.suggest_categorical('kernel', ['rbf']),
        'C': trial.suggest_loguniform('C', 1e-5, 100),
        'gamma': trial.suggest_loguniform('gamma', 1e-5, 100),
    }

    model = SVC(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [None]:
def objective_knn(trial, X_train, X_test, y_train, y_test):
    param = {
        'n_neighbors': trial.suggest_int('n_neighbors', 1, 30),
        'metric': trial.suggest_categorical('metric', ['euclidean', 'manhattan', 'chebyshev']),
        'weights': trial.suggest_categorical('weights', ['uniform', 'distance']),
    }
    model = KNeighborsClassifier(**param)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

# Scores

In [None]:
def plot_results(df, title):
    plt.figure(figsize=(20, 80))

    plt.subplot(711)
    ax = sns.barplot(data=df.sort_values(by="Train Time", ascending=False), y="Model Name", x="Train Time", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Train Time")
    plt.xlabel("")

    plt.subplot(712)
    ax = sns.barplot(data=df.sort_values(by="Test Time", ascending=False), y="Model Name", x="Test Time", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Test Time")
    plt.xlabel("")

    plt.subplot(713)
    ax = sns.barplot(data=df.sort_values(by="Train Accuracy", ascending=True), y="Model Name", x="Train Accuracy", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Train Accuracy")
    plt.xlabel("")

    plt.subplot(714)
    ax = sns.barplot(data=df.sort_values(by="Test Accuracy", ascending=True), y="Model Name", x="Test Accuracy", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Test Accuracy")
    plt.xlabel("")

    plt.subplot(715)
    ax = sns.barplot(data=df.sort_values(by="F1", ascending=True), y="Model Name", x="F1", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / F1")
    plt.xlabel("")

    plt.subplot(716)
    ax = sns.barplot(data=df.sort_values(by="Precision", ascending=True), y="Model Name", x="Precision", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Precision")
    plt.xlabel("")

    plt.subplot(717)
    ax = sns.barplot(data=df.sort_values(by="Recall", ascending=True), y="Model Name", x="Recall", palette='viridis')
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Model / Recall")
    plt.xlabel("")
    save_fig("base_results")
    plt.show()

# Run Functions

In [None]:
drop_list = ['Name', 'e_magic', 'SectionMaxEntropy', 'SectionMaxRawsize', 'SectionMaxVirtualsize', 'SectionMinPhysical', 'SectionMinVirtual', 
             'SectionMinPointerData', 'SectionMainChar', 'SuspiciousImportFunctions', 'SuspiciousNameSection', 'DirectoryEntryImportSize']

In [None]:
target = 'Malware'

In [None]:
for col in drop_list:
    if col in numerical_variables:
        numerical_variables.remove(col)
    elif col in categorical_variables:
        categorical_variables.remove(col)

In [None]:
results_df = preprocess_and_train(train_data, drop_list, target)

# Results

In [None]:
results_df

In [None]:
plot_results(results_df, "Results")

# Model Explanation

In [None]:
best_model = pickle.load(open("./outputs/models/LGBM + RandomOverSampler + k=63 + isotonic.pkl", "rb"))

In [None]:
predict_fn = lambda x: best_model.predict_proba(x)

In [None]:
results_df[results_df["Model Name"] == "LGBM + RandomOverSampler + k=63 + isotonic"]["Selected Features"].values[0]

In [None]:
features = [
"e_cblp","e_cp","e_crlc","e_cparhdr","e_minalloc","e_maxalloc","e_ss","e_sp","e_csum","e_ip","e_cs","e_lfarlc","e_ovno","e_oemid","e_oeminfo","e_lfanew","Machine","NumberOfSections","TimeDateStamp","PointerToSymbolTable","NumberOfSymbols","SizeOfOptionalHeader","Characteristics","Magic","MajorLinkerVersion","MinorLinkerVersion","SizeOfCode","SizeOfInitializedData","SizeOfUninitializedData","AddressOfEntryPoint","BaseOfCode","ImageBase","SectionAlignment","FileAlignment","MajorOperatingSystemVersion","MinorOperatingSystemVersion","MajorImageVersion","MinorImageVersion","MajorSubsystemVersion","MinorSubsystemVersion","SizeOfHeaders","CheckSum","SizeOfImage","Subsystem","DllCharacteristics","SizeOfStackReserve","SizeOfStackCommit","SizeOfHeapReserve","SizeOfHeapCommit","LoaderFlags","NumberOfRvaAndSizes","SectionsLength","SectionMinEntropy","SectionMinRawsize","SectionMinVirtualsize","SectionMaxPointerData","SectionMaxChar","DirectoryEntryImport","DirectoryEntryExport","ImageDirectoryEntryImport","ImageDirectoryEntryResource","ImageDirectoryEntryException","ImageDirectoryEntrySecurity"
]

In [None]:
explainer = lime.lime_tabular.LimeTabularExplainer(
    train_data[features].astype(int).values,
    mode='classification',
    class_names=train_data[target].unique().tolist(),
    training_labels=train_data[target],
    feature_names=features
)

In [None]:
test_data

In [None]:
def explain_instances(df, features, predict_fn, explainer, idx_arr):
    for i in idx_arr:
        bold(f" {df.loc[i, 'Name']} ".center(100, "#"))
        exp = explainer.explain_instance(df.loc[i, features].astype(int).values, predict_fn, num_features=len(features))
        exp.show_in_notebook(show_table=True)
        #exp.as_list()
        bold("#" * 100)

In [None]:
idx_arr = test_data.index.tolist()

In [None]:
explain_instances(test_data, features, predict_fn, explainer, idx_arr)

# Try on Files

In [None]:
def analyze(df):
    for i in range(len(df)):
        file_path = str(df.loc[i, "Name"])
        try:
            pe = pefile.PE(file_path)
        except:
            continue
        df.loc[i, "e_magic"] = pe.DOS_HEADER.e_magic
        df.loc[i, "e_cblp"] = pe.DOS_HEADER.e_cblp
        df.loc[i, "e_cp"] = pe.DOS_HEADER.e_cp
        df.loc[i, "e_crlc"] = pe.DOS_HEADER.e_crlc
        df.loc[i, "e_cparhdr"] = pe.DOS_HEADER.e_cparhdr
        df.loc[i, "e_minalloc"] = pe.DOS_HEADER.e_minalloc
        df.loc[i, "e_maxalloc"] = pe.DOS_HEADER.e_maxalloc
        df.loc[i, "e_ss"] = pe.DOS_HEADER.e_ss
        df.loc[i, "e_sp"] = pe.DOS_HEADER.e_sp
        df.loc[i, "e_csum"] = pe.DOS_HEADER.e_csum
        df.loc[i, "e_ip"] = pe.DOS_HEADER.e_ip
        df.loc[i, "e_cs"] = pe.DOS_HEADER.e_cs
        df.loc[i, "e_lfarlc"] = pe.DOS_HEADER.e_lfarlc
        df.loc[i, "e_ovno"] = pe.DOS_HEADER.e_ovno
        df.loc[i, "e_oemid"] = pe.DOS_HEADER.e_oemid
        df.loc[i, "e_oeminfo"] = pe.DOS_HEADER.e_oeminfo
        df.loc[i, "e_lfanew"] = pe.DOS_HEADER.e_lfanew
        df.loc[i, "Machine"] = pe.FILE_HEADER.Machine
        df.loc[i, "NumberOfSections"] = pe.FILE_HEADER.NumberOfSections
        df.loc[i, "TimeDateStamp"] = pe.FILE_HEADER.TimeDateStamp
        df.loc[i, "PointerToSymbolTable"] = pe.FILE_HEADER.PointerToSymbolTable
        df.loc[i, "NumberOfSymbols"] = pe.FILE_HEADER.NumberOfSymbols
        df.loc[i, "SizeOfOptionalHeader"] = pe.FILE_HEADER.SizeOfOptionalHeader
        df.loc[i, "Characteristics"] = pe.FILE_HEADER.Characteristics
        df.loc[i, "Magic"] = pe.OPTIONAL_HEADER.Magic
        df.loc[i, "MajorLinkerVersion"] = pe.OPTIONAL_HEADER.MajorLinkerVersion
        df.loc[i, "MinorLinkerVersion"] = pe.OPTIONAL_HEADER.MinorLinkerVersion
        df.loc[i, "SizeOfCode"] = pe.OPTIONAL_HEADER.SizeOfCode
        df.loc[i, "SizeOfInitializedData"] = pe.OPTIONAL_HEADER.SizeOfInitializedData
        df.loc[i, "SizeOfUninitializedData"] = pe.OPTIONAL_HEADER.SizeOfUninitializedData
        df.loc[i, "AddressOfEntryPoint"] = pe.OPTIONAL_HEADER.AddressOfEntryPoint
        df.loc[i, "BaseOfCode"] = pe.OPTIONAL_HEADER.BaseOfCode
        df.loc[i, "ImageBase"] = pe.OPTIONAL_HEADER.ImageBase
        df.loc[i, "SectionAlignment"] = pe.OPTIONAL_HEADER.SectionAlignment
        df.loc[i, "FileAlignment"] = pe.OPTIONAL_HEADER.FileAlignment
        df.loc[i, "MajorOperatingSystemVersion"] = pe.OPTIONAL_HEADER.MajorOperatingSystemVersion
        df.loc[i, "MinorOperatingSystemVersion"] = pe.OPTIONAL_HEADER.MinorOperatingSystemVersion
        df.loc[i, "MajorImageVersion"] = pe.OPTIONAL_HEADER.MajorImageVersion
        df.loc[i, "MinorImageVersion"] = pe.OPTIONAL_HEADER.MinorImageVersion
        df.loc[i, "MajorSubsystemVersion"] = pe.OPTIONAL_HEADER.MajorSubsystemVersion
        df.loc[i, "MinorSubsystemVersion"] = pe.OPTIONAL_HEADER.MinorSubsystemVersion
        df.loc[i, "SizeOfHeaders"] = pe.OPTIONAL_HEADER.SizeOfHeaders
        df.loc[i, "CheckSum"] = pe.OPTIONAL_HEADER.CheckSum
        df.loc[i, "SizeOfImage"] = pe.OPTIONAL_HEADER.SizeOfImage
        df.loc[i, "Subsystem"] = pe.OPTIONAL_HEADER.Subsystem
        df.loc[i, "DllCharacteristics"] = pe.OPTIONAL_HEADER.DllCharacteristics
        df.loc[i, "SizeOfStackReserve"] = pe.OPTIONAL_HEADER.SizeOfStackReserve
        df.loc[i, "SizeOfStackCommit"] = pe.OPTIONAL_HEADER.SizeOfStackCommit
        df.loc[i, "SizeOfHeapReserve"] = pe.OPTIONAL_HEADER.SizeOfHeapReserve
        df.loc[i, "SizeOfHeapCommit"] = pe.OPTIONAL_HEADER.SizeOfHeapCommit
        df.loc[i, "LoaderFlags"] = pe.OPTIONAL_HEADER.LoaderFlags
        df.loc[i, "NumberOfRvaAndSizes"] = pe.OPTIONAL_HEADER.NumberOfRvaAndSizes
        df.loc[i, "SectionsLength"] = len(pe.sections)
        
        section_entropy_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            entropy = section.get_entropy()
            section_entropy_dict[section_name] = entropy
            
        df.loc[i, "SectionMinEntropy"] = min(section_entropy_dict.values())
        df.loc[i, "SectionMaxEntropy"] = max(section_entropy_dict.values())
        
        section_raw_size_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            raw_size = section.SizeOfRawData
            section_raw_size_dict[section_name] = raw_size

        df.loc[i, "SectionMinRawsize"] = min(section_raw_size_dict.values())
        df.loc[i, "SectionMaxRawsize"] = max(section_raw_size_dict.values())
        
        section_virt_size_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            virt_size = section.Misc_VirtualSize
            section_virt_size_dict[section_name] = virt_size
            
        df.loc[i, "SectionMinVirtualsize"] = min(section_virt_size_dict.values())
        df.loc[i, "SectionMaxVirtualsize"] = max(section_virt_size_dict.values())
        
        section_physical_addr_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            physical = section.Misc_PhysicalAddress
            section_physical_addr_dict[section_name] = physical
            
        df.loc[i, "SectionMaxPhysical"] = max(section_physical_addr_dict.values())
        df.loc[i, "SectionMinPhysical"] = min(section_physical_addr_dict.values())
        
        section_virt_addr_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            virtual = section.VirtualAddress
            section_virt_addr_dict[section_name] = virtual
    
        df.loc[i, "SectionMaxVirtual"] = max(section_virt_addr_dict.values())
        df.loc[i, "SectionMinVirtual"] = min(section_virt_addr_dict.values())
        
        section_pointer_data_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            pointer_data = section.PointerToRawData
            section_pointer_data_dict[section_name] = pointer_data
            
        df.loc[i, "SectionMaxPointerData"] = max(section_pointer_data_dict.values())
        df.loc[i, "SectionMinPointerData"] = min(section_pointer_data_dict.values())

        section_char_dict = {}
        for section in pe.sections:
            section_name = section.Name.decode('utf-8').strip('\x00')
            chars = section.Characteristics
            section_char_dict[section_name] = chars
            
        df.loc[i, "SectionMaxChar"] = max(section_char_dict.values())
        df.loc[i, "SectionMainChar"] = min(section_char_dict.values())
        
        try:
            df.loc[i, "DirectoryEntryImport"] = len(pe.DIRECTORY_ENTRY_IMPORT)
        except:
            df.loc[i, "DirectoryEntryImport"] = 0
        try:
            df.loc[i, "DirectoryEntryExport"] = len(pe.DIRECTORY_ENTRY_EXPORT.symbols)
        except:
            df.loc[i, "DirectoryEntryExport"] = 0
        
        df.loc[i, "ImageDirectoryEntryExport"] = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']].Size
        df.loc[i, "ImageDirectoryEntryImport"] = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']].Size
        df.loc[i, "ImageDirectoryEntryResource"] = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']].Size
        df.loc[i, "ImageDirectoryEntryException"] = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXCEPTION']].Size
        df.loc[i, "ImageDirectoryEntrySecurity"] = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].Size
    return df

In [None]:
#!wget "https://www.python.org/ftp/python/3.12.0/python-3.12.0-amd64.exe"

In [None]:
rs = pickle.load(open("./outputs/process/rs.pkl", "rb"))

In [None]:
drop_cols = [
    'Name', 'e_magic', 'SectionMaxEntropy', 'SectionMaxRawsize', 
    'SectionMaxVirtualsize', 'SectionMinPhysical', 'SectionMinVirtual', 
    'SectionMinPointerData', 'SectionMainChar'
]

In [None]:
def test_file(file_path, features, drop_cols):
    test_df = pd.DataFrame({"Name": [file_path]})
    result_df = analyze(test_df)
    test_df = result_df.drop(drop_cols, axis=1)
    test = rs.transform(test_df)
    test = pd.DataFrame(test, columns=test_df.columns)
    test = test[features]
    result = best_model.predict_proba(test)
    if np.argmax(result) == 1:
        print("[-] This file is malicious.")
    else:
        print("[+] This file is benign.")

In [None]:
file = "./test/abc.exe"
test_file(file, features, drop_cols)

# Conclusion

In [None]:
results_df.drop(columns=['Selected Features', 'Parameters']).sort_values(by='Test Accuracy', ascending=False).reset_index(drop=True)