# Import libraries and support functions

In [1]:
import torch
from transformers import AutoTokenizer, AutoModel
from tqdm import tqdm 
from torch.utils.data import Dataset
from torch_geometric.loader import DataLoader
import json
import os
from datasets import load_dataset
import torch_geometric.data 
import data
from collections import Counter
import pandas as pd
import numpy as np
import torch.nn as nn
from torch_geometric.data import Data
from sklearn.metrics import (ConfusionMatrixDisplay, roc_auc_score, precision_score, average_precision_score, recall_score,f1_score, accuracy_score,mean_squared_error,mean_absolute_error, roc_curve, auc, classification_report,auc,confusion_matrix,matthews_corrcoef)
from sklearn.model_selection import train_test_split
from torch_geometric.nn import GATConv
import torch.nn.functional as F
from torch_geometric.nn import global_add_pool
import matplotlib.pyplot as plt
import seaborn as sns
import csv
import time

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Lookup used by API_seq_to_graph: API name -> embedding vector (queried with .get()).
# JSON is read as UTF-8 explicitly so the result does not depend on the OS default encoding.
with open("X_matrix_final.json", 'r', encoding="utf-8") as f:
    SEMANTIC_EMBEDDING = json.load(f)

# Model hyperparameters
# Every (epochs, batch size) pair from these two lists is run by main_train_test_gat.
EPOCH_LIST = [25, 50, 75, 100, 125, 150, 175, 200, 225, 250, 275, 300, 400, 500]
BATCH_LIST = [32, 64, 128, 192, 256, 512]
EMBEDDED_DIMENSION = 768   # length of one node feature vector (also the size of the zero fallback)
HIDDEN_DIM = 12            # output channels per attention head
NUM_HEADS = 12             # attention heads in the first GAT layer
OUTPUT_DIM = 1             # single output unit (binary classification)
# TRAIN_EPOCHS = 100       # superseded by EPOCH_LIST
LEARNING_RATE = 1e-4
# BATCH_SIZE = 128         # superseded by BATCH_LIST
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Result
RESULT_PATH = "Result/v03_Final_Train_BERT_BRUTEFORCE_EPOCH_BATCH02.csv"
CHART_PATH = "Result/Charts03"

# Create the output folders up front; otherwise the to_csv below (and the later
# plt.savefig into CHART_PATH) raises OSError on a fresh checkout.
os.makedirs(os.path.dirname(RESULT_PATH) or ".", exist_ok=True)
os.makedirs(CHART_PATH, exist_ok=True)

# In-memory result table; evaluate_model appends one row per evaluation.
# The "STT" column receives the epoch index passed to evaluate_model.
res_df = pd.DataFrame(columns=[
    "Data Type",
    "STT",
    "Train Epochs",
    "Batch Size",
    "Result Type",
    "Loss",
    "Accuracy",
    "Precision",
    "Recall",
    "F1-Score",
    "TNR",
    "Confusion Matrix",
    "Training Time",
    "Testing Time"
])
# The frame is still empty here, so this only writes the header row, and only
# when the result file does not exist yet (append mode keeps earlier results).
res_df.to_csv(RESULT_PATH, mode='a', header=not os.path.exists(RESULT_PATH), index=False)

# Loss
# EPOCHS_LOSS = []

In [3]:
# FUNCTION - API SEQUENCE TO GRAPH
def API_seq_to_graph(api_sequence, label):
    """Convert one API call sequence into a weighted, directed transition graph.

    Nodes are the distinct API names of the sequence. There is one edge per
    distinct consecutive pair ``(api[i], api[i+1])``, and its attribute is the
    number of times that transition occurs.

    Args:
        api_sequence: Iterable of API names. Entries that are not non-empty
            strings (e.g. NaN, numbers, ``""``) are discarded first.
        label: Numeric class label, stored as a float32 scalar tensor in ``y``.

    Returns:
        A ``torch_geometric.data.Data`` object with
        ``x`` (num_nodes x EMBEDDED_DIMENSION, float32),
        ``edge_index`` (2 x num_edges, long),
        ``edge_attr`` (num_edges x 1, float) and ``y``;
        or ``None`` when fewer than two valid API names remain.
    """
    api_sequence = [api for api in api_sequence if isinstance(api, str) and api]
    if len(api_sequence) < 2:
        return None

    # Ordered de-duplication: node ids follow first appearance in the sequence.
    # (list(set(...)) would order nodes by string hash, which changes between
    # interpreter runs and makes the generated graphs non-reproducible.)
    unique_apis = list(dict.fromkeys(api_sequence))
    api_to_index = {api: idx for idx, api in enumerate(unique_apis)}
    number_nodes = len(unique_apis)

    # Graph edges: count every consecutive (caller, next call) pair.
    edge_counters = Counter(zip(api_sequence, api_sequence[1:]))

    src, dst, edge_weights = [], [], []
    for (u, v), count in edge_counters.items():
        src.append(api_to_index[u])
        dst.append(api_to_index[v])
        edge_weights.append(count)

    edge_index = torch.tensor([src, dst], dtype=torch.long)
    edge_attr = torch.tensor(edge_weights, dtype=torch.float).view(-1, 1)

    # Node features: embedding lookup, with an all-zero vector for APIs that
    # have no entry in SEMANTIC_EMBEDDING.
    zero_embedding = np.zeros(EMBEDDED_DIMENSION)
    nodes_features = []
    for api in unique_apis:
        embedding = SEMANTIC_EMBEDDING.get(api)
        nodes_features.append(zero_embedding if embedding is None else embedding)
    nodes_features = np.array(nodes_features)

    X = torch.tensor(nodes_features, dtype=torch.float32)
    y = torch.tensor(label, dtype=torch.float32)

    return Data(x=X, edge_index=edge_index, edge_attr=edge_attr, y=y, num_nodes=number_nodes)


In [4]:
class DawnGNN_GAT(nn.Module):
    """Graph-level binary classifier built from two GAT layers and a small MLP.

    Pipeline: GATConv (NUM_HEADS heads) -> ELU -> GATConv (single head,
    concat=False) -> ELU -> sum pooling over the nodes of each graph ->
    2-layer MLP -> sigmoid.

    Only ``x``, ``edge_index`` and ``batch`` of the input are read; the
    ``edge_attr`` transition counts are not used by this model.
    """

    def __init__(self):
        super().__init__()

        # Width of the node representation produced by the first layer
        # (one HIDDEN_DIM-sized slice per attention head).
        multi_head_width = HIDDEN_DIM * NUM_HEADS
        self.conv1 = GATConv(EMBEDDED_DIMENSION, HIDDEN_DIM, heads=NUM_HEADS, dropout=0.6)
        self.conv2 = GATConv(multi_head_width, HIDDEN_DIM, heads=1, concat=False, dropout=0.6)

        # Classification head applied to the pooled graph representation.
        bottleneck_dim = HIDDEN_DIM // 2
        head_layers = [
            nn.Linear(HIDDEN_DIM, bottleneck_dim),
            nn.ReLU(),
            nn.Linear(bottleneck_dim, OUTPUT_DIM),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, data):
        """Return one probability per graph in the batch (1-D tensor)."""
        edges = data.edge_index

        node_repr = self.conv1(data.x, edges)
        node_repr = F.elu(node_repr)
        node_repr = self.conv2(node_repr, edges)
        node_repr = F.elu(node_repr)

        # Sum the node vectors that belong to the same graph.
        graph_repr = global_add_pool(node_repr, data.batch)

        scores = self.classifier(graph_repr)
        probabilities = torch.sigmoid(scores)
        return probabilities.squeeze(1)

In [5]:
# EVALUATION FUNCTION
def evaluate_model(DATASET, epoch, model, loader, BATCH_SIZE, EPOCHS_LOSS, TRAIN_EPOCHS, device, criterion, val_test, training_time, cumulative_testing_time=0):
    """Evaluate ``model`` on ``loader``, log the metrics and return them.

    Side effects:
        * appends one row to the module-level ``res_df`` buffer;
        * appends the average loss of this evaluation to ``EPOCHS_LOSS``;
        * when ``val_test == 'Test'``: draws the confusion-matrix heatmap;
        * when additionally ``epoch == TRAIN_EPOCHS``: appends the buffered rows
          to ``RESULT_PATH`` and then empties the buffer, so every row is
          written to the CSV exactly once.

    Args:
        DATASET: Dataset name, stored in the "Data Type" column.
        epoch: Current epoch index, stored in the "STT" column.
        model: Model to evaluate; called as ``model(batch)``.
        loader: Loader yielding batches with ``.y`` and ``.num_graphs``.
        BATCH_SIZE: Batch size of the current run (logged only).
        EPOCHS_LOSS: List that receives the average loss of this call.
        TRAIN_EPOCHS: Total number of epochs of the current run.
        device: Device the batches are moved to.
        criterion: Loss taking ``(predictions, targets)``; assumed to return a
            per-batch mean, since it is re-weighted by ``num_graphs`` here.
        val_test: ``'Validation'`` or ``'Test'``, stored in "Result Type".
        training_time: Training time to log alongside the metrics.
        cumulative_testing_time: Evaluation time accumulated by earlier calls.

    Returns:
        ``(avg_loss, accuracy, f1, precision, recall, TNR,
        cumulative_testing_time)`` where the last item includes the time
        spent in this call's inference loop.
    """
    model.eval()
    start_test = time.time()
    total_loss = 0
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch_graphs in loader:
            batch_graphs = batch_graphs.to(device)
            out = model(batch_graphs)
            loss = criterion(out.float(), batch_graphs.y.float())
            total_loss += loss.item() * batch_graphs.num_graphs
            y_pred.extend((out > 0.5).long().cpu().tolist())
            y_true.extend(batch_graphs.y.long().cpu().tolist())

    # Only the inference loop is timed, not the metric computation below.
    cumulative_testing_time += time.time() - start_test

    avg_loss = total_loss / len(loader.dataset)

    precision = precision_score(y_true, y_pred, average='binary', zero_division=0)
    recall = recall_score(y_true, y_pred, average='binary', zero_division=0)
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='binary', zero_division=0)

    # labels=[0, 1] forces a 2x2 matrix even when only one class occurs in
    # y_true/y_pred; without it the matrix can be 1x1 and indexing breaks.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    tn, fp = cm[0, 0], cm[0, 1]
    negatives = tn + fp
    # Same convention as zero_division=0 above: no negatives -> TNR of 0.
    TNR = float(tn / negatives) if negatives else 0.0

    if val_test == 'Test':
        LABELS = ['Benign', 'Malware']
        fig, ax = plt.subplots(figsize=(6, 5))
        sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=LABELS, yticklabels=LABELS, ax=ax)
        ax.set_title('Confusion Matrix', fontsize=17, pad=20)
        ax.set_ylabel('Actual', fontsize=13)
        ax.set_xlabel('Prediction', fontsize=13)
        ax.xaxis.set_label_position('top')
        ax.xaxis.tick_top()
        plt.show()
        # Release the figure; this function is called once per grid-search run.
        plt.close(fig)

    res_df.loc[len(res_df)] = [
        DATASET,
        epoch,
        TRAIN_EPOCHS,
        BATCH_SIZE,
        val_test,
        avg_loss,
        acc,
        precision,
        recall,
        f1,
        TNR,
        str(cm.tolist()),
        training_time,
        cumulative_testing_time
    ]
    if epoch == TRAIN_EPOCHS and val_test == 'Test':
        res_df.to_csv(RESULT_PATH, mode='a', header=not os.path.exists(RESULT_PATH), index=False)
        # Empty the buffer in place: res_df keeps growing across runs, so
        # re-appending it would duplicate all earlier rows in the CSV.
        res_df.drop(res_df.index, inplace=True)

    EPOCHS_LOSS.append(avg_loss)

    return avg_loss, acc, f1, precision, recall, TNR, cumulative_testing_time


In [6]:
def _load_graph_dataset(dataset_name):
    """Read ``Dataset/<dataset_name>.csv`` and turn every usable row into a graph."""
    df = pd.read_csv(f"Dataset/{dataset_name}.csv", low_memory=False)

    # Columns that must never be fed into the graph as API calls.
    non_api_columns = {'labels'}
    if dataset_name in ('MalBehavD-V1-dataset', 'APIMDS'):
        start_api = 2
        label_col_name = 'labels'
    elif dataset_name == 'WINDOWS_PE_APICALLS':
        start_api = 1
        # Textual class names -> binary label (0 = benign/normal, 1 = anything else).
        df['labels'] = df['Malware'].apply(lambda x: 0 if x.lower() in ['benign', 'normal'] else 1)
        label_col_name = 'labels'
        # 'Malware' holds the class name as text; it would otherwise pass the
        # string filter in API_seq_to_graph and leak the label into the graph.
        non_api_columns.add('Malware')
    else:
        raise ValueError(f"Unknown dataset: {dataset_name!r}")

    # API columns start at the dataset-specific offset (the original code
    # ignored start_api and always sliced from column 2).
    api_columns = [col for col in df.columns[start_api:] if col not in non_api_columns]
    api_frame = df[api_columns]
    labels = df[label_col_name]

    data_list = []
    for (_, row), raw_label in tqdm(zip(api_frame.iterrows(), labels), total=len(df)):
        graph = API_seq_to_graph(row.dropna().tolist(), float(raw_label))
        if graph is not None:
            data_list.append(graph)
    return data_list


def _plot_loss_curves(train_losses, val_losses, chart_path):
    """Save the per-epoch training/validation loss curves to ``chart_path`` and show them."""
    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(range(1, len(train_losses) + 1), train_losses, marker='o', linestyle='-', label="Train")
    ax.plot(range(1, len(val_losses) + 1), val_losses, marker='o', linestyle='-', label="Validation")
    ax.set_title("Training and Validation Loss per Epoch", fontsize=15)
    ax.set_xlabel("Epoch", fontsize=13)
    ax.set_ylabel("Loss", fontsize=13)
    ax.grid(True)
    ax.legend()
    fig.savefig(chart_path, dpi=300)
    plt.show()
    # One figure is created per configuration; close it to free the memory.
    plt.close(fig)


def main_train_test_gat():
    """Run the full epoch x batch-size grid search on every dataset.

    For each dataset the CSV is converted to graphs and split 80/10/10 into
    train/validation/test (fixed ``random_state=42``). For each
    ``(epochs, batch size)`` pair from ``EPOCH_LIST`` x ``BATCH_LIST`` a
    freshly initialised ``DawnGNN_GAT`` is trained, validated after every
    epoch, its loss curves are saved under ``CHART_PATH`` and it is finally
    evaluated on the test split.

    Returns:
        ``(model, test_loader, criterion)`` of the last configuration that ran.
    """
    print("==================== CONVERT API SEQUENCE TO GRAPH ====================")
    dataset_names = ["APIMDS", "MalBehavD-V1-dataset", "WINDOWS_PE_APICALLS"]
    seed = 42  # same value as the random_state of the splits below

    # plt.savefig does not create missing folders.
    os.makedirs(CHART_PATH, exist_ok=True)

    model, test_loader = None, None
    criterion = nn.BCELoss()

    for dataset_name in dataset_names:
        data_list = _load_graph_dataset(dataset_name)

        # Split dataset: 80% train, 10% validation, 10% test.
        train_data, temp_data = train_test_split(data_list, test_size=0.2, random_state=42)
        val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

        for train_epochs in EPOCH_LIST:
            for batch_size in BATCH_LIST:
                # Fresh model + optimizer per configuration. Creating them once
                # per dataset would let every run continue from the weights of
                # the previous one, so the grid results would not be comparable.
                # Re-seeding gives every configuration the same initial weights
                # and shuffling stream.
                torch.manual_seed(seed)
                model = DawnGNN_GAT().to(DEVICE)
                optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

                train_losses = []  # average training loss per epoch
                val_losses = []    # filled by evaluate_model (validation loss per epoch)

                train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
                val_loader = DataLoader(val_data, batch_size=batch_size)
                test_loader = DataLoader(test_data, batch_size=batch_size)

                print(f"\n===== RUNNING WITH EPOCH={train_epochs}  |  BATCH={batch_size} =====")
                cumulative_testing_time = 0
                # Distinct name for the running epoch: the original reused
                # `epoch` and thereby overwrote the grid variable.
                for current_epoch in range(1, train_epochs + 1):
                    model.train()
                    start_train = time.time()
                    total_loss = 0
                    for batch_graphs in train_loader:
                        optimizer.zero_grad()
                        batch_graphs = batch_graphs.to(DEVICE)
                        out = model(batch_graphs)
                        loss = criterion(out.float(), batch_graphs.y.float())
                        loss.backward()
                        optimizer.step()
                        # BCELoss averages over the batch; re-weight to sum per graph.
                        total_loss += loss.item() * batch_graphs.num_graphs

                    training_time = time.time() - start_train
                    train_loss = total_loss / len(train_data)
                    train_losses.append(train_loss)

                    val_loss, val_acc, val_f1, val_pre, val_recall, val_TNR, cumulative_testing_time = evaluate_model(
                        dataset_name, current_epoch, model, val_loader, batch_size, val_losses, train_epochs,
                        DEVICE, criterion, val_test="Validation", training_time=training_time,
                        cumulative_testing_time=cumulative_testing_time)
                    print(f"Epoch {current_epoch} | Train Loss: {train_loss:.4f} | Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f} | Val Precision: {val_pre:.4f} | Val Recall: {val_recall:.4f} | Val TNR: {val_TNR:.4f}")

                # Plot before the test evaluation: evaluate_model also appends
                # the test loss to the list it is given.
                chart_path = f"{CHART_PATH}/{dataset_name}_Loss_E{train_epochs}_B{batch_size}.png"
                _plot_loss_curves(train_losses, val_losses, chart_path)

                print("==================== TESTING ====================")
                # epoch == TRAIN_EPOCHS together with val_test="Test" makes
                # evaluate_model write the buffered results to the CSV.
                test_loss, test_acc, test_f1, test_precision, test_recall, test_TNR, cumulative_testing_time = evaluate_model(
                    dataset_name, train_epochs, model, test_loader, batch_size, val_losses, train_epochs,
                    DEVICE, criterion, val_test="Test", training_time=0, cumulative_testing_time=0)
                print("\n=======================================================")
                print(f"✅ FINAL TEST PERFORMANCE (GAT Classifier):")
                print(f"Accuracy: {test_acc:.4f} | F1-Score: {test_f1:.4f} | Loss: {test_loss:.4f} | Recall: {test_recall:.4f} | Precision: {test_precision:.4f} | TNR: {test_TNR:.4f}")
                print("=======================================================")

    return model, test_loader, criterion

In [7]:
# if __name__ == "__main__":   (guard left disabled; this cell runs the pipeline when executed)
# Runs graph conversion, the epoch/batch grid search and the test evaluation for
# every dataset, and keeps the returned model, test loader and loss function
# in the notebook namespace.
print("==================== TRAINING ====================")
model, test_loader, criterion = main_train_test_gat()



  6%|▌         | 1021/17569 [00:08<02:20, 117.44it/s]


KeyboardInterrupt: 

In [None]:
# import matplotlib.pyplot as plt
# plt.figure(figsize=(8,5))
# plt.plot(range(1, len(EPOCHS_LOSS)+1), EPOCHS_LOSS, marker='o', linestyle='-')
# plt.title("Training Loss per Epoch", fontsize=15)
# plt.xlabel("Epoch", fontsize=13)
# plt.ylabel("Loss", fontsize=13)
# plt.grid(True)
# plt.show()

# chart_path = f"{SAVE_FOLDER}/Loss_E{epoch}_B{batch}.png"
# plt.savefig(chart_path, dpi=300)    
# plt.close()

In [None]:
# print("==================== TESTING ====================")
# test_loss, test_acc, test_f1, test_precision, test_recall, test_TNR, cumulative_testing_time = evaluate_model(model, test_loader, DEVICE, criterion, val_test="Test", training_time=0, cumulative_testing_time=0)
# print("\n=======================================================")
# print(f"✅ FINAL TEST PERFORMANCE (GAT Classifier):")
# print(f"Accuracy: {test_acc:.4f} | F1-Score: {test_f1:.4f} | Loss: {test_loss:.4f} | Recall: {test_recall:.4f} | Precision: {test_precision:.4f} | TNR: {test_TNR:.4f}")
# print("=======================================================")