<a href="https://colab.research.google.com/github/mostafa-ja/mal_adv3/blob/main/3_adverserial_attacks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from scipy import sparse
import gdown
import numpy as np
import matplotlib.pyplot as plt
import pickle
import torch.nn.functional as F

In [3]:
download_link = 'https://drive.google.com/uc?id=12iud4h19CZst4exbr3U2A9iDxBYvZ5U_'
output_filepath = '/content/'
gdown.download(download_link, output_filepath)

Downloading...
From (original): https://drive.google.com/uc?id=12iud4h19CZst4exbr3U2A9iDxBYvZ5U_
From (redirected): https://drive.google.com/uc?id=12iud4h19CZst4exbr3U2A9iDxBYvZ5U_&confirm=t&uuid=2cb9bb58-3da5-4697-ba66-610e9a01509b
To: /content/X_redefined_sparse_matrix.npz
100%|██████████| 2.31M/2.31M [00:00<00:00, 110MB/s]


'/content/X_redefined_sparse_matrix.npz'

In [4]:
download_link = 'https://drive.google.com/uc?id=1IhrcT3jHqlPrw2KvQ5vJkBgozxcJ1cJm'
output_filepath = '/content/'
gdown.download(download_link, output_filepath)

Downloading...
From: https://drive.google.com/uc?id=1IhrcT3jHqlPrw2KvQ5vJkBgozxcJ1cJm
To: /content/labels.pt
100%|██████████| 517k/517k [00:00<00:00, 87.8MB/s]


'/content/labels.pt'

In [20]:
download_link = 'https://drive.google.com/uc?id=13o5n06UpMDOhtk4u7B_RBSWa3kiiGXFs'
output_filepath = '/content/'
gdown.download(download_link, output_filepath)

Downloading...
From: https://drive.google.com/uc?id=13o5n06UpMDOhtk4u7B_RBSWa3kiiGXFs
To: /content/DNN_params.pth
100%|██████████| 8.17M/8.17M [00:00<00:00, 109MB/s]


'/content/DNN_params.pth'

In [6]:
download_link = 'https://drive.google.com/uc?id=1PxFOLBnQAlX-EOsqkhGCSd1T3ykAD0-4'
output_filepath = '/content/'
gdown.download(download_link, output_filepath)

Downloading...
From: https://drive.google.com/uc?id=1PxFOLBnQAlX-EOsqkhGCSd1T3ykAD0-4
To: /content/vocab.pkl
100%|██████████| 9.18M/9.18M [00:00<00:00, 92.6MB/s]


'/content/vocab.pkl'

In [7]:
# Load the dictionary from the file
with open('vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)

for i, (key, value) in enumerate(vocab.items()):
    print((key, value))
    if i >= 5:
        break

('android/media/mediaplayer->start', 141045)
('android/app/activity->setcontentview', 140900)
('android/os/vibrator->cancel', 141093)
('android.permission.vibrate', 140720)
('android.hardware.touchscreen', 137091)
('android.intent.action.main', 138335)


In [8]:
# Load dataset
X_redefined = sparse.load_npz("X_redefined_sparse_matrix.npz")
labels_tensor = torch.load('labels.pt')

In [9]:
# Split data into train, validation, and test sets with stratified sampling
X_train_val, X_test, labels_train_val, labels_test = train_test_split(X_redefined, labels_tensor, test_size=0.2, stratify=labels_tensor, random_state=42)
X_train, X_val, labels_train, labels_val = train_test_split(X_train_val, labels_train_val, test_size=0.2, stratify=labels_train_val, random_state=42)

# Combine features and labels into datasets
train_dataset = TensorDataset(torch.tensor(X_train.toarray(), dtype=torch.float32), labels_train)
val_dataset = TensorDataset(torch.tensor(X_val.toarray(), dtype=torch.float32), labels_val)
test_dataset = TensorDataset(torch.tensor(X_test.toarray(), dtype=torch.float32), labels_test)


In [10]:
# Define the DataLoader for training, validation, and test sets
batch_size = 256
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [21]:
import torch
import torch.nn as nn

class MalwareDetectionModel(nn.Module):
    def __init__(self, input_size=10000, hidden_1_size=200, hidden_2_size=200, num_labels=2, dropout_prob=0.6):
        super(MalwareDetectionModel, self).__init__()

        self.input_size = input_size
        self.hidden_1_size = hidden_1_size
        self.hidden_2_size = hidden_2_size
        self.num_labels = num_labels
        self.dropout_prob = dropout_prob

        self.fc1 = nn.Linear(input_size, hidden_1_size)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(hidden_1_size, hidden_2_size)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout_prob)
        self.fc3 = nn.Linear(hidden_2_size, num_labels)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = self.log_softmax(x)
        return x


In [22]:
# Create an instance of your model
model = MalwareDetectionModel()

# Load model parameters
model.load_state_dict(torch.load('DNN_params.pth'))

<All keys matched successfully>

In [14]:
X,y = next(iter(test_loader))
print(X.shape)
print(y.shape)

torch.Size([256, 10000])
torch.Size([256, 1])


In [None]:
delta = torch.zeros_like(X, requires_grad=True)
for t in range(25):
    loss = nn.CrossEntropyLoss()(model(X + delta), y.view(-1).long())
    loss.backward()
    gradients = delta.grad.detach().sign() * (X < 0.5)
    delta.data = (delta + 0.02*delta.grad.detach().sign()).clamp(0.,1.)
    print(delta.data[0])
    delta.grad.zero_()

In [17]:
def round_x(x, alpha=0.5):
    """
    rounds x by thresholding it according to alpha which can be a scalar or vector
    :param x:
    :param alpha: threshold parameter
    :return: a double tensor of 0s and 1s.
    """
    return (x >= alpha).float()


def get_x0(x, rounding_threshold=0.5, is_sample=False):
    """
    Helper function to randomly initialize the the inner maximizer algos
    randomize such that the functionality is preserved.
    Functionality is preserved by maintaining the features present in x

    https://github.com/ALFA-group/robust-adv-malware-detection/

    :param x: training sample
    :param is_sample: flag to sample randomly from feasible area or return just x
    :return: randomly sampled feasible version of x
    """
    if is_sample:
        rand_x = round_x(torch.rand(x.size()), alpha=rounding_threshold)
        if x.is_cuda:
            rand_x = rand_x.cuda()
        return (rand_x.byte() | x.byte()).float()
    else:
        return x


def or_float_tensors(x_1, x_2):
    """
    ORs two float tensors by converting them to byte and back
    Note that byte() takes the first 8 bit after the decimal point of the float
    e.g., 0.0 ==> 0
          0.1 ==> 0
          1.1 ==> 1
        255.1 ==> 255
        256.1 ==> 0
    Subsequently the purpose of this function is to map 1s float tensors to 1
    and those of 0s to 0. I.e., it is meant to be used on tensors of 0s and 1s.

    :param x_1: tensor one
    :param x_2: tensor two
    :return: float tensor of 0s and 1s.
    """
    return (x_1.byte() | x_2.byte()).float()


def xor_float_tensors(x_1, x_2):
    """
    XORs two float tensors by converting them to byte and back
    Note that byte() takes the first 8 bit after the decimal point of the float
    e.g., 0.0 ==> 0
          0.1 ==> 0
          1.1 ==> 1
        255.1 ==> 255
        256.1 ==> 0
    Subsequently the purpose of this function is to map 1s float tensors to 1
    and those of 0s to 0. I.e., it is meant to be used on tensors of 0s and 1s.

    :param x_1: tensor one
    :param x_2: tensor two
    :return: float tensor of 0s and 1s.
    """
    return (x_1.byte() ^ x_2.byte()).float()

def get_loss(model, adv_x, label):
    """
    Compute the loss and prediction correctness.

    Parameters:
    - model: torch.nn.Module, a victim model
    - adv_x: torch.FloatTensor, the adversarially perturbed input samples
    - label: torch.LongTensor, ground truth labels

    Returns:
    - loss_no_reduction: torch.FloatTensor, the computed loss without reduction
    - done: torch.BoolTensor, a tensor indicating if the prediction is incorrect
    """
    y_prob = model(adv_x)
    loss_no_reduction = nn.BCELoss(reduction='none')(y_prob, label)
    y_pred = (y_prob >= 0.5).float()  # Threshold at 0.5
    done = (y_pred != label).squeeze()

    return loss_no_reduction, done

In [32]:
epsilon = 0.02
k = 25
alpha = 0.5

model.eval()
# compute natural loss
loss_natural = nn.CrossEntropyLoss(reduction='none')(model(X), y.view(-1).long())

# initialize starting point
x_next = X.clone()
x_next = get_x0(x_next)

# multi-step
for t in range(k):
    # forward pass
    x_var = torch.tensor(x_next, requires_grad=True)
    y_model = model(x_var)
    loss = nn.CrossEntropyLoss()(y_model, y.view(-1).long())

    # compute gradient
    grad_vars = torch.autograd.grad(loss.mean(), x_var)

    # find the next sample
    x_next = x_next + epsilon * torch.sign(grad_vars[0].data)

    # projection
    x_next = torch.clamp(x_next, min=0., max=1.)

# rounding step
x_next = round_x(x_next, alpha=alpha)

# feasible projection
x_next = or_float_tensors(x_next, X)

# compute adversarial loss
loss_adv = nn.CrossEntropyLoss(reduction='none')(model(x_next), y.view(-1).long()).data


print("Natural loss (%.4f) vs Adversarial loss (%.4f), Difference: (%.4f)" %(loss_natural.mean(), loss_adv.mean(), loss_adv.mean() - loss_natural.mean()))

replace_flag = (loss_adv < loss_natural).squeeze()
x_next[replace_flag] = X[replace_flag]

  x_var = torch.tensor(x_next, requires_grad=True)


Natural loss (0.0060) vs Adversarial loss (2221.2881), Difference: (2221.2820)


In [38]:
outputs = model(x_next)
_, predicted = torch.topk(outputs, k=1)

In [40]:
(predicted != y).squeeze()

tensor([True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, Tr

In [43]:
import torch
import torch.nn as nn

def dfgsm_k(x, y, model, k=25, epsilon=0.02, alpha=0.5, is_report_loss_diff=True, is_sample=False):
    """
    FGSM^k with deterministic rounding
    :param y: ground truth labels
    :param x: feature vector
    :param model: neural network model
    :param k: number of steps
    :param epsilon: update value in each direction
    :param alpha: threshold parameter for rounding
    :param is_report_loss_diff: flag to report loss difference
    :param is_sample: flag to sample randomly from the feasible area
    :return: the adversarial version of x according to dfgsm_k (tensor)
    """
    model.eval()

    # Compute natural loss
    criterion = nn.CrossEntropyLoss(reduction='none')
    loss_natural = criterion(model(x), y.view(-1).long())

    # Initialize starting point
    x_next = x.clone()
    x_next = get_x0(x_next, is_sample)

    # Multi-step
    for t in range(k):
        # Forward pass
        x_var = x_next.clone().detach().requires_grad_(True)
        y_model = model(x_var)
        loss = criterion(y_model, y.view(-1).long())

        # Compute gradient
        grad_vars = torch.autograd.grad(loss.mean(), x_var)

        # Find the next sample
        x_next = x_next + epsilon * torch.sign(grad_vars[0].data)

        # Projection
        x_next = torch.clamp(x_next, min=0., max=1.)

    # Rounding step
    x_next = round_x(x_next, alpha=alpha)

    # Feasible projection
    x_next = or_float_tensors(x_next, x)

    # Compute adversarial loss
    loss_adv = criterion(model(x_next), y.view(-1).long()).data

    if is_report_loss_diff:
        print(f"Natural loss: {loss_natural.mean():.4f}, Adversarial loss: {loss_adv.mean():.4f}, Difference: {(loss_adv.mean() - loss_natural.mean()):.4f}")
        outputs = model(x_next)
        _, predicted = torch.topk(outputs, k=1)
        done = (predicted != y).squeeze()
        print(f"rFGSM: attack effectiveness {done.sum().item() / x.size()[0] * 100:.3f}%.")

    # Replace with natural if adversarial loss is higher
    replace_flag = (loss_adv < loss_natural).squeeze()
    x_next[replace_flag] = x[replace_flag]

    return x_next


In [44]:
dfgsm_k(X,y,model)

Natural loss: 0.0060, Adversarial loss: 2221.2881, Difference: 2221.2820
rFGSM: attack effectiveness 100.000%.


tensor([[0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        ...,
        [1., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        [1., 1., 1.,  ..., 0., 1., 0.]])

In [None]:
class RFGSM():
    """
    FGSM^k with randomized rounding

    Parameters
    ---------
    @param is_attacker, Boolean, if ture means the role is the attacker
    @param oblivion, Boolean, whether know the adversary indicator or not
    @param kappa, attack confidence on adversary indicator
    @param manipulation_x, manipulations
    @param omega, the indices of interdependent apis corresponding to each api
    @param device, 'cpu' or 'cuda'
    """

    def __init__(self, random=False, device=None):
        super(RFGSM, self).__init__()
        self.device = device
        self.random = random

    def _perturb(self, model, x, label=None,
                 steps=10,
                 step_length=0.02,
                 lmda=1.,
                 use_sample=False):
        """
        perturb node feature vectors

        Parameters
        -----------
        @param model, a victim model
        @param x: torch.FloatTensor, feature vectors with shape [batch_size, vocab_dim]
        @param label: torch.LongTensor, ground truth labels
        @param steps: Integer, maximum number of iterations
        @param step_length: Integer, update value in each direction
        @param lmda, float, penalty factor for balancing the importance of adversary detector
        @param use_sample, Boolean, whether use random start point
        """

        adv_x = x.clone()
        model.eval()
        adv_x = get_x0(adv_x, rounding_threshold=0.5, is_sample=use_sample)
        loss_natural = 0.
        for t in range(steps):
            var_adv_x = torch.autograd.Variable(adv_x, requires_grad=True)
            loss, done = get_loss(model, var_adv_x, label)
            print(loss)
            if t == 0:
                loss_natural = loss
            print(torch.autograd.grad(loss.mean(), var_adv_x)[0].shape)
            grad = torch.autograd.grad(loss.mean(), var_adv_x)[0].data

            # filtering un-considered graphs & positions
            grad4insertion = (grad > 0) * grad * (adv_x <= 0.5)
            grad4ins_ = grad4insertion.reshape(x.shape[0], -1)
            print(torch.sign(grad4ins_)[0])
            # find the next sample
            adv_x = torch.clamp(adv_x + step_length * torch.sign(grad4ins_), min=0., max=1.)
        print(adv_x[0])
        # select adv x
        if self.random:
            round_threshold = torch.rand(adv_x.size()).to(self.device)
        else:
            round_threshold = 0.5

        adv_x = (adv_x >= round_threshold).float()
        # feasible projection
        adv_x = or_tensors(adv_x, x)
        # The below line is different from official codes because it is challenging to design a proper score measurement
        loss_adv, _1 = get_loss(model, adv_x, label)
        replace_flag = (loss_adv < loss_natural).squeeze()
        adv_x[replace_flag] = x[replace_flag]
        return adv_x

    def perturb(self, model, x, label=None,
                steps=10,
                step_length=0.02,
                min_lambda_=1e-5,
                max_lambda_=1e5,
                base=10.,
                verbose=False,
                use_sample=False):
        """
        enhance attack
        """
        assert 0 < min_lambda_ <= max_lambda_
        model.eval()
        if hasattr(model, 'is_detector_enabled'):
            self.lmba = min_lambda_
        else:
            self.lmba = max_lambda_
        adv_x = x.detach().clone().to(torch.float)
        while self.lmba <= max_lambda_:
            with torch.no_grad():
                _, done = get_loss(model, adv_x, label)
            if torch.all(done):
                break
            pert_x = self._perturb(model, adv_x[~done], label[~done],
                                   steps,
                                   step_length,
                                   lmda=self.lmba,
                                   use_sample=use_sample
                                   )
            adv_x[~done] = pert_x
            self.lmba *= base
        with torch.no_grad():
            _, done = get_loss(model, adv_x, label)
            if verbose:
                print(f"rFGSM: attack effectiveness {done.sum().item() / x.size()[0] * 100:.3f}%.")
        return adv_x

In [None]:
fgsm = RFGSM()
adv_x = fgsm.perturb(model, X, label=y,steps=10,step_length=0.02,min_lambda_=1e-5,max_lambda_=1e5,base=10.,verbose=True,use_sample=False)

tensor([[2.2557e-06],
        [7.2600e-08],
        [1.7080e-06],
        [7.4513e-05],
        [1.1064e-05],
        [1.4349e-04],
        [9.3200e-06],
        [4.1878e-05],
        [6.0749e-04],
        [3.6286e-04],
        [7.0824e-05],
        [2.7942e-06],
        [3.2759e-05],
        [3.5169e-06],
        [4.1276e-05],
        [8.0765e-02],
        [5.5046e-03],
        [1.2815e-07],
        [1.2229e-07],
        [1.7493e-05],
        [6.3212e-04],
        [4.8597e-05],
        [1.2368e-06],
        [1.5681e-05],
        [7.3820e-09],
        [1.1757e-08],
        [1.3994e-05],
        [3.1428e-06],
        [8.1003e-06],
        [4.3486e-08],
        [5.4574e-07],
        [2.4140e-06],
        [4.1427e-07],
        [3.3714e-06],
        [9.4361e-06],
        [7.7223e-08],
        [2.5965e-13],
        [5.0702e-09],
        [9.6958e-06],
        [1.1246e-07],
        [1.2015e-04],
        [2.8377e-06],
        [8.8125e-06],
        [7.3981e-13],
        [9.6761e-06],
        [3

RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.

In [None]:
_, done = get_loss(model, adv_x, y)
done.sum()

tensor(1)