In [None]:
import os
import sys
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.utils as vutils
import matplotlib.pyplot as plt

from PIL import Image

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
CUDA = True
seed = 1

In [None]:
# check cuda
CUDA = CUDA and torch.cuda.is_available()
print("PyTorch version: {}".format(torch.__version__))
if CUDA:
    print("CUDA version: {}\n".format(torch.version.cuda))

if CUDA:
    torch.cuda.manual_seed(seed)
device = torch.device("cuda:0" if CUDA else "cpu")
cudnn.benchmark = True

PyTorch version: 2.1.0+cu121
CUDA version: 12.1



In [None]:
# locate the MNIST data
def load_fashion_dataloaders():
    transform = transforms.Compose([
        transforms.ToTensor(),
        # transforms.Resize(X_DIM),
        transforms.Normalize((0.5,), (0.5,))
    ])

    train_set = torchvision.datasets.FashionMNIST(
        root='/content/drive/MyDrive/Data',
        train=True,
        download=True,
        transform=transform
    )
    train_loader = torch.utils.data.DataLoader(
        train_set,
        batch_size=8,
        shuffle=False,
        num_workers=2
    )

    test_set = torchvision.datasets.FashionMNIST(
        root='/content/drive/MyDrive/Data',
        train=False,
        download=True,
        transform=transform
    )
    test_loader = torch.utils.data.DataLoader(
        test_set,
        batch_size=1,
        shuffle=False
    )

    return train_loader, test_loader

In [None]:
train_loader, test_loader = load_fashion_dataloaders()

In [None]:
class CNN(nn.Module):

	def __init__(self):
		super(CNN, self).__init__()
		self.main = nn.Sequential(

			# input is Z, going into a convolution
			nn.Conv2d(1, 8, kernel_size=5, stride=1, padding=2),
			nn.BatchNorm2d(8),
			nn.ReLU(True),
			nn.Dropout2d(p=0.1),

			nn.Conv2d(8, 16, kernel_size=5, stride=2, padding=2),
			nn.BatchNorm2d(16),
			nn.ReLU(True),
			nn.Dropout2d(p=0.1),

			nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
			nn.BatchNorm2d(32),
			nn.ReLU(True),
			nn.Dropout2d(p=0.1),

			nn.Conv2d(32, 64, kernel_size=5, stride=2, padding=2),
			nn.BatchNorm2d(64),
			nn.ReLU(True),
			nn.Dropout2d(p=0.2),

			nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
			nn.BatchNorm2d(128),
			nn.ReLU(True)
		)

		self.classifier = nn.Sequential(
			nn.Linear(128, 10),
		)

	def forward(self, x):
		x = self.main(x) #
		x = torch.mean(x.view(x.size(0), x.size(1), -1), dim=2)  # GAP Layer
		logits = self.classifier(x)
		pred = F.softmax(logits, dim=1)

		return pred, logits, x


In [None]:
model = CNN().to(device)
model_checkpoint = torch.load('/content/drive/MyDrive/Data/cnn_lr_2e_4_epoch50.pth')
model.load_state_dict(model_checkpoint)

model

CNN(
  (main): Sequential(
    (0): Conv2d(1, 8, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Dropout2d(p=0.1, inplace=False)
    (4): Conv2d(8, 16, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (5): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): Dropout2d(p=0.1, inplace=False)
    (8): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (9): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Dropout2d(p=0.1, inplace=False)
    (12): Conv2d(32, 64, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (13): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): ReLU(inplace=True)
    (15): Dropout2d(p=0.2, inplace=False)
    (16): Conv2d(64, 128, kernel_size=(

In [None]:
class Discriminator(nn.Module):
    def __init__(self, ngpu=1, nc=1, ndf=64):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # input is (nc) x 64 x 64
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf) x 32 x 32
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*2) x 16 x 16
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*4) x 8 x 8
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*8) x 4 x 4
            nn.Conv2d(ndf * 8, 1, 1, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        if input.is_cuda and self.ngpu > 1:
            output = nn.parallel.data_parallel(self.main, input, range(self.ngpu))
        else:
            output = self.main(input)
        return output.view(-1, 1).squeeze(1)

class Generator(nn.Module):
    def __init__(self, ngpu=1, nc=1, nz=100, ngf=64):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # input is Z, going into a convolution
            nn.ConvTranspose2d(     nz, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            # state size. (ngf*8) x 4 x 4
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            # state size. (ngf*4) x 8 x 8
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            # state size. (ngf*2) x 16 x 16
            nn.ConvTranspose2d(ngf * 2,     ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            nn.ConvTranspose2d(    ngf,      nc, kernel_size=1, stride=1, padding=2, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        if input.is_cuda and self.ngpu > 1:
            output = nn.parallel.data_parallel(self.main, input, range(self.ngpu))
        else:
            output = self.main(input)
        return output

In [None]:
netD = Discriminator().to(device)
discriminator_checkpoint = torch.load('/content/drive/MyDrive/Data/discriminator_weights_epoch70.pth')
netD.load_state_dict(discriminator_checkpoint)
netD.eval()
netG = Generator().to(device)
generator_checkpoint = torch.load('/content/drive/MyDrive/Data/generator_weights_epoch70.pth')
netG.load_state_dict(generator_checkpoint)
netG.eval()
print(netG)
print(netD)

In [None]:
# 10 types of clothes in FashionMNIST dataset
def output_label(label):
  output_mapping = {
      0: "T-shirt",
      1: "Trouser",
      2: "Pullover",
      3: "Dress",
      4: "Coat",
      5: "Sandal",
      6: "Shirt",
      7: "Sneaker",
      8: "Bag",
      9: "Ankle Boot"
  }

  input = (label.item() if type(label) == torch.Tensor else label)

  return output_mapping[input]

In [None]:
def loss_func(output, target):
    return torch.norm(output - target)**2

correct_predictions = 0
total_samples = 0
count = 0
num_epochs = 3000

# Use the model for inference on the test set
# with torch.no_grad():  # Disable gradient calculation during inference
for i, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  outputs, _, _ = model(images)
  predicted = torch.argmax(outputs, 1)
  correct_predictions += (predicted == labels).sum().item()
  total_samples += labels.size(0)

  if predicted != labels:

    count += 1
    image_output = images.squeeze()
    image_output = transforms.functional.to_pil_image(image_output)
    filename = f"true_label_{output_label(labels)}_predicted_label_{output_label(predicted)}_{count}.png"
    image_output.save(os.path.join('/content/drive/MyDrive/Data/incorrect_img2', filename))


    I = images
    z0 = torch.randn(1, 100, 1, 1, device=device)
    z0.requires_grad = True
    optimizer = optim.Adam([z0], lr=0.01)

    for epoch in range(num_epochs):
        generated_img = netG(z0)

        reconstruction_loss = loss_func(generated_img, I)
        # which layer should be compared?

        # feature_loss = loss_func(netD(netG(z0)), netD(I))

        C, _, _ = model(netG(z0))
        C_prine, _, _ = model(I)
        feature_loss = loss_func(C, C_prine)
        loss = reconstruction_loss + feature_loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    save_base_path = '/content/drive/MyDrive/Data/incorrect_latent_z2'
    pt_filename = f'true_label_{output_label(labels)}_predicted_label_{output_label(predicted)}_{count}.pt'
    save_path = os.path.join(save_base_path, pt_filename)
    torch.save(z0.detach(), save_path)

    if count == 50:
      break

  if i % 100 == 0:
    print("[%d/%d]"% (i, len(test_loader)))




# Calculate accuracy
accuracy = correct_predictions / total_samples
print(f'Test Accuracy: {accuracy * 100:.2f}%')
print("Number of incorrect predicted images: ", count)

[0/10000]
[100/10000]
[200/10000]
[300/10000]
Test Accuracy: 87.41%
Number of incorrect predicted images:  50


In [None]:
pt_file_path = '/content/drive/MyDrive/Data/incorrect_latent_z2'

In [None]:
pt_files = [f for f in os.listdir(pt_file_path) if f.endswith('.pt')]

In [None]:
len(pt_files)

50

In [None]:
for pt_file in pt_files:
  pt_path = os.path.join(pt_file_path, pt_file)
  ze = torch.load(pt_path)
  ze = ze.to(device)
  img_z = netG(ze).squeeze()
  img_z = transforms.functional.to_pil_image(img_z)

  img_z.save(os.path.join('/content/drive/MyDrive/Data/incorrect_gen_img2', f"{pt_file.replace('.pt', '.png')}"))
