In [1]:
import os
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader, Dataset

In [2]:
data_folder = 'ml_sec_hw_data/data'

In [30]:
def test_binary_loader(bytes, s):
    l = len(bytes)
    print("bytes length: ", l)

    if l == s:
        # Ha a fájl mérete pontosan s
        x = bytes
    elif l < s:
        # Ha a fájl mérete kisebb mint s, hozzáadunk nullás paddingot
        x = np.pad(bytes, (0, s - l), 'constant', constant_values=(0,))
    else:
        # Ha a fájl mérete nagyobb mint s, átlagoljuk a bájtokat
        group_size = np.ceil(l / s) # csoportok számának kiszámítása
        print("group size: ", group_size)
        # Először hozzáadunk szükséges paddingot
        padding_length = s * group_size - l 
        print("padding length: ", padding_length)
        padded_bytes = np.pad(bytes, (0, int(padding_length)), 'constant', constant_values=(0,))
        print("padding bytes: ", padded_bytes)
        # Átlagolás
        reshaped_bytes = padded_bytes.reshape(-1, int(group_size))
        print("reshaped bytes: \n", reshaped_bytes)
        x = np.mean(reshaped_bytes, axis=1)

    return x

In [28]:
x = test_binary_loader([1,2,3,4], 3)
print(x.shape)
print(x)

group size:  2.0
padding length:  2.0
padding bytes:  [1 2 3 4 0 0]
reshaped bytes:  [[1 2]
 [3 4]
 [0 0]]
(3,)
[1.5 3.5 0. ]


In [5]:
tensor_x = torch.tensor(x)
print(tensor_x)

tensor_x = tensor_x.unsqueeze(0)
print(tensor_x.shape)

tensor([1.5000, 3.5000, 0.0000], dtype=torch.float64)
torch.Size([1, 3])


In [6]:
class MalwareDataset(Dataset):
    def __init__(self, data_folder, s=2**14):
        self.data_folder = data_folder
        self.data = []
        self.labels = []
        for label in os.listdir(data_folder): # data/.../train -> malware, benign
            for file in os.listdir(os.path.join(data_folder, label)):
                file_path = os.path.join(data_folder, label, file)
                x = self.preprocess_binary_file(file_path, s)
                self.data.append(x)
                self.labels.append(label)

    def preprocess_binary_file(self, file_path, s):
        # Bináris fájl beolvasása
        with open(file_path, 'rb') as file:
            file_bytes = np.fromfile(file, dtype=np.uint8)

        l = len(file_bytes)

        if l == s:
            # Ha a fájl mérete pontosan s
            x = file_bytes
        elif l < s:
            # Ha a fájl mérete kisebb mint s, hozzáadunk nullás paddingot
            x = np.pad(file_bytes, (0, s - l), 'constant', constant_values=(0,))
        else:
            # Ha a fájl mérete nagyobb mint s, átlagoljuk a bájtokat
            # Először hozzáadunk szükséges paddingot
            padding_length = s * np.ceil(l / s) - l
            padded_bytes = np.pad(file_bytes, (0, int(padding_length)), 'constant', constant_values=(0,))
            # Átlagolás
            x = np.mean(padded_bytes.reshape(-1, int(np.ceil(l / s))), axis=1)

        # Normalizálás [0, 1] tartományba
        x_normalized = x / 255

        return x_normalized

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx]), torch.tensor([1 if self.labels[idx] == 'malware' else 0])
    
def get_loader(data_folder, batch_size=32):
    dataset = MalwareDataset(data_folder)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [7]:
train_loader = get_loader(os.path.join(data_folder, 'train'), 32)
test_loader = get_loader(os.path.join(data_folder, 'test'), 32)

In [8]:
data, label = next(iter(train_loader))
print(data.unsqueeze(1).float().shape)
print(label.float().shape)

torch.Size([32, 1, 16384])
torch.Size([32, 1])


In [9]:
class MalwareDetector(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=10, stride=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=4, stride=2, padding=0, dilation=1, ceil_mode=False),
        )
        self.linear = nn.Linear(in_features=130976, out_features=1, bias=True)
        self.out = nn.Sigmoid()

    def forward(self, x):
        x = self.conv1(x)
        x = x.view(x.size(0), -1) # Flatten
        x = self.linear(x)
        x = self.out(x)
        return x

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = MalwareDetector().to(device)
bce = nn.BCELoss()
opt = torch.optim.Adam(model.parameters(), lr=0.001)
model.to(device)

def train(model, train_loader, opt, loss_fn, epochs=10):
    for epoch in range(epochs):
        for data, labels in train_loader:
            data = data.to(device)
            labels = labels.to(device)

            opt.zero_grad()
            out = model(data.unsqueeze(1).float())
            loss = loss_fn(out, labels.float())
            loss.backward()
            opt.step()

        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')

train(model, train_loader, opt, bce, epochs=10)

Epoch 1/10, Loss: 0.1271088421344757
Epoch 2/10, Loss: 0.0075139193795621395
Epoch 3/10, Loss: 0.004727795720100403
Epoch 4/10, Loss: 0.005919822491705418
Epoch 5/10, Loss: 0.028740501031279564
Epoch 6/10, Loss: 0.002325571607798338
Epoch 7/10, Loss: 0.0006117887678556144
Epoch 8/10, Loss: 0.0008404992404393852
Epoch 9/10, Loss: 0.0007017732714302838
Epoch 10/10, Loss: 0.0009823349537327886


In [11]:
from sklearn.metrics import roc_auc_score, confusion_matrix

def test_model(model, test_loader):
    model.eval()  # Teszt módba állítjuk a modellt
    predictions, true_labels = [], []
    model.to('cpu')
    with torch.no_grad():  # Gradiensek számításának kikapcsolása
        for data, labels in test_loader:
            
            output = model(data.unsqueeze(1).float())  # Modell alkalmazása
            pred_prob = output.squeeze().numpy()  # Valószínűségek kinyerése
            predictions.extend(pred_prob)
            true_labels.extend(labels.numpy())
    
    # Bináris osztályozási döntések (0.5 küszöbértékkel)
    pred_labels = [1 if p >= 0.5 else 0 for p in predictions]
    
    # Metrikák kiszámítása
    auc_score = roc_auc_score(true_labels, predictions)
    tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()
    
    tpr = tp / (tp + fn)  # True Positive Rate
    tnr = tn / (tn + fp)  # True Negative Rate
    fpr = fp / (tn + fp)  # False Positive Rate
    fnr = fn / (tp + fn)  # False Negative Rate
    accuracy = (tp + tn) / (tp + tn + fp + fn)  # Átlagos pontosság
    
    print(f"Accuracy: {accuracy}")
    print(f"TPR: {tpr}")
    print(f"TNR: {tnr}")
    print(f"FPR: {fpr}")
    print(f"FNR: {fnr}")
    print(f"AUC: {auc_score}")
    
    return accuracy, tpr, tnr, fpr, fnr, auc_score


accuracy, tpr, tnr, fpr, fnr, auc = test_model(model, test_loader)

Accuracy: 0.9974358974358974
TPR: 0.9948717948717949
TNR: 1.0
FPR: 0.0
FNR: 0.005128205128205128
AUC: 0.9993951347797502


### Kérdések:
1. Mi a betanított modell átlagos pontossága a teszt adaton ha s = 2
14? : 99.48%
2. Mi a modell TPR, TNR, FPR, FNR, valamint AUC értéke a teszt adaton? Mi állapítható meg ezekből a modell
teljesítményéről? : Magas pontosság, nagyon ritkán téveszt a detektorunk. Felismeri mind a malware-t mind a benign-t. Szinte nincs félreosztályzás


In [36]:
original_bytes = [1,2,3,4,5,6,7,8,9,10]
adv_suffix = [0] * 5 # ha egy csoport csakis suffix-t tartalmaz akkor a tömörített verzióban is azok lesznek, azaz módosítani tudjuk őket.
adv_bytes = original_bytes + adv_suffix
print("adv_bytes: ", adv_bytes)
x = test_binary_loader(adv_bytes, 6)
print("input: ", x)

adv_bytes:  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0]
group size:  3.0
padding length:  3.0
padding bytes:  [ 1  2  3  4  5  6  7  8  9 10  0  0  0  0  0  0  0  0]
reshaped bytes: 
 [[ 1  2  3]
 [ 4  5  6]
 [ 7  8  9]
 [10  0  0]
 [ 0  0  0]
 [ 0  0  0]]
input:  [2.         5.         8.         3.33333333 0.         0.        ]


In [20]:
def determine_modifiable_positions(bytes, s):
    l = len(bytes)
    modifiable_positions = set()

    if l >= s:
        # Számítjuk, hány bájt kerül átlagolásra egy vektor elem létrehozásához
        group_size = int(np.ceil(l / s))
        
        # Meghatározzuk, hány bájt kerül hozzáadásra paddingként
        padding_length = s * group_size - l
        
        # Meghatározzuk az adversarial suffix hosszát
        suffix_length = l - (l // group_size) * group_size
        
        # Ha a suffix hossza nagyobb, mint a szükséges padding hossza, az utolsó csoportba eső bájtok módosíthatók
        if suffix_length > padding_length:
            modifiable_positions.add(s - 1)  # Az utolsó vektor pozíció módosítható
    
    return modifiable_positions

# Tesztesetek a függvény ellenőrzésére
# Példa bináris fájlok és az 's' paraméter
binaries = [
    ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + [0]*4, 6),  # 4 bájt hozzáadva
    ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + [0]*5, 6)   # 5 bájt hozzáadva
]

# A determine_modifiable_positions függvény tesztelése
for binary, s in binaries:
    modifiable_positions = determine_modifiable_positions(np.array(binary), s)
    print(f"Binary length: {len(binary)}, 's' value: {s}, modifiable positions: {modifiable_positions}")


Binary length: 14, 's' value: 6, modifiable positions: set()
Binary length: 15, 's' value: 6, modifiable positions: set()
