In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
%cd /content/drive/MyDrive/Colab Notebooks/Malicious Macro Detection

/content/drive/MyDrive/Colab Notebooks/Malicious Macro Detection


In [17]:
import numpy as np
import re
import math
import pandas as pd
from PIL import Image
import io
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import confusion_matrix, accuracy_score

In [6]:
mapper = {
    'white' : 1,
    'mal' : 0
}

train_set = pd.read_csv('train_dataset.csv', encoding='utf-16le')
val_set = pd.read_csv('validation_dataset.csv', encoding='utf-16le')
test_set = pd.read_csv('test_dataset.csv', encoding='utf-16le')

train_set['label'] = train_set['label'].map(mapper)
val_set['label'] = val_set['label'].map(mapper)
test_set['label'] = test_set['label'].map(mapper)

In [14]:
class VBAFeatureExtractor:
    def __init__(self):
        pass

    def extract_social_engineering_phrases(self, text):
        phrases = ["Enable Content", "Previous Version"]
        return any(phrase in text for phrase in phrases)

    # VBA Code Parsing for Feature Extraction
    def extract_variable_assignments(self, vba_code):
        assignments = re.findall(r'(\w+)\s*=\s*(".*?")', vba_code)
        return assignments

    def average_assignment_length(self, vba_code):
        assignments = self.extract_variable_assignments(vba_code)
        if assignments:
            lengths = [len(assign[1]) for assign in assignments]
            return sum(lengths) / len(lengths)
        return 0

    def count_variable_type(self, vba_code, var_type):
        pattern = fr'Dim\s+\w+\s+As\s+{var_type}'
        return len(re.findall(pattern, vba_code)) / len(vba_code)

    def count_integer_variables(self, vba_code):
        return self.count_variable_type(vba_code, 'Integer')

    def count_string_variables(self, vba_code):
        return self.count_variable_type(vba_code, 'String')

    def contains_macro_keywords(self, vba_code):
        keywords = ['AutoOpen', 'AutoClose', 'DocumentOpen', 'DocumentClose']
        return any(keyword in vba_code for keyword in keywords)

    def consecutive_math_operations(self, vba_code):
        operations = re.findall(r'[\+\-\*\/]{2,}', vba_code)
        return max([len(op) for op in operations]) if operations else 0

    def casing_ratio(self, vba_code):
      try:
        variables = re.findall(r'Dim\s+(\w+)', vba_code)
        ratios = [(sum(1 for c in var if c.isupper()) / sum(1 for c in var if c.islower())) for var in variables if var.isalpha()]
        return sum(ratios) / len(ratios) if ratios else 0
      except Exception as e:
        return 0

    def shannon_entropy(self, vba_code):
        prob = [float(vba_code.count(c)) / len(vba_code) for c in dict.fromkeys(list(vba_code))]
        entropy = - sum([p * math.log(p) / math.log(2.0) for p in prob])
        return entropy

    # Aggrageting features
    def extract_features(self, vba_code):
        return {
            "avg_assignment_length": self.average_assignment_length(vba_code),
            "integer_var_count": self.count_integer_variables(vba_code),
            "string_var_count": self.count_string_variables(vba_code),
            "macro_keywords": self.contains_macro_keywords(vba_code),
            "consecutive_math_ops": self.consecutive_math_operations(vba_code),
            "casing_ratio": self.casing_ratio(vba_code),
            "shannon_entropy": self.shannon_entropy(vba_code)
        }

    def process_dataset(self, df):
        feature_list = []
        for _, row in df.iterrows():
            features = self.extract_features(row['vba_code'])
            features['label'] = row['label']  # Add the label to the feature set
            feature_list.append(features)

        return pd.DataFrame(feature_list)

In [15]:
extractor = VBAFeatureExtractor()
train_features = extractor.process_dataset(train_set)
val_features = extractor.process_dataset(val_set)
test_features = extractor.process_dataset(test_set)

In [16]:
train_features.head()

Unnamed: 0,avg_assignment_length,integer_var_count,string_var_count,macro_keywords,consecutive_math_ops,casing_ratio,shannon_entropy,label
0,53.0,0.0,0.000935,False,0,0.75,4.882853,1
1,2.0,0.000229,0.000458,False,0,0.196179,4.923563,1
2,13.75,0.0,0.0,False,2,0.0,5.017054,0
3,13.75,0.0,0.0,False,2,0.0,5.017054,0
4,13.75,0.0,0.0,False,2,0.0,5.017054,0


In [25]:
class ModelTrainer:
    def __init__(self, train_set, val_set, test_set):
        self.train_X = train_set.drop(['label'], axis=1)
        self.train_y = train_set['label']
        self.val_X = val_set.drop(['label'],  axis=1)
        self.val_y = val_set['label']
        self.test_X = test_set.drop(['label'],  axis=1)
        self.test_y = test_set['label']
        self.models = {}

    def train_model(self, model, model_name):
        start_time = time.time()
        model.fit(self.train_X, self.train_y)
        train_time = time.time() - start_time
        self.models[model_name] = model
        return train_time

    def evaluate_model(self, model, X, y):
        predictions = model.predict(X)
        tn, fp, fn, tp = confusion_matrix(y, predictions).ravel()
        tpr = tp / (tp + fn) * 100  # True Positive Rate (Sensitivity)
        fpr = fp / (fp + tn) * 100  # False Positive Rate
        return tpr, fpr

    def train_all_models(self):
        model_configs = {
            'KNeighbors': KNeighborsClassifier(n_neighbors=20, weights='distance', algorithm='ball_tree'),
            'DecisionTree': DecisionTreeClassifier(min_samples_split=6, criterion='entropy', max_depth=100),
            'RandomForest': RandomForestClassifier(min_samples_leaf=2, n_estimators=800, max_features='sqrt', max_depth=50),
            'GaussianNB': GaussianNB()
        }

        metrics = {}

        for model_name, model in model_configs.items():
            print(f"Training {model_name}...")
            train_time = self.train_model(model, model_name)
            tpr, fpr = self.evaluate_model(model, self.val_X, self.val_y)
            metrics[model_name] = {
                'TPR': f"{tpr:.4f}%",
                'FPR': f"{fpr:.4f}%",
                'time': time.strftime("%Mm%Ss", time.gmtime(train_time))
            }

        return metrics

In [26]:
trainer = ModelTrainer(train_features, val_features, test_features)
model_metrics = trainer.train_all_models()

for model, metrics in model_metrics.items():
    print(f"{model}: {metrics}")

Training KNeighbors...
Training DecisionTree...
Training RandomForest...
Training GaussianNB...
KNeighbors: {'TPR': '98.9264%', 'FPR': '1.6917%', 'time': '00m00s'}
DecisionTree: {'TPR': '97.1181%', 'FPR': '1.6353%', 'time': '00m00s'}
RandomForest: {'TPR': '98.9264%', 'FPR': '3.3459%', 'time': '00m20s'}
GaussianNB: {'TPR': '70.0697%', 'FPR': '15.3195%', 'time': '00m00s'}
