<a href="https://colab.research.google.com/github/stwater20/AIS3-2024-Material/blob/main/AIS3_Lab6_Malware_API_Sequence_Using_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Long-format listing of the current working directory, hidden entries included.
!ls -al

total 159744
drwxr-xr-x 1 root root      4096 Jul 25 08:20 .
drwxr-xr-x 1 root root      4096 Jul 25 08:18 ..
drwxr-xr-x 4 root root      4096 Jul 23 13:21 .config
-rw-r--r-- 1 root root 160032752 Oct 28  2021 data_demo.npz
-rw-r--r-- 1 root root   3525445 Jul 25 08:20 data_demo.zip
drwxr-xr-x 1 root root      4096 Jul 23 13:21 sample_data


In [None]:
!unzip data_demo.zip

Archive:  data_demo.zip
  inflating: data_demo.npz           


In [None]:
import numpy as np

# Open the .npz archive
data = np.load('data_demo.npz')

# Show the names of every array stored in the archive
print([*data.keys()])

['x_name', 'x_semantic', 'y']


In [None]:
# Display the 'x_name' array (NumPy abbreviates the repr of large arrays).
data['x_name']

array([[  1,   4,  59, ...,   0,   0,   0],
       [ 10,  11,  10, ...,   0,   0,   0],
       [  1,   4,  59, ...,   0,   0,   0],
       ...,
       [123, 253, 253, ...,   0,   0,   0],
       [ 15,   3,   3, ...,  27,  27,  27],
       [  1,  10,  11, ...,   9,  76,  52]])

In [None]:
len(data['x_name'][0]) # each program is represented by a sequence of 1000 API entries

1000

In [None]:
# Display the 'x_semantic' array (NumPy abbreviates the repr of large arrays).
data['x_semantic']

array([[  4, 101,  21, ...,  14,  14,  14],
       [  0,  25, 255, ...,  14,  14,  14],
       [  4, 101,  21, ...,  14,  14,  14],
       ...,
       [  0,   1, 163, ...,  14,  14,  14],
       [  0,   5,  61, ...,  14,  32,  33],
       [  4, 101,  21, ...,   5,  79,  80]])

In [None]:
# Length of the first row of 'x_semantic'.
len(data['x_semantic'][0])

4000

In [None]:
# Show the 'y' array together with its number of rows.
data['y'],len(data['y'])

(array([[0],
        [0],
        [0],
        ...,
        [1],
        [1],
        [1]]),
 4000)

In [None]:
# Collapse the (n_samples, 1) 'y' column into a plain 1-D list of Python ints.
flat_list = data['y'].reshape(-1).tolist()

In [None]:
from collections import Counter
# Tally how many times each distinct value occurs in the flattened 'y' list.
Counter(flat_list)

Counter({0: 2000, 1: 2000})

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn.functional as F
from torch.nn import Embedding, LSTM, Linear, BCELoss
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np
import warnings

# Suppress every Python warning from this point on.
# NOTE(review): this blanket filter also hides warnings raised by torch and
# scikit-learn (e.g. undefined-metric warnings) — consider narrowing it.
warnings.filterwarnings("ignore")

class LSTMNet(torch.nn.Module):
    """Bidirectional-LSTM binary classifier over integer token sequences.

    Pipeline: token ids -> embedding -> single-layer bidirectional LSTM ->
    concatenation of the final forward and backward hidden states ->
    ``Linear`` + ReLU -> ``Linear`` + sigmoid.

    Args:
        num_embeddings: Size of the embedding table; every input id must lie
            in ``[0, num_embeddings)``.
        embedding_dim: Width of each embedding vector.
        hidden_size: Number of hidden units per LSTM direction.
        fc_size: Width of the hidden fully connected layer.

    The defaults reproduce the previously hard-coded architecture
    (316 / 16 / 100 / 64), so ``LSTMNet()`` builds the same network as before.
    """

    def __init__(self, num_embeddings=316, embedding_dim=16, hidden_size=100, fc_size=64):
        super().__init__()
        self.embedder = Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim)
        self.lstm = LSTM(input_size=embedding_dim, hidden_size=hidden_size,
                         bidirectional=True, batch_first=True)
        # Both directions are concatenated in forward(), hence 2 * hidden_size inputs.
        self.lin1 = Linear(2 * hidden_size, fc_size)
        self.lin2 = Linear(fc_size, 1)

    def forward(self, x):
        """Return one probability per sample.

        Args:
            x: Integer tensor of token ids with shape ``(batch, seq_len)``.

        Returns:
            Float tensor of shape ``(batch, 1)`` with values in ``[0, 1]``.
        """
        embedded = self.embedder(x)
        # Only the final hidden states are used; the per-step outputs and the
        # cell state are discarded.
        _, (h_n, _) = self.lstm(embedded)
        # For this single-layer bidirectional LSTM, h_n[-2] is the forward
        # direction's final state and h_n[-1] the backward direction's.
        summary = torch.cat((h_n[-2, :, :], h_n[-1, :, :]), dim=1)
        hidden = F.relu(self.lin1(summary))
        return torch.sigmoid(self.lin2(hidden))

def test(model, device, test_loader):
    """Evaluate a binary classifier and return ``(precision, recall, f1)``.

    Args:
        model: Module whose forward pass returns probabilities in ``[0, 1]``;
            they are rounded to obtain hard 0/1 predictions.
        device: Device the input batches are moved to before the forward pass.
        test_loader: Iterable yielding ``(inputs, targets)`` tensor pairs.

    Returns:
        Tuple ``(precision, recall, f1)`` computed with scikit-learn over the
        predictions and targets of all batches.
    """
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for inputs, target in test_loader:
            output = model(inputs.to(device))
            preds = output.round()
            all_preds.extend(preds.view(-1).cpu().numpy())
            # Flatten targets exactly like the predictions so both lists hold
            # scalars, whether labels arrive as (batch,) or (batch, 1). The
            # targets are only compared on the CPU, so they are never moved
            # to `device`.
            all_targets.extend(target.reshape(-1).cpu().numpy())

    # Calculate metrics
    precision = precision_score(all_targets, all_preds)
    recall = recall_score(all_targets, all_preds)
    f1 = f1_score(all_targets, all_preds)

    return precision, recall, f1


# --- Configuration ---------------------------------------------------------
SEED = 42             # seeds the split, the weight init and the batch shuffling
TEST_FRACTION = 0.2   # share of each class held out for evaluation
BATCH_SIZE = 128
NUM_EPOCHS = 10
LEARNING_RATE = 0.01

torch.manual_seed(SEED)
split_rng = np.random.default_rng(SEED)

# --- Load and split --------------------------------------------------------
data = np.load('./data_demo.npz')
all_x_name = data['x_name']
all_y = data['y']

labels_flat = all_y.reshape(-1)
assert labels_flat.shape[0] == all_x_name.shape[0], "expected exactly one label per sample"

# Stratified hold-out split. Evaluating on the very samples the model was
# trained on only measures memorisation, so a fixed fraction of every class is
# set aside and never shown to the optimizer. Indices are shuffled per class
# so the split does not depend on the order of the rows in the file.
train_parts, test_parts = [], []
for label_value in np.unique(labels_flat):
    label_idx = split_rng.permutation(np.flatnonzero(labels_flat == label_value))
    n_test = int(round(len(label_idx) * TEST_FRACTION))
    test_parts.append(label_idx[:n_test])
    train_parts.append(label_idx[n_test:])
train_idx = np.concatenate(train_parts)
test_idx = np.concatenate(test_parts)
assert np.intersect1d(train_idx, test_idx).size == 0, "train/test overlap"

train_x_name, train_y = all_x_name[train_idx], all_y[train_idx]
test_x_name, test_y = all_x_name[test_idx], all_y[test_idx]

# Embedding lookups need int64 ids; BCELoss needs float targets.
train_xt = torch.from_numpy(train_x_name).long()
test_xt = torch.from_numpy(test_x_name).long()
train_yt = torch.from_numpy(train_y.astype(np.float32))
test_yt = torch.from_numpy(test_y.astype(np.float32))

train_data = TensorDataset(train_xt, train_yt)
test_data = TensorDataset(test_xt, test_yt)

train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)
test_loader = DataLoader(dataset=test_data, batch_size=BATCH_SIZE, num_workers=1)

# --- Model -----------------------------------------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMNet().to(device)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = BCELoss()  # created once instead of once per batch

# Training loop
for epoch in range(1, NUM_EPOCHS + 1):
    model.train()
    total_loss = 0
    # The batch is named `batch_x` so it does not overwrite the npz handle `data`.
    for batch_x, target in train_loader:
        batch_x, target = batch_x.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(batch_x)
        if target.dim() == 1:
            target = target.unsqueeze(1)  # target must be [batch_size, 1] to match the output
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f'Epoch {epoch}, Train Loss: {total_loss / len(train_loader)}')



# Test
# NOTE: these metrics are measured on held-out samples, so they are expected to
# differ from (and be lower than) numbers obtained by scoring the training set.
precision, recall, f1 = test(model, device, test_loader)
print(f'Test Precision: {precision:.4f}, Test Recall: {recall:.4f}, Test F1: {f1:.4f}')


Epoch 1, Train Loss: 0.49372848123311996
Epoch 2, Train Loss: 0.35968773625791073
Epoch 3, Train Loss: 0.35932787135243416
Epoch 4, Train Loss: 0.3494661985896528
Epoch 5, Train Loss: 0.39521539211273193
Epoch 6, Train Loss: 0.32728859363123775
Epoch 7, Train Loss: 0.2686572172679007
Epoch 8, Train Loss: 0.2746171588078141
Epoch 9, Train Loss: 0.23142848117277026
Epoch 10, Train Loss: 0.20562454778701067
Test Precision: 0.9007, Test Recall: 0.9430, Test F1: 0.9213
