### Notwendige Module importieren

In [1]:
import torch
import torchvision
from torch.utils.data import DataLoader, Dataset, ConcatDataset
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.optim import Adam
from sklearn.model_selection import train_test_split
import random
from tqdm import tqdm
import numpy as np
import time
import matplotlib.pyplot as plt
import os
import cv2
from IPython.display import clear_output
import itertools

# Select the compute device: "cuda" when torch reports an available GPU, otherwise "cpu".
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)


# Switch between Google Colab and a local installation
USING_COLAB = False

if USING_COLAB:
  # Colab only: import the Drive helper and the cv2_imshow patch, then mount
  # Google Drive at /content/drive.
  from google.colab import drive
  from google.colab.patches import cv2_imshow
  drive.mount('/content/drive')

ModuleNotFoundError: No module named 'cv2'

### Definition von Konstanten

In [None]:
# Training hyperparameters
BATCH_SIZE = 64
LEARNING_RATE = 1e-2
epochs = 40

# Flag read by later cells: True -> train and save the model, False -> load it from disk
bTrainModel = True

# All file locations are chosen in one place, depending on the runtime environment.
if USING_COLAB:
  filenameModel = '/content/drive/My Drive/THK/ColabNotebooks/results/modelMNIST.pth'
  pathImages = '/content/drive/My Drive/THK/ColabNotebooks/data/MNIST/images'
  pathHackedImages = '/content/drive/My Drive/THK/ColabNotebooks/data/MNIST/hackedImages'
  pathStore = '/content/drive/My Drive/THK/ColabNotebooks/data/MNIST/adversarialImages'
else:
  filenameModel = 'results/modelMNIST.pth'
  pathImages = 'data/MNIST/images'
  pathHackedImages = 'data/MNIST/hackedImages'
  pathStore = 'data/MNIST/adversarialImages'

### Laden des Datensatzes

##### Train Dataset

In [None]:
# Load the MNIST training split from 'data/' (download=True) and keep handles
# to its image and label tensors.
train_set = torchvision.datasets.MNIST('data/', download=True, train=True)
train_images = train_set.data
train_targets = train_set.targets
# Print an overview of the split: tensor shapes, distinct labels and pixel value range.
print(f"Dataset images shape: \t{train_images.shape}")
print(f"Dataset labels shape: \t{train_targets.shape}")
print(f"Label classes: \t\t{train_targets.unique()}")
print(f"Pixel values from \t{train_images.min().numpy()} to {train_images.max().numpy()}")

##### Test Dataset

In [None]:
# Load the MNIST test split from 'data/' (download=True, train=False) and keep
# handles to its image and label tensors.
test_set = torchvision.datasets.MNIST('data/', download=True, train=False)
test_images = test_set.data
test_targets = test_set.targets
# Print an overview of the split: tensor shapes, distinct labels and pixel value range.
print(f"Dataset images shape: \t{test_images.shape}")
print(f"Dataset labels shape: \t{test_targets.shape}")
print(f"Label classes: \t\t{test_targets.unique()}")
print(f"Pixel values from \t{test_images.min().numpy()} to {test_images.max().numpy()}")

### Erstellen nötiger Klassen und Funktionen

##### MNISTDataset-Klasse für einfachere Handhabung

In [None]:
class MNISTDataset(Dataset):
    """In-memory image dataset that keeps its tensors on the CPU.

    Images are converted to float, divided by 255 and given an extra axis at
    dimension 1. Single items are moved to the module-level ``device`` when
    they are accessed.
    """

    def __init__(self, x, y):
        """Preprocess and store the samples.

        Args:
            x: image tensor; presumably raw pixel values in [0, 255] with shape
               (N, H, W) -- TODO confirm against the caller.
            y: label tensor with one entry per image.
        """
        x = x.float()/255
        x = x.unsqueeze(1)
        self.x, self.y = x.cpu(), y.cpu()

    def __getitem__(self, ix):
        """Return the sample(s) at ``ix`` as an ``(x, y)`` pair moved to ``device``."""
        x, y = self.x[ix], self.y[ix]
        return x.to(device), y.to(device)

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.x)

    def append(self, newX, newY):
        """Append additional samples to the dataset.

        ``newX`` is concatenated to the stored images as-is (no scaling and no
        extra axis are applied), so it must already have the stored layout.

        Args:
            newX: tensor of new images, concatenated along dimension 0.
            newY: tensor of the matching labels, concatenated along dimension 0.

        Raises:
            ValueError: if ``newX`` and ``newY`` hold a different number of samples.
        """
        # Detach from any autograd graph (e.g. optimised adversarial samples) and
        # bring the new tensors to the device/dtype of the stored ones. Without
        # this, a GPU tensor cannot be concatenated with the CPU storage and a
        # tensor that requires grad would make the whole dataset require grad.
        newX = newX.detach().to(device=self.x.device, dtype=self.x.dtype)
        newY = newY.detach().to(device=self.y.device, dtype=self.y.dtype)

        if len(newX) != len(newY):
            raise ValueError(
                f"newX has {len(newX)} samples but newY has {len(newY)} labels"
            )

        self.x = torch.cat([self.x, newX], dim=0)
        self.y = torch.cat([self.y, newY], dim=0)

##### Funktion zum Aufteilen des Train-Datasets in Trainings- und Validierungsdaten und zum Erzeugen der DataLoader für das spätere Training

In [None]:
def get_data(train_images, train_targets, test_images, test_targets, random_state=None):
    """Split the training data into train/validation parts and build all datasets and loaders.

    The training samples are split 80/20, stratified by label. The training
    loader is shuffled and uses ``BATCH_SIZE``; the validation and test loaders
    each deliver their whole dataset as a single, unshuffled batch.

    Args:
        train_images: tensor of training images.
        train_targets: tensor of training labels (also used for stratification).
        test_images: tensor of test images.
        test_targets: tensor of test labels.
        random_state: optional seed forwarded to ``train_test_split`` to make the
            split reproducible; ``None`` (default) keeps the split random.

    Returns:
        Tuple ``(train_ds, train_dl, test_ds, test_dl, validate_ds, validate_dl)``.
    """
    train_indices, val_indices, _, _ = train_test_split(
        range(len(train_images)),
        train_targets,
        test_size=0.2,
        stratify=train_targets,
        random_state=random_state,
    )

    # Select all samples of a split with one indexing operation instead of
    # collecting them one by one in a Python loop and stacking them afterwards.
    train_idx = torch.as_tensor(train_indices, dtype=torch.long)
    val_idx = torch.as_tensor(val_indices, dtype=torch.long)

    train_ds = MNISTDataset(train_images[train_idx], train_targets[train_idx])
    train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

    validate_ds = MNISTDataset(train_images[val_idx], train_targets[val_idx])
    validate_dl = DataLoader(validate_ds, batch_size=len(val_idx), shuffle=False)

    test_ds = MNISTDataset(test_images, test_targets)
    test_dl = DataLoader(test_ds, batch_size=len(test_images), shuffle=False)

    return train_ds, train_dl, test_ds, test_dl, validate_ds, validate_dl

##### Erzeugen des Modells inklusive Convolutional Neural Network, Optimizer und Loss-Function

In [None]:
def get_model():
    """Create the CNN classifier, its loss function and its optimizer.

    Returns:
        Tuple ``(model, loss_fn, optimizer)``: a flat ``nn.Sequential`` network
        moved to ``device``, a cross-entropy loss and an SGD optimizer
        (momentum 0.5, learning rate ``LEARNING_RATE``).
    """
    # Convolutional part: two conv -> pool -> ReLU stages, the second with 2D dropout.
    conv_layers = [
        nn.Conv2d(1, 10, kernel_size=5),
        nn.MaxPool2d(2),
        nn.ReLU(),
        nn.Conv2d(10, 20, kernel_size=5),
        nn.Dropout2d(0.5),
        nn.MaxPool2d(2),
        nn.ReLU(),
    ]

    # Fully connected part; 320 input features correspond to 20 channels of
    # 4x4 feature maps, which is what the conv part yields for 28x28 inputs.
    dense_layers = [
        nn.Flatten(),
        nn.Linear(320, 50),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(50 ,10),
    ]

    # Unpack into one flat Sequential so the layer indices stay 0..11.
    network = nn.Sequential(*conv_layers, *dense_layers).to(device)
    criterion = nn.CrossEntropyLoss()
    sgd = SGD(network.parameters(), lr=LEARNING_RATE, momentum=0.5)
    return network, criterion, sgd

##### Funktion zum Trainieren mit einem Batch

In [None]:
def train_batch(x, y, model, optimizer, loss_fn):
    """Perform one optimisation step on a single batch.

    Puts ``model`` into training mode, computes ``loss_fn(model(x), y)``,
    back-propagates it, applies ``optimizer.step()`` and clears the gradients
    afterwards.

    Returns:
        The batch loss as a Python float.
    """
    model.train()

    # Forward pass and loss in one expression
    current_loss = loss_fn(model(x), y)

    # Backward pass, parameter update, then reset gradients for the next batch
    current_loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    return current_loss.item()

##### Funktion zum Berechnen der Accuracy

In [None]:
@torch.no_grad()
def accuracy(x, y, model):
    """Return, per sample, whether the model's top prediction equals the label.

    The forward pass runs in evaluation mode so that dropout layers do not
    randomly perturb the prediction; the model's previous train/eval mode is
    restored afterwards. Gradients are disabled by the decorator.

    Args:
        x: batch of model inputs.
        y: tensor of target class indices, one per sample.
        model: the ``nn.Module`` to evaluate.

    Returns:
        List of Python bools, one entry per sample.
    """
    was_training = model.training
    model.eval()
    try:
        prediction = model(x)
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)

    # Index of the largest output along the last axis is the predicted class.
    _, argmaxes = prediction.max(-1)
    is_correct = argmaxes == y
    return is_correct.cpu().numpy().tolist()

##### Funktion zum Berechnen des Loss

In [None]:
@torch.no_grad()
def loss(x, y, model, loss_fn):
    """Compute the loss of ``model`` on one batch without tracking gradients.

    The forward pass runs in evaluation mode so that dropout layers do not
    distort the value; the model's previous train/eval mode is restored
    afterwards.

    Args:
        x: batch of model inputs.
        y: batch of targets in the form expected by ``loss_fn``.
        model: the ``nn.Module`` to evaluate.
        loss_fn: callable ``loss_fn(prediction, y)`` returning a scalar tensor.

    Returns:
        The loss as a Python float.
    """
    was_training = model.training
    model.eval()
    try:
        prediction = model(x)
        # Named differently from the function so the function is not shadowed.
        loss_value = loss_fn(prediction, y)
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)
    return loss_value.item()

### Erzeugen von DataLoader und Model

In [None]:
# Build the train/validation/test datasets and loaders from the loaded MNIST tensors.
train_ds, train_dl, test_ds, test_dl, validate_ds, validate_dl = get_data(train_images, train_targets, test_images, test_targets)
# Create the network together with its loss function and optimizer.
model, loss_fn, optimizer = get_model()

### Plotten der Ergebnisse

In [None]:
def plot_evals(train_loss, val_loss, train_acc, val_acc, store=None):
    """Plot loss and accuracy curves of training and validation in one figure.

    Args:
        train_loss: sequence of training losses, one value per epoch.
        val_loss: sequence of validation losses, one value per epoch.
        train_acc: sequence of training accuracies, one value per epoch.
        val_acc: sequence of validation accuracies, one value per epoch.
        store: optional path; when given, the figure is saved there before
            it is shown.
    """
    plt.style.use("ggplot")
    plt.figure()

    # Draw the four curves in a fixed order so colours and legend stay stable.
    curves = (
        ("train_loss", train_loss),
        ("val_loss", val_loss),
        ("train_acc", train_acc),
        ("val_acc", val_acc),
    )
    for curve_name, values in curves:
        plt.plot(values, label=curve_name)

    plt.title("Training Loss and Accuracy on Dataset")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/Accuracy")
    plt.legend(loc="best")

    if store is not None:
        plt.savefig(store)

    plt.show()

### Trainieren

In [None]:
def _evaluate_model(model, data_loader, with_loss=False):
    """Collect per-sample correctness (and optionally per-batch losses) over ``data_loader``.

    The model is switched to evaluation mode for the duration of the pass so
    that dropout is inactive, and its previous mode is restored afterwards.
    Uses the module-level ``loss_fn`` when ``with_loss`` is True.
    """
    was_training = model.training
    model.eval()
    hits, batch_losses = [], []
    try:
        for x, y in data_loader:
            hits.extend(accuracy(x, y, model))
            if with_loss:
                batch_losses.append(loss(x, y, model, loss_fn))
    finally:
        model.train(was_training)
    return hits, batch_losses


def trainModel(model, train_dl, validate_dl, test_dl, epochs):
    """Train ``model`` and report train/validation/test metrics.

    For every epoch the model is trained on all batches of ``train_dl``; then
    accuracy on the training set and accuracy plus loss on the validation set
    are measured in evaluation mode and printed. After the last epoch the
    accuracy on ``test_dl`` is printed and the learning curves are plotted.

    Note: the optimizer and the loss function are not parameters; the
    module-level ``optimizer`` and ``loss_fn`` are used.

    Args:
        model: the network to train.
        train_dl: loader of training batches.
        validate_dl: loader of validation batches.
        test_dl: loader of test batches.
        epochs: number of passes over ``train_dl``.
    """
    model.train()

    epoch_loss = []
    train_acc_epoch = []

    val_epoch_loss = []
    val_acc_epoch = []

    for epoch in range(epochs):
        start = time.time()
        loss_each_batch = []

        # Train the model on every batch of the epoch
        for x, y in tqdm(train_dl):
            x = x.to(device)
            y = y.to(device)
            batch_loss = train_batch(x, y, model, optimizer, loss_fn)
            loss_each_batch.append(batch_loss)

        epoch_loss.append(np.array(loss_each_batch).mean())

        # Evaluation on the training dataset (dropout disabled)
        train_hits, _ = _evaluate_model(model, train_dl)
        train_acc_epoch.append(np.mean(train_hits))

        # Evaluation on the validation dataset (dropout disabled)
        val_hits, val_losses = _evaluate_model(model, validate_dl, with_loss=True)
        val_epoch_loss.append(np.array(val_losses).mean())
        val_acc_epoch.append(np.array(val_hits).mean())

        print(f"Epoch {epoch+1}/{epochs}:\taccTrain: {100. * train_acc_epoch[-1]:.2f}\taccVal: {100. * val_acc_epoch[-1]:.2f}\tloss: {epoch_loss[-1]:.2f}\ttime: {time.time()-start:.2f}")

    # Final check on the test dataset; the mean is taken once over all samples.
    test_hits, _ = _evaluate_model(model, test_dl)
    testRes = np.array(test_hits).mean() if test_hits else 0
    print( f"testRes {100. * testRes:.2f}" )

    plot_evals(epoch_loss, val_epoch_loss, train_acc_epoch, val_acc_epoch)

In [None]:
# Train only when requested via bTrainModel.
if bTrainModel:
    trainModel(model, train_dl, validate_dl, test_dl,  epochs)

### Speichern/Laden von Model und Optimizer

In [None]:
if bTrainModel:
  # Make sure the target directory exists before writing the checkpoint.
  os.makedirs(os.path.dirname(filenameModel) or '.', exist_ok=True)
  torch.save(model.state_dict(), filenameModel)
  #torch.save(optimizer.state_dict(), 'results/optimizer.pth')
else:
  # map_location remaps the stored tensors to the current device, so a
  # checkpoint written on a GPU machine also loads on a CPU-only machine.
  model.load_state_dict(torch.load(filenameModel, map_location=device))
  model = model.to(device)

### Funktion zum Erzeugen von Adversarial Images mit Rauschen, MNIST-Bild oder leerem Bild als initiales Bild

#### Mit learning rate spielen!
- 1e-2 ist gut für das Hinzufügen von vielen adversarial images zum Datensatz, da Geschwindigkeit höher
- 1e-4 ist gut um adversarial images zu erzeugen, welche sehr nah am Original sind

In [None]:
import os

def create_adversarial(from_number: int, to_predict: int, display=False, store=False,
                       max_steps: int = 50000, lr: float = 1e-3) -> "torch.Tensor | None":
    """Optimise an input image until the model classifies it as ``to_predict``.

    The start image is a random training image of class ``from_number`` when
    that value is in 0..9, otherwise uniform random noise. The image is then
    changed by gradient descent on the cross-entropy towards ``to_predict``
    and clamped to [0, 1] after every step.

    Uses the module-level ``model``, ``train_ds``, ``pathStore`` and ``device``.

    Args:
        from_number: class of the start image (0..9); any other value starts
            from random noise.
        to_predict: class the model should predict for the resulting image.
        display: show the start image, intermediate images every 500 steps and
            the final image.
        store: write the final image as PNG below ``pathStore``.
        max_steps: maximum number of gradient steps (default 50000).
        lr: learning rate of the image optimizer (default 1e-3).

    Returns:
        The adversarial image as a detached tensor of shape (1, 1, 28, 28), or
        ``None`` if the target class was not reached within ``max_steps``.
    """
    # NOTE(review): the model is used in whatever mode it currently is in; the
    # model.eval() call was commented out in the original, so dropout may be
    # active during the optimisation -- confirm that this is intended.
    #model.eval()

    if 0 <= from_number <= 9:
        pathOutput = pathStore + '/' + str(from_number) + '/'
        # Pick a random start image of class from_number
        indices = torch.nonzero(train_ds.y == from_number).flatten().tolist()
        rndIdx = random.choice(indices)
        adversarial_sample, _ = train_ds[rndIdx]
        # Work on a copy: on the CPU the indexed item shares memory with the
        # dataset, and the in-place optimizer steps would otherwise overwrite
        # the stored training image.
        adversarial_sample = adversarial_sample.detach().clone().unsqueeze(0)
    else:
        pathOutput = pathStore + '/noise/'
        adversarial_sample = torch.rand(1, 1, 28, 28)  # uniformly distributed in [0, 1]
        adversarial_sample = adversarial_sample.to(device)

    # The image itself is the parameter being optimised.
    adversarial_sample.requires_grad_()

    # Create the output directory (including parents) if it does not exist yet
    if store:
        os.makedirs(pathOutput, exist_ok=True)

    # Target label tensor
    targeted_adversarial_class = torch.tensor([to_predict]).to(device)

    # Plot the start image
    if display:
        plt.imshow(adversarial_sample.detach().cpu().view(28, 28), cmap='gray')
        plt.grid(False)
        plt.show()
        time.sleep(1)

    # Optimizer and loss for the gradient descent on the image
    adversarial_optimizer = torch.optim.SGD([adversarial_sample], lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Loss history
    losses = []

    for i in range(max_steps):
        # Reset gradients
        adversarial_optimizer.zero_grad()

        # Let the model predict
        prediction = model(adversarial_sample)

        # Compute the loss and add it to the history
        adv_loss = criterion(prediction, targeted_adversarial_class)
        losses.append(adv_loss.cpu().detach().numpy())

        # Extract the predicted class
        predicted_class = np.argmax(prediction.cpu().detach().numpy(), axis=1)

        # Stop as soon as the desired class is predicted
        if predicted_class[0] == to_predict:
            if display:
                clear_output(wait=True)
                plt.imshow(adversarial_sample.detach().cpu().view(28, 28), cmap='gray')
                plt.grid(False)
                plt.show()

                print( f"!! Predicted: {predicted_class[0]}" )
                print( f"!! Model output: {prediction.detach().cpu().numpy()}" )
                print( f"!! Loss: {adv_loss.data}" )

            if store:
                num_files = len(os.listdir(pathOutput))
                img_path = os.path.join(pathOutput, str(num_files)+".png")
                # Convert explicitly to 8-bit grey values before writing the PNG.
                pixels = adversarial_sample.detach().cpu().numpy().reshape((28, 28)) * 255
                cv2.imwrite(img_path, np.clip(np.rint(pixels), 0, 255).astype(np.uint8))

            return adversarial_sample.detach()

        # Backward propagation and gradient step
        adv_loss.backward()
        adversarial_optimizer.step()

        # Keep the image inside the valid pixel range [0, 1]
        with torch.no_grad():
            adversarial_sample.clamp_(0, 1)

        # Show the current image every 500 steps
        if display and i % 500 == 0:
            clear_output(wait=True)

            plt.imshow(adversarial_sample.detach().cpu().view(28, 28), cmap='gray')
            plt.grid(False)
            plt.show()

            print( f"Predicted: {predicted_class[0]}" )
            print( f"Model output: {prediction.detach().cpu().numpy()}" )
            print('Loss:', np.average(losses))

    # Target class not reached within max_steps
    return None

In [None]:
# Create one adversarial image as a test: start from a class-0 image, target prediction 4.
# (The function defined in this notebook is create_adversarial.)
adversarial_sample = create_adversarial(0, 4, display=True, store=True)

In [None]:
# Create one adversarial image for every ordered pair (start class i, target class j)
# with i != j and collect the results per start class.
adversarials = {i: [] for i in range(10)}
for (i, j) in itertools.permutations(range(10), 2):
        adversarial_sample = create_adversarial(i, j, display=True, store=True)
        adversarials[i].append(adversarial_sample)

In [None]:
# Create 20 adversarial images per class
for i, j in tqdm(itertools.product(range(10), range(20)), total=10 * 20):
  originalClass = i

  # Pick a random target class that differs from the original class
  targetClass = int(np.random.choice([c for c in range(10) if c != originalClass]))

  print( f"original class {originalClass}  target class {targetClass}" )

  adv = create_adversarial(originalClass, targetClass, display=False, store=True)
  advY = torch.tensor([originalClass])

  # Identity check: comparing a tensor with == None is not a reliable None test.
  if adv is None:
    print("---")
  else:
    print("+++")
    # Adding the adversarial sample to the training set is currently disabled:
    #adv = adv.cpu()
    #train_ds.append( adv.cpu(), advY )

In [None]:
def printDataSetInfo( ds ):
  """Print the size of ``ds``, its first and last sample, and the sample count per class.

  ``ds`` must support ``len()``, indexing with 0 and -1, and iteration over
  ``(x, y)`` pairs. Labels are counted for the ten classes 0..9.
  """
  print( f"#samples: {len(ds)}" )

  # Fetch the boundary samples once and report shape and label of each
  first_sample = ds[0]
  print( f"first x shape: {first_sample[0].shape}" )
  print( f"first y: {first_sample[1]}" )
  last_sample = ds[-1]
  print( f"last x shape: {last_sample[0].shape}" )
  print( f"last y: {last_sample[1]}" )

  # One counter per class, filled while walking through the whole dataset
  tally = [0 for _ in range(10)]
  for _features, target in ds:
    tally[target] += 1

  # Report the counters in class order
  for class_label in range(len(tally)):
    count = tally[class_label]
    print(f"Class {class_label}: {count} examples")