In [None]:
! pip install pycryptodome

Collecting pycryptodome
  Downloading pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1/2.1 MB 31.9 MB/s eta 0:00:00
[?25hInstalling collected packages: pycryptodome
Successfully installed pycryptodome-3.20.0


In [None]:
import math
import operator
import random
import secrets

from Crypto.Util.number import getPrime, GCD, inverse, isPrime

def get_random_element_in_Z_n2_star(n):
    """Sample a random unit of Z_{n^2} by rejection sampling.

    Args:
        n: The modulus; ``n * n`` must be greater than 1.

    Returns:
        An integer ``g`` with ``1 <= g < n**2`` and ``gcd(g, n**2) == 1``.
    """
    # The square is loop-invariant, so compute it once.
    nsquare = n * n
    while True:
        # This value seeds key material, so draw it from the OS CSPRNG
        # (`secrets`) rather than the predictable Mersenne Twister in `random`.
        g = secrets.randbelow(nsquare - 1) + 1  # uniform on [1, n^2 - 1]
        if math.gcd(g, nsquare) == 1:
            return g


def _generate_safe_prime(bit_length):
    """Return a prime p = 2*p' + 1 where p' is a random `bit_length`-bit prime."""
    while True:
        p_prime = getPrime(bit_length)
        candidate = 2 * p_prime + 1
        # 2*p' + 1 is not prime in general; retry until it is.
        if isPrime(candidate):
            return candidate


def generate_keypair(bit_length):
    """Generate a key pair over Z_{n^2} with n = p*q.

    p and q are distinct primes of the form 2*p' + 1, where p' is a
    `bit_length`-bit prime.

    Args:
        bit_length: Bit length of the inner primes p' and q'.

    Returns:
        Tuple ``(pk, sk, n, g)`` where ``g = g'^(2n) mod n^2`` for a random
        unit ``g'``, ``sk`` is a random integer in ``[1, n^2 // 4]`` and
        ``pk = g^sk mod n^2``.
    """
    p = _generate_safe_prime(bit_length)
    q = _generate_safe_prime(bit_length)
    # With tiny demo bit lengths the two draws can collide; n must have two
    # distinct prime factors.
    while q == p:
        q = _generate_safe_prime(bit_length)
    n = p * q
    nsquare = n * n
    g_prime = get_random_element_in_Z_n2_star(n)
    g = pow(g_prime, 2 * n, nsquare)
    # The secret key must come from a CSPRNG, not from `random`.
    sk = secrets.randbelow(nsquare // 4) + 1
    pk = pow(g, sk, nsquare)
    return pk, sk, n, g

def encrypt(n, g, nsquare, pk, plaintext, zk=False):
    """Encrypt an integer as the pair (g^r, pk^r * (1+n)^plaintext) mod n^2.

    Args:
        n: Public modulus.
        g: Public generator.
        nsquare: ``n * n``.
        pk: Public key (``g^sk mod n^2``).
        plaintext: Integer message; any object supporting ``__index__``
            (e.g. a NumPy integer scalar) is accepted.
        zk: When true, also return the encryption randomness ``r`` so the
            caller can use it as a proof witness.

    Returns:
        ``(u, v)`` when ``zk`` is false, otherwise ``((u, v), r)``.
    """
    # Fresh per-ciphertext randomness in [1, n // 4], drawn from the CSPRNG.
    r = secrets.randbelow(n // 4) + 1
    h = 1 + n
    # Encrypt the *argument*: the previous code read a global `x`, so every
    # call encrypted whatever `x` happened to be bound to in the notebook.
    # operator.index normalises integer-like inputs to a Python int and
    # rejects floats instead of silently truncating them.
    message = operator.index(plaintext)
    u = pow(g, r, nsquare)
    v = (pow(pk, r, nsquare) * pow(h, message, nsquare)) % nsquare
    ciphertext = (u, v)
    if zk:
        return ciphertext, r
    return ciphertext

def decrypt(n, nsquare, sk, ciphertext):
    """Recover the plaintext from a ciphertext pair ``(u, v)``.

    Computes ``t = v * u^(-sk) mod n^2`` and returns ``(t - 1) // n``; for a
    ciphertext produced by ``encrypt``, ``t = (1 + n)^m = 1 + m*n (mod n^2)``.

    Args:
        n: Public modulus.
        nsquare: ``n * n``.
        sk: Secret key.
        ciphertext: Indexable pair ``(u, v)``.

    Returns:
        The plaintext as an integer in ``[0, n)``.
    """
    u, v = ciphertext[0], ciphertext[1]
    # Negative exponents with a modulus compute the modular inverse (Python 3.8+).
    t = (v * pow(u, -sk, nsquare)) % nsquare
    # The message space is Z_n, so reduce modulo n (the old `% nsquare` was a
    # no-op for valid ciphertexts). The stray debug print of `t` was removed.
    return ((t - 1) // n) % n


# Example usage: generate a toy key pair, encrypt one value and decrypt it again.
# NOTE: the names bound here (e.g. `r`, `plaintext`) are module-level and are
# relied on by the later `prove` cell, so this cell must run before it.
k = 8  # Bit length of the primes, for a real-world application this should be at least 2048.
pk, sk, n, g = generate_keypair(k)
nsquare = n * n

print("Public key:", pk)
print("Private key:", sk)

# Encrypt a number
x = 0  # The plaintext to be encrypted.
# zk=True makes encrypt also return the encryption randomness r.
ciphertext, r = encrypt(n, g, nsquare, pk, x, True)
plaintext = decrypt(n, nsquare, sk, ciphertext)

print("Ciphertext:", ciphertext)
print("Plaintext:", plaintext)

Public key: 35867081251
Private key: 5773612795
1
Ciphertext: (17190113851, 1617279301)
Plaintext: 0


In [None]:
import pickle
from hashlib import sha256
from sympy.ntheory import totient
def prove(pk, ciphertext, nsquare, randomness=None, message=None):
    """Build a sigma-protocol proof of knowledge of (r, m) and self-verify it.

    The statement is ``v = pk^r * (1+n)^m mod n^2``. The function draws a
    commitment ``T``, derives the challenge ``c`` by hashing ``T``, computes
    the responses and then checks the verification equation
    ``T * v^c == pk^s1 * (1+n)^s2 (mod n^2)``.

    Args:
        pk: Public key.
        ciphertext: Indexable pair ``(u, v)``; only ``v`` is used.
        nsquare: ``n * n``; ``n`` is recovered from it.
        randomness: Encryption randomness ``r``. Defaults to the module-level
            ``r`` for backward compatibility with existing call sites.
        message: Plaintext ``m``. Defaults to the module-level ``plaintext``.

    Returns:
        ``True`` if the verification equation holds, otherwise ``False``.

    Raises:
        ValueError: If ``nsquare`` is not a perfect square.
    """
    # Backward-compatible fallback: older call sites pass only three arguments
    # and rely on the witness living in notebook globals.
    if randomness is None:
        randomness = r
    if message is None:
        message = plaintext
    randomness = int(randomness)
    message = int(message)

    v = ciphertext[1]

    # Derive n from the argument instead of reading a global `n`, which may
    # belong to a different key pair after a later cell regenerates keys.
    n_from_square = math.isqrt(nsquare)
    if n_from_square * n_from_square != nsquare:
        raise ValueError("nsquare must be a perfect square")
    h = 1 + n_from_square

    # Commitment nonces, uniform on [1, nsquare], drawn from the CSPRNG.
    t1 = secrets.randbelow(nsquare) + 1
    t2 = secrets.randbelow(nsquare) + 1

    T = (pow(pk, t1, nsquare) * pow(h, t2, nsquare)) % nsquare
    T_bytes = T.to_bytes((nsquare.bit_length() + 7) // 8, byteorder='big')
    # NOTE(review): the challenge hashes only T, not the statement (pk, v) -
    # confirm whether the statement should be bound into the hash.
    c = int.from_bytes(sha256(T_bytes).digest(), byteorder='big') % nsquare

    # Responses are kept over the integers. Reducing modulo totient(n^2)
    # required factoring n^2 (infeasible for real key sizes) and is not needed
    # for the verification equation to hold.
    s1 = t1 + c * randomness
    s2 = t2 + c * message

    L = (T * pow(v, c, nsquare)) % nsquare
    R = (pow(pk, s1, nsquare) * pow(h, s2, nsquare)) % nsquare
    return L == R

# Sanity check: run the proof for the ciphertext produced in the example cell above.
print(prove(pk, ciphertext, nsquare))

True


In [None]:
import tensorflow as tf

# Load MNIST dataset
mnist = tf.keras.datasets.mnist
(train_images, train_labels), _ = mnist.load_data()

# Generate a keypair
# NOTE(review): this rebinds the module-level pk/sk/n/g/nsquare from the example cell.
bit_length = 128  # Bit length of each prime. NOTE(review): far below the >= 2048 bits the example cell recommends for real use.
pk, sk, n, g = generate_keypair(bit_length)
nsquare = n * n
# Encrypt the images
encrypted_images = []
i = 0

# NOTE(review): `proves` is never appended to below, so the final
# `False in proves` check is always False - confirm whether prove() was meant
# to be called per ciphertext.
proves = []
for image in train_images:
  print(i)  # progress counter: one line per image
  i += 1
  encrypted_image = []
  for row in image:
    encrypted_row = []
    for pixel in row:
      # Pixel-by-pixel encryption; `pixel` is presumably a numpy.uint8 scalar - TODO confirm encrypt accepts it.
      ciphertext, r = encrypt(n, g, nsquare, pk, pixel, True)
      encrypted_row.append(ciphertext)

    encrypted_image.append(encrypted_row)
  encrypted_images.append(encrypted_image)
# Prints only the last encrypted image (the loop variable), not the whole list.
print(encrypted_image)
print(False in proves)




In [None]:
! pip install torch torchvision pytorch-lightning

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.2.4-py3-none-any.whl (802 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m802.2/802.2 kB[0m [31m14.8 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvid

In [None]:
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import pytorch_lightning as pl

In [None]:
# Preprocessing: convert to a tensor, then normalize with mean 0.5 and std 0.5
# (for ToTensor's [0, 1] output this maps pixel values to [-1, 1]).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# MNIST training split, downloaded into ./data on first use, served in shuffled batches of 64.
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

In [None]:
import torch.nn as nn

class PlanarFlow(nn.Module):
    """One planar flow layer: ``f(x) = x + u * tanh(w^T x + b)``.

    Parameters ``u`` (1, dim), ``w`` (dim, 1) and ``b`` (1,) are randomly
    initialised.

    NOTE(review): invertibility of a planar flow normally requires a
    constraint on ``w^T u``; none is enforced here - confirm whether that is
    intended.
    """

    def __init__(self, dim):
        super(PlanarFlow, self).__init__()
        self.u = nn.Parameter(torch.randn(1, dim))
        self.w = nn.Parameter(torch.randn(dim, 1))
        self.b = nn.Parameter(torch.randn(1))

    def forward(self, x):
        """Apply the flow to a batch.

        Args:
            x: Tensor of shape (batch, dim).

        Returns:
            Tuple ``(y, log_det_J)`` where ``y`` has shape (batch, dim) and
            ``log_det_J`` has shape (batch,), holding ``log|det df/dx|`` per
            sample.
        """
        lin = torch.mm(x, self.w) + self.b            # (batch, 1)
        activation = torch.tanh(lin)                  # (batch, 1)
        psi = 1 - activation ** 2                     # tanh'(lin), (batch, 1)

        # The transform uses tanh itself. The previous code added u * tanh'(lin)
        # and left `activation` unused, so the log-det below did not belong to
        # the map that was actually applied.
        x = x + torch.mm(activation, self.u)          # (batch, dim)

        # det(I + u psi w^T) = 1 + (w^T u) * tanh'(lin)
        w_dot_u = torch.mm(self.w.t(), self.u.t())    # (1, 1)
        det_jacobian_component = 1 + w_dot_u * psi    # (batch, 1)
        # Keep one log-determinant per sample instead of summing over the
        # batch, so callers can add it to per-sample log-densities.
        log_det_J = torch.log(torch.abs(det_jacobian_component)).squeeze(1)

        return x, log_det_J

class NormalizingFlowModel(nn.Module):
    """A stack of ``flow_length`` PlanarFlow layers applied in sequence."""

    def __init__(self, dim, flow_length=10):
        super(NormalizingFlowModel, self).__init__()
        self.dim = dim
        self.flows = nn.ModuleList([PlanarFlow(dim) for _ in range(flow_length)])

    def forward(self, x):
        """Push ``x`` through every flow and accumulate the log-determinants.

        Args:
            x: Input batch, passed unchanged to the first flow.

        Returns:
            Tuple ``(z, log_det_J)``: the transformed batch and the sum over
            layers of each layer's log-determinant. The accumulated value keeps
            whatever shape the layers return (scalar or per-sample); it is the
            integer ``0`` when there are no layers.
        """
        log_det_J = 0
        for flow in self.flows:
            x, flow_log_det_J = flow(x)
            # Sum across layers only. The previous `.sum(0)` also collapsed any
            # per-sample values over the batch axis, which made the result a
            # batch total that was then added to every sample's log-density.
            log_det_J = log_det_J + flow_log_det_J
        return x, log_det_J

In [None]:
class FlowTrainer(pl.LightningModule):
    """Lightning wrapper that trains a NormalizingFlowModel on flattened 28x28 inputs."""

    def __init__(self):
        super().__init__()
        self.model = NormalizingFlowModel(dim=28*28)

    def forward(self, x):
        """Flatten each sample to a vector and run it through the flow."""
        flattened = x.view(x.size(0), -1)
        return self.model(flattened)

    def training_step(self, batch, batch_idx):
        """Compute and log the negative log-likelihood for one batch (labels are ignored)."""
        inputs, _ = batch
        latent, log_det = self(inputs)
        nll = self.loss_function(latent, log_det)
        self.log('train_loss', nll)
        return nll

    def loss_function(self, z, log_det_J):
        """Negative mean of (standard-normal log-density of z + log_det_J)."""
        # Gaussian normalisation term: (d / 2) * log(2 * pi)
        normaliser = 0.5 * z.size(1) * torch.log(torch.tensor(2 * torch.pi))
        squared_norm = torch.sum(z**2, dim=1)
        log_prob_z = -0.5 * squared_norm - normaliser
        return -torch.mean(log_prob_z + log_det_J)

    def configure_optimizers(self):
        """Adam over the flow parameters with learning rate 0.01."""
        return torch.optim.Adam(self.model.parameters(), lr=0.01)

# Training: fit the flow for 5 epochs on the MNIST train_loader.
# NOTE(review): `model` is rebound to a SimpleCNN in a later cell, while the
# KL cells call get_latent_variables(model, ...) and expect this flow - mind
# the execution order.
trainer = pl.Trainer(max_epochs=5)
model = FlowTrainer()
trainer.fit(model, train_loader)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.utilities.rank_zero:You are using a CUDA device ('NVIDIA A100-SXM4-40GB') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name  | Type                 | Params
-----------------------------------------------
0 | model | NormalizingFlowModel | 15.7 K
--------

Training: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset

dataset = datasets.MNIST(root='./data', download=True, train=True, transform=transform)
# Create two subsets, digits '0' and '1'
# Read the labels straight from `dataset.targets`: enumerating the dataset
# decoded and transformed all 60,000 images twice just to look at the label.
# Indices come out in ascending order, as before.
subset_indices_0 = torch.nonzero(dataset.targets == 0, as_tuple=True)[0].tolist()
subset_indices_1 = torch.nonzero(dataset.targets == 1, as_tuple=True)[0].tolist()

subset_0 = Subset(dataset, subset_indices_0)
subset_1 = Subset(dataset, subset_indices_1)

# shuffle=False keeps the latent rows in dataset order.
loader_0 = DataLoader(subset_0, batch_size=64, shuffle=False)
loader_1 = DataLoader(subset_1, batch_size=64, shuffle=False)

In [None]:
def get_latent_variables(model, loader):
    """Encode every batch from `loader` with `model` and stack the latents.

    Puts `model` in eval mode, flattens each batch to (batch, -1), keeps the
    first element of the model's two-element output and concatenates all of
    them along dim 0. Gradients are disabled.
    """
    model.eval()

    def _encode(batch):
        latent, _ = model(batch.view(batch.size(0), -1))
        return latent

    with torch.no_grad():
        encoded = [_encode(batch) for batch, _ in loader]
    return torch.cat(encoded, dim=0)

In [None]:
# Latent codes for the digit-0 and digit-1 subsets (one row per image).
z_0 = get_latent_variables(model, loader_0)
z_1 = get_latent_variables(model, loader_1)

In [None]:
def kl_divergence(p, q):
    """Return sum(p * log(p / q)) over all elements of the two tensors."""
    log_ratio = torch.log(p / q)
    return torch.sum(p * log_ratio)

def calculate_density(hist, bin_edges):
    """Turn histogram counts into a density: count / (bin width * total count)."""
    widths = bin_edges[1:] - bin_edges[:-1]
    total = hist.sum()
    return hist / (widths * total)


# Histogram every latent coordinate of both classes into 100 bins on [-5, 5];
# values outside that range are not counted.
hist_0, edges_0 = torch.histogram(z_0, bins=100, range=(-5, 5))
hist_1, edges_1 = torch.histogram(z_1, bins=100, range=(-5, 5))


density_0 = calculate_density(hist_0, edges_0)
density_1 = calculate_density(hist_1, edges_1)


# Small offset so empty bins do not produce log(0) or division by zero.
density_0 += 1e-10
density_1 += 1e-10

# NOTE(review): kl_divergence sums p * log(p / q) over *densities* without
# multiplying by the bin width, so the values are presumably 1 / bin_width
# times the discrete KL - confirm whether only relative comparison is intended.
kl_01 = kl_divergence(density_0, density_1)
kl_10 = kl_divergence(density_1, density_0)

print("KL divergence from 0 to 1:", kl_01.item())
print("KL divergence from 1 to 0:", kl_10.item())

KL divergence from 0 to 1: 1.058313250541687
KL divergence from 1 to 0: 0.8886064291000366


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
import random


# Same preprocessing as before: tensor conversion, then normalization with mean 0.5 / std 0.5.
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
# Full MNIST training split; later cells modify this object's `.data` / `.targets` in place.
full_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)

In [None]:
# Shuffle all sample indices. No seed is set in this cell, so the split may
# differ on every run.
indices = list(range(len(full_dataset)))
random.shuffle(indices)

# Ten near-equal index groups.
subsets_indices = np.array_split(indices, 10)

# One Subset view per group; all of them share the same underlying full_dataset.
subsets = [Subset(full_dataset, subset_indices) for subset_indices in subsets_indices]


In [None]:
# Poison 10% of subset 0 by blanking the images in place.
poisoned_subset_index = 0
poisoned_subset_indices = subsets_indices[poisoned_subset_index]
num_poisoned = len(poisoned_subset_indices) // 10
poisoned_indices = random.sample(list(poisoned_subset_indices), num_poisoned)
# Create an empty image
empty_image = torch.zeros_like(full_dataset[0][0])

# Replace the selected images with empty images
# NOTE: this mutates full_dataset in place, so re-running the cell poisons a
# fresh random sample on top of the previous one.
for idx in poisoned_indices:
    full_dataset.data[idx] = empty_image.squeeze()  # Removing the channel dimension if necessary
    full_dataset.targets[idx] = -1  # You might assign a special label to indicate poisoning

# num_pixels = 28 * 28
# num_to_zero_out = int((1 / 100.0) * num_pixels)
# for idx in poisoned_indices:

#   random_indices = random.sample(range(num_pixels), num_to_zero_out)
#   for i in random_indices:
#     full_dataset.data[idx][i // 28][i % 28] = -1  # After normalizing (-1 / 2 + 0.5 = 0)


torch.Size([60000, 28, 28])


In [None]:
import torch
from scipy.stats import entropy

def calculate_kl_divergence(p, q, eps=1e-12):
    """Return sum(p * log(p / q)) after adding `eps` to both inputs.

    Args:
        p: Tensor of non-negative values (e.g. histogram densities).
        q: Tensor with the same shape as `p`.
        eps: Offset added to both tensors so empty bins do not yield log(0)
            or a division by zero.

    Returns:
        A 0-dim tensor with the summed divergence.
    """
    # Out-of-place addition: the previous `p += ...` / `q += ...` silently
    # modified the caller's tensors.
    p = p + eps
    q = q + eps
    return (p * torch.log(p / q)).sum()

kl_divergences = []

# Iterate over each subset, treating it once as the test set and the others as the training set
for i in range(10):
    other_subsets = [subsets[j] for j in range(10) if j != i]
    combined_loader = torch.utils.data.DataLoader(
        torch.utils.data.ConcatDataset(other_subsets),
        batch_size=64, shuffle=True)
    # Batch the held-out subset as well. Passing the raw Subset made
    # get_latent_variables run the model one sample at a time.
    excluded_loader = torch.utils.data.DataLoader(
        subsets[i], batch_size=64, shuffle=False)

    z_combined = get_latent_variables(model, combined_loader)
    z_excluded = get_latent_variables(model, excluded_loader)

    # Density histograms over all latent coordinates on [-5, 5].
    hist_combined, _ = torch.histogram(z_combined, bins=100, range=(-5, 5), density=True)
    hist_excluded, _ = torch.histogram(z_excluded, bins=100, range=(-5, 5), density=True)

    kl_div = calculate_kl_divergence(hist_excluded, hist_combined)
    kl_divergences.append(kl_div.item())

    print(f"KL divergence for subset {i} as test set: {kl_div.item()}")

print("Average KL divergence:", sum(kl_divergences) / len(kl_divergences))

KL divergence for subset 0 as test set: 0.011125698685646057


KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

In [None]:
class SimpleCNN(nn.Module):
    """Two conv + max-pool stages followed by two linear layers, for 1x28x28 inputs and 10 classes."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Two 2x2 poolings reduce 28x28 to 7x7 with 32 channels.
        self.fc1 = nn.Linear(in_features=32 * 7 * 7, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return raw class logits of shape (batch, 10)."""
        features = self.pool(self.relu(self.conv1(x)))
        features = self.pool(self.relu(self.conv2(features)))
        flat = torch.flatten(features, 1)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)

In [None]:
# NOTE: rebinds `transform` and `train_loader`, which earlier cells also define.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Normalize images
])

# Clean MNIST train/test splits (separate objects from `full_dataset`).
train_set = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_set = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)


train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)

In [None]:
indices = list(range(len(full_dataset)))
random.shuffle(indices)

subsets_indices = np.array_split(indices, 10)

# Build the loaders from the SAME shuffled split stored in `subsets_indices`.
# The previous code re-split an unshuffled arange here, so subsets[k] did not
# contain the indices in subsets_indices[k] and poisoning "subset k" by index
# actually spread the poison across all loaders.
subsets = [DataLoader(Subset(full_dataset, subset_indices), batch_size=64, shuffle=True)
           for subset_indices in subsets_indices]


In [None]:
# Poison 10% of the indices in subsets_indices[5] by blanking those images in place.
poisoned_subset_index = 5
poisoned_subset_indices = subsets_indices[poisoned_subset_index]

num_poisoned = len(poisoned_subset_indices) // 10
poisoned_indices = random.sample(list(poisoned_subset_indices), num_poisoned)


empty_image = torch.zeros_like(full_dataset[0][0])

# NOTE(review): if the earlier poisoning cell already ran on this same
# `full_dataset` object, some targets are -1, which CrossEntropyLoss would
# presumably reject - confirm full_dataset is reloaded before CNN training.
for idx in poisoned_indices:
    full_dataset.data[idx] = empty_image.squeeze()
#     full_dataset.targets[idx] = -1

# num_pixels = 28 * 28
# num_to_zero_out = int((5 / 100.0) * num_pixels)
# for idx in poisoned_indices:

#   random_indices = random.sample(range(num_pixels), num_to_zero_out)
#   for i in random_indices:
#     full_dataset.data[idx][i // 28][i % 28] = -1  # After normalizing (-1 / 2 + 0.5 = 0)

In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=5):
    """Train `model` in place and print the mean batch loss after each epoch.

    Args:
        model: Module to optimise.
        train_loader: Iterable of ``(images, labels)`` batches with a length.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer that must have been built over `model`'s own
            parameters; an optimizer bound to another model leaves `model`
            unchanged.
        num_epochs: Number of passes over `train_loader`.
    """
    # Make sure the model is in training mode: callers may hand in a model (or
    # a deepcopy of one) that an evaluation helper left in eval mode.
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}")

# NOTE: this rebinds `model`, which earlier cells use for the trained flow;
# any later get_latent_variables(model, ...) call will see this CNN instead.
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
# This optimizer is bound to `model`'s parameters only.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


# Train the base model on the first subset only (default 5 epochs).
train_model(model, subsets[0], criterion, optimizer)

Epoch 1, Loss: 1.6822956077595974
Epoch 2, Loss: 0.35062007035346743
Epoch 3, Loss: 0.1909089897303505
Epoch 4, Loss: 0.13746958691626787
Epoch 5, Loss: 0.11706441052337276


In [None]:
import copy

# Snapshot of the trained CNN; the retraining loop below starts each run from a copy of it.
base_model = copy.deepcopy(model)

def evaluate_loss(model, data_loader, criterion):
    """Return the mean of the per-batch losses of `model` over `data_loader`.

    Switches `model` to eval mode and disables gradients; the result is the
    sum of batch losses divided by the number of batches.
    """
    model.eval()
    accumulated = 0.0
    with torch.no_grad():
        for inputs, targets in data_loader:
            accumulated += criterion(model(inputs), targets).item()
    num_batches = len(data_loader)
    return accumulated / num_batches


# Loss of the untouched base model on each subset.
initial_losses = []
for subset_loader in subsets:
    initial_loss = evaluate_loss(base_model, subset_loader, criterion)
    initial_losses.append(initial_loss)

# Gradually include each subset and record the loss
new_losses = []
for i, subset_loader in enumerate(subsets):

    # `subsets[j]` is a DataLoader here; `.dataset` is the Subset it wraps.
    combined_set = torch.utils.data.ConcatDataset([train_set] + [subsets[j].dataset for j in range(i + 1)])
    combined_loader = DataLoader(combined_set, batch_size=64, shuffle=True)

    current_model = copy.deepcopy(base_model)
    # The global `optimizer` is bound to `model`'s parameters, so passing it
    # here never updated `current_model` (and kept applying momentum steps to
    # `model`). Each fresh copy needs its own optimizer with the same settings.
    current_optimizer = optim.SGD(current_model.parameters(), lr=0.01, momentum=0.9)
    train_model(current_model, combined_loader, criterion, current_optimizer, num_epochs=1)

    new_loss = evaluate_loss(current_model, combined_loader, criterion)
    new_losses.append(new_loss)

    print(f"After including subset {i+1}, New Loss: {new_loss}, Initial Loss: {initial_losses[i]}")

Epoch 1, Loss: 0.12646151373484135
After including subset 1, New Loss: 0.12637054548118726, Initial Loss: 0.10684828166948988
Epoch 1, Loss: 0.1296811612289813
After including subset 2, New Loss: 0.12968116053773296, Initial Loss: 0.16591256914382918
Epoch 1, Loss: 0.13287777266479645
After including subset 3, New Loss: 0.13284626471717598, Initial Loss: 0.17096337616602156
Epoch 1, Loss: 0.13383576578779113
After including subset 4, New Loss: 0.1338275864916114, Initial Loss: 0.14730582840027326
Epoch 1, Loss: 0.13463188394872364
After including subset 5, New Loss: 0.1345954101622719, Initial Loss: 0.14500193640668976
Epoch 1, Loss: 0.135908449449266
After including subset 6, New Loss: 0.1359084493946284, Initial Loss: 0.15531405917507538
Epoch 1, Loss: 0.13762665951421188
After including subset 7, New Loss: 0.1376156626021492, Initial Loss: 0.16503535809510567
Epoch 1, Loss: 0.13959602592899178
After including subset 8, New Loss: 0.13954103611667876, Initial Loss: 0.17260332338194898

In [None]:
%%capture --no-stderr --no-display
# NBVAL_IGNORE_OUTPUT

try:
  import secml
except ImportError:
  %pip install git+https://github.com/pralab/secml

In [None]:
# NBVAL_IGNORE_OUTPUT

from secml.data.loader import CDataLoaderMNIST

# MNIST dataset will be downloaded and cached if needed
# NOTE: `loader` here is the secml MNIST loader, not a torch DataLoader.
loader = CDataLoaderMNIST()

2024-05-08 17:08:58,514 - secml.settings - INFO - New `SECML_HOME_DIR` created: /root/secml-data
2024-05-08 17:08:58,514 - secml.settings - INFO - New `SECML_HOME_DIR` created: /root/secml-data


INFO:secml.settings:New `SECML_HOME_DIR` created: /root/secml-data


2024-05-08 17:08:58,519 - secml.settings - INFO - Default configuration file copied to: /root/secml-data/secml.conf
2024-05-08 17:08:58,519 - secml.settings - INFO - Default configuration file copied to: /root/secml-data/secml.conf


INFO:secml.settings:Default configuration file copied to: /root/secml-data/secml.conf


2024-05-08 17:08:58,524 - secml.settings - INFO - New `SECML_DS_DIR` created: /root/secml-data/datasets
2024-05-08 17:08:58,524 - secml.settings - INFO - New `SECML_DS_DIR` created: /root/secml-data/datasets


INFO:secml.settings:New `SECML_DS_DIR` created: /root/secml-data/datasets


2024-05-08 17:08:58,530 - secml.settings - INFO - New `SECML_MODELS_DIR` created: /root/secml-data/models
2024-05-08 17:08:58,530 - secml.settings - INFO - New `SECML_MODELS_DIR` created: /root/secml-data/models


INFO:secml.settings:New `SECML_MODELS_DIR` created: /root/secml-data/models


2024-05-08 17:08:58,535 - secml.settings - INFO - New `SECML_EXP_DIR` created: /root/secml-data/experiments
2024-05-08 17:08:58,535 - secml.settings - INFO - New `SECML_EXP_DIR` created: /root/secml-data/experiments


INFO:secml.settings:New `SECML_EXP_DIR` created: /root/secml-data/experiments


2024-05-08 17:08:58,540 - secml.settings - INFO - New `SECML_LOGS_DIR` created: /root/secml-data/logs
2024-05-08 17:08:58,540 - secml.settings - INFO - New `SECML_LOGS_DIR` created: /root/secml-data/logs


INFO:secml.settings:New `SECML_LOGS_DIR` created: /root/secml-data/logs


2024-05-08 17:08:58,544 - secml.settings - INFO - New `SECML_PYTORCH_DIR` created: /root/secml-data/pytorch-data
2024-05-08 17:08:58,544 - secml.settings - INFO - New `SECML_PYTORCH_DIR` created: /root/secml-data/pytorch-data


INFO:secml.settings:New `SECML_PYTORCH_DIR` created: /root/secml-data/pytorch-data


Downloading from `https://gitlab.com/api/v4/projects/secml%2Fsecml-zoo/repository/files/datasets%2FMNIST%2Ftrain-images-idx3-ubyte.gz/raw?ref=master` (9912422 bytes)

File stored in `/root/secml-data/datasets/mnist/train-images-idx3-ubyte.gz`
Downloading from `https://gitlab.com/api/v4/projects/secml%2Fsecml-zoo/repository/files/datasets%2FMNIST%2Ftrain-labels-idx1-ubyte.gz/raw?ref=master` (28881 bytes)

File stored in `/root/secml-data/datasets/mnist/train-labels-idx1-ubyte.gz`
Downloading from `https://gitlab.com/api/v4/projects/secml%2Fsecml-zoo/repository/files/datasets%2FMNIST%2Ft10k-images-idx3-ubyte.gz/raw?ref=master` (1648877 bytes)

File stored in `/root/secml-data/datasets/mnist/t10k-images-idx3-ubyte.gz`
Downloading from `https://gitlab.com/api/v4/projects/secml%2Fsecml-zoo/repository/files/datasets%2FMNIST%2Ft10k-labels-idx1-ubyte.gz/raw?ref=master` (4542 bytes)

File stored in `/root/secml-data/datasets/mnist/t10k-labels-idx1-ubyte.gz`


In [None]:
# Seed passed to the poisoning attack further down.
random_state = 42

n_tr = 1200  # Number of training set samples
n_val = 500  # Number of validation set samples
n_ts = 500  # Number of test set samples

# Two-class problem: only digits 5 and 9 are loaded.
digits = (5, 9)

tr_val = loader.load('training', digits=digits, num_samples=n_tr + n_val)
ts = loader.load('testing', digits=digits, num_samples=n_ts)

# Split in training and validation set
tr = tr_val[:n_tr, :]
val = tr_val[n_tr:, :]

# Normalize the features in `[0, 1]`
# NOTE: in-place division, so re-running this part without reloading rescales again.
tr.X /= 255
val.X /= 255
ts.X /= 255

from secml.ml.classifiers import CClassifierSVM
# train SVM in the dual space, on a linear kernel, as needed for poisoning
clf = CClassifierSVM(C=10, kernel='linear')

print("Training of classifier...")
clf.fit(tr.X, tr.Y)

# Compute predictions on a test set
y_pred = clf.predict(ts.X)

# Metric to use for performance evaluation
from secml.ml.peval.metrics import CMetricAccuracy
metric = CMetricAccuracy()

# Evaluate the accuracy of the classifier
acc = metric.performance_score(y_true=ts.Y, y_pred=y_pred)

print("Accuracy on test set: {:.2%}".format(acc))

Training of classifier...
Accuracy on test set: 96.60%


In [None]:
from secml.figure import CFigure
# Only required for visualization in notebooks
%matplotlib inline

# Let's define a convenience function to easily plot the MNIST dataset
def show_digits(samples, preds, labels, digs, n_display=8):
    """Plot up to `n_display` samples as 28x28 grayscale images.

    Each title reads "<true digit> (<predicted digit>)" and is green when the
    label index equals the prediction index, red otherwise.

    Args:
        samples: Array of flattened images, one per row.
        preds: Predicted class indices.
        labels: True class indices.
        digs: Sequence mapping a class index to the digit to display.
        n_display: Maximum number of samples to show.
    """
    samples = samples.atleast_2d()
    n_display = min(n_display, samples.shape[0])
    fig = CFigure(width=n_display*2, height=3)
    for idx in range(n_display):
        fig.subplot(2, n_display, idx+1)
        fig.sp.xticks([])
        fig.sp.yticks([])
        fig.sp.imshow(samples[idx, :].reshape((28, 28)), cmap='gray')
        # Map BOTH indices through the `digs` argument; the true label used to
        # be looked up in the global `digits`, ignoring the parameter.
        fig.sp.title("{} ({})".format(digs[labels[idx].item()], digs[preds[idx].item()]),
                     color=("green" if labels[idx].item()==preds[idx].item() else "red"))
    fig.show()

In [None]:
# NOTE(review): features were scaled into [0, 1] when the SVM was trained, yet
# the lower bound here is 5 - confirm the intended attack-space bounds.
lb, ub = 5, None  # Bounds of the attack space. Can be set to `None` for unbounded
n_poisoning_points = 600  # Number of poisoning points to generate

# Number of times the attack below is repeated.
total_runs = 12
poisoned_datasets = []  # List to store the poisoned datasets


# NOTE(review): every iteration uses the same classifier, data, solver
# parameters and random_seed, so the runs presumably produce identical poison
# sets - confirm whether the seed should vary per run.
for i in range(total_runs):
    print(f"Running attack {i+1}/{total_runs}...")
    # Solver settings; should be chosen depending on the optimization problem.
    # NOTE(review): eta_min (2.0) is larger than eta (0.25) - confirm.
    solver_params = {
        'eta': 0.25,
        'eta_min': 2.0,
        'eta_max': None,
        'max_iter': 100,
        'eps': 1e-5
    }

    from secml.adv.attacks import CAttackPoisoningSVM
    pois_attack = CAttackPoisoningSVM(classifier=clf,
                                      training_data=tr,
                                      val=val,
                                      lb=lb, ub=ub,
                                      solver_params=solver_params,
                                      random_seed=random_state)
    pois_attack.n_points = n_poisoning_points
    # Only the generated poisoning points (third return value) are kept.
    pois_y_pred, _, pois_points_ds, _ = pois_attack.run(ts.X, ts.Y)
    poisoned_datasets.append(pois_points_ds)

def show_digits(samples, preds, labels, digs, n_display=8):
    """Plot up to `n_display` samples as 28x28 grayscale images.

    Each title reads "<true digit> (<predicted digit>)" and is green when the
    label index equals the prediction index, red otherwise.

    Args:
        samples: Array of flattened images, one per row.
        preds: Predicted class indices.
        labels: True class indices.
        digs: Sequence mapping a class index to the digit to display.
        n_display: Maximum number of samples to show.
    """
    samples = samples.atleast_2d()
    n_display = min(n_display, samples.shape[0])
    fig = CFigure(width=n_display*2, height=3)
    for idx in range(n_display):
        fig.subplot(2, n_display, idx+1)
        fig.sp.xticks([])
        fig.sp.yticks([])
        fig.sp.imshow(samples[idx, :].reshape((28, 28)), cmap='gray')
        # Map BOTH indices through the `digs` argument; the true label used to
        # be looked up in the global `digits`, ignoring the parameter.
        fig.sp.title("{} ({})".format(digs[labels[idx].item()], digs[preds[idx].item()]),
                     color=("green" if labels[idx].item()==preds[idx].item() else "red"))
    fig.show()


Running attack 1/12...


KeyboardInterrupt: 

In [None]:
import torch
# Gather the poisoning points of every attack run as float32 tensors.
pois = []

for i in range(total_runs):
    # Assuming each dataset contains images in .X and labels in .Y
    data_tensor = torch.tensor(poisoned_datasets[i].X.tondarray(), dtype=torch.float32)  # Convert data to tensor
    pois.append(data_tensor)

# Concatenate all tensors in the list to form a single tensor
poisoned_images = torch.cat(pois, dim=0)
print(poisoned_images.shape)

# Reshape flat 784-vectors into (N, 1, 28, 28) image tensors
poisoned_images = poisoned_images.reshape(-1, 1, 28, 28)
print("Reshaped tensor dimensions:", poisoned_images.shape)

torch.Size([600, 784])
Reshaped tensor dimensions: torch.Size([600, 1, 28, 28])


In [None]:
import random
import numpy as np
from torch.utils.data import Subset
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Normalize
from torch.utils.data import DataLoader

# NOTE(review): `transform` is rebound here but not passed to the dataset,
# which uses ToTensor() only - confirm whether normalization was intended.
transform = Normalize((0.5,), (0.5,))
mnist_dataset = MNIST(root='./data', train=True, download=True, transform=ToTensor())
# Temporarily add a channel axis so (1, 28, 28) poison images can be assigned.
mnist_dataset.data = mnist_dataset.data.unsqueeze(1)
indices = list(range(len(mnist_dataset)))
random.shuffle(indices)

subsets_indices = np.array_split(indices, 10)

subsets = [Subset(mnist_dataset, subset_indices) for subset_indices in subsets_indices]

# Select subset 0 from the SAME split that built `subsets`. The previous code
# reshuffled and re-split here, so the "subset 0" being poisoned was unrelated
# to subsets[0]. `subset_indices` is kept as an alias in case later cells use it.
subset_indices = subsets_indices
subset_0_indices = subsets_indices[0]
labels_5_or_9_indices = [idx for idx in subset_0_indices if mnist_dataset.targets[idx] in (5, 9)]

n_replace = int(poisoned_images.shape[0])

# Randomly choose indices in subset 0 of class 5 & 9 that will be replaced
# (raises ValueError if fewer than n_replace candidates exist).
replace_indices = np.random.choice(labels_5_or_9_indices, n_replace, replace=False)

# NOTE(review): `poisoned_images` is float32 while `mnist_dataset.data` is
# presumably uint8 in 0-255; the assignment casts, so images scaled to [0, 1]
# would collapse to 0/1 - confirm the expected value range.
for target_index, poisoned_image in zip(replace_indices, poisoned_images):
    mnist_dataset.data[target_index] = poisoned_image

# Restore the original (N, 28, 28) layout.
mnist_dataset.data = mnist_dataset.data.squeeze(1)

In [None]:
import torch
from scipy.stats import entropy
from torch.utils.data import DataLoader, ConcatDataset, Subset


def get_latent_variables(model, loader):
    """Encode every batch from `loader` with `model` and stack the latents.

    Puts `model` in eval mode, flattens each batch to (batch, -1), keeps the
    first element of the model's two-element output and concatenates all of
    them along dim 0. Gradients are disabled.
    """
    model.eval()

    def _encode(batch):
        latent, _ = model(batch.view(batch.size(0), -1))
        return latent

    with torch.no_grad():
        encoded = [_encode(batch) for batch, _ in loader]
    return torch.cat(encoded, dim=0)

def calculate_kl_divergence(p, q, eps=1e-12):
    """Return sum(p * log(p / q)) after adding `eps` to both inputs.

    Args:
        p: Tensor of non-negative values (e.g. histogram densities).
        q: Tensor with the same shape as `p`.
        eps: Offset added to both tensors so empty bins do not yield log(0)
            or a division by zero.

    Returns:
        A 0-dim tensor with the summed divergence.
    """
    # Out-of-place addition: the previous `p += ...` / `q += ...` silently
    # modified the caller's tensors.
    p = p + eps
    q = q + eps
    return (p * torch.log(p / q)).sum()

def _as_dataset(subset):
    """Return the dataset behind `subset`, whether it is a DataLoader or already a dataset."""
    return subset.dataset if isinstance(subset, DataLoader) else subset


kl_divergences = []

# NOTE(review): `model` must be the trained flow here; an earlier cell rebinds
# `model` to a SimpleCNN - confirm the execution order.
for i in range(10):
    # Combine the nine OTHER subsets. On a Subset, `.dataset` is the entire
    # underlying MNIST set, so the previous code concatenated nine copies of
    # the full data, held-out subset included.
    other_subsets = [_as_dataset(subsets[j]) for j in range(10) if j != i]
    combined_dataset = ConcatDataset(other_subsets)
    combined_loader = DataLoader(combined_dataset, batch_size=64, shuffle=True)
    # Batch the held-out subset too instead of iterating it sample by sample.
    excluded_loader = DataLoader(_as_dataset(subsets[i]), batch_size=64, shuffle=False)

    z_combined = get_latent_variables(model, combined_loader)
    z_excluded = get_latent_variables(model, excluded_loader)

    # Density histograms over all latent coordinates on [-5, 5].
    hist_combined, _ = torch.histogram(z_combined, bins=100, range=(-5, 5), density=True)
    hist_excluded, _ = torch.histogram(z_excluded, bins=100, range=(-5, 5), density=True)

    kl_div = calculate_kl_divergence(hist_excluded, hist_combined)
    kl_divergences.append(kl_div.item())
    print(f"KL divergence for subset {i} as test set: {kl_div.item()}")

average_kl_divergence = sum(kl_divergences) / len(kl_divergences)
print("Average KL divergence:", average_kl_divergence)