In [1]:
!cp -r /kaggle/input/cable-weights/* /kaggle/working

In [2]:
!cp -r /kaggle/input/cable/* /kaggle/working

# Introduction

In this project, we will attempt to greate a Cycle Generative Adversarial Network (cycleGAN) in order to manipulate defective industrial parts. We will attempt to augment images data.

Our first step is to load the neccessary libraries we will be using for the entire process.

In [55]:
%matplotlib inline
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.optim import Adam
from torch.utils.data import DataLoader
import torch.utils.data as data
import random
from torchvision import transforms
from PIL import Image
from scipy.stats import ttest_ind
from sklearn.metrics import roc_auc_score
import os , itertools, cv2
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

Since we are using two platforms, we will be assigning a directory to the location of the data for each platform. This will make it easier to access the data, depending the platform we will be using.

The two platforms will be:
1. A local installation of  Jupyter Notebook - work on local codes. GPU is limited
2. A Google Colaboratory Notebook - Use of available resources and GPU to run the model

In [4]:
data_dir = 'cable'
train_dir = os.path.join(data_dir,'train')
test_dir = os.path.join(data_dir,'test')

## Parameters

In this block, we will create global parameters which we can use at different areas of this project.

In [6]:
# Creating the Parameters

parameters = {
    'batch_size': 1,
    'input_size': 256,
    'resize_scale': 286,
    'crop_size': 256,
    'fliplr':True,
    
    # Model parameters
    'num_epochs': 100,
    'decay_epoch': 100,
    'ngf': 32,   # Number of generator filters
    'ndf': 64,   # Number of discriminator filters
    'num_resnet': 6, # Number of resnet blocks
    'lrG': 0.0002,    # Learning rate for generator
    'lrD': 0.0002,    # Learning rate for discriminator
    'beta1': 0.5 ,    # Beta1 for Adam optimizer
    'beta2': 0.999 ,  # Beta2 for Adam optimizer
    'lambdaA': 10 ,   # LambdaA for cycle loss
    'lambdaB': 10  ,  # LambdaB for cycle loss
}
          

## Creating functions

In the next set, we will create several functions that will be used throughout. The first function will convert our data into Numpy, which we can call at a later point.

We will also set a variable so we can use a GPU for faster processing.

In [32]:
# Convert to Numpy 
def to_numpy(x):
    return x.data.cpu().numpy()

def load_weights(model, optimizer, filename):
    if os.path.isfile(filename):
        ckpt = torch.load(filename)
        model.load_state_dict(ckpt['model_state_dict'])
        optimizer.load_state_dict(ckpt['optimizer_state_dict'])
    else:
        print(f'No checkpoint found at {filename}')
        
def save_image(image, save_path, i):
    img = to_numpy(image[0])
    img = img.squeeze()
    img = (((img - img.min()) * 255) / (img.max() - img.min())).transpose(1, 2, 0).astype(np.uint8)
    img_path = os.path.join(save_path, f"image_{i}.jpg")
    cv2.imwrite(img_path, img)

# Use GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

### Transformer

Since we will be using a __transformer__ at a later stage, we will set a transformer variable and assign it procedures which transforms an image size accordingly. This transformer will also convert the image into a Tensor and Normalize it as well.

In [9]:
transform = transforms.Compose([
    transforms.Resize(size=parameters['input_size']),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])


Next, we will define an __ImagePool__ function

In [10]:
class ImagePool():
    def __init__(self, pool_size):
        self.pool_size = pool_size
        if self.pool_size > 0:
            self.num_imgs = 0
            self.images = []

    def query(self, images):
        if self.pool_size == 0:
            return images
        return_images = []
        for image in images.data:
            image = torch.unsqueeze(image, 0)
            if self.num_imgs < self.pool_size:
                self.num_imgs = self.num_imgs + 1
                self.images.append(image)
                return_images.append(image)
            else:
                p = random.uniform(0, 1)
                if p > 0.5:
                    random_id = random.randint(0, self.pool_size-1)
                    tmp = self.images[random_id].clone()
                    self.images[random_id] = image
                    return_images.append(tmp)
                else:
                    return_images.append(image)
        return_images = Variable(torch.cat(return_images, 0))
        return return_images

### Creating the Dataloader

In this section, we begin by creating the __dataloader__ which we will use to retrieve the data in parts.

In [23]:
class DatasetFromFolder(data.Dataset):
    def __init__(self, image_dir, subfolder='train', transform=None, resize_scale=None, crop_size=None, fliplr=False):
        super(DatasetFromFolder, self).__init__()
        self.input_path = os.path.join(image_dir, subfolder)
        self.image_filenames = [x for x in sorted(os.listdir(self.input_path))]
        self.transform = transform

        self.resize_scale = resize_scale
        self.crop_size = crop_size
        self.fliplr = fliplr

    def __getitem__(self, index):
        # Load Image
        img_fn = os.path.join(self.input_path, self.image_filenames[index])
        img = Image.open(img_fn).convert('RGB')

        # preprocessing
        if self.resize_scale:
            img = img.resize((self.resize_scale, self.resize_scale), Image.BILINEAR)

        if self.crop_size:
            x = random.randint(0, self.resize_scale - self.crop_size + 1)
            y = random.randint(0, self.resize_scale - self.crop_size + 1)
            img = img.crop((x, y, x + self.crop_size, y + self.crop_size))
        if self.fliplr:
            if random.random() < 0.5:
                img = img.transpose(Image.FLIP_LEFT_RIGHT)

        if self.transform is not None:
            img = self.transform(img)

        return img

    def __len__(self):
        return len(self.image_filenames)

### Creating the Convolutional Block

The __convolution__ block will consist of two things.

1. Initialization

2. Forward pass.

In [11]:
class ConvBlock(torch.nn.Module):
    def __init__(self,input_size,output_size,kernel_size=3,stride=2,padding=1,activation='relu',batch_norm=True):
        super(ConvBlock,self).__init__()
        self.conv = torch.nn.Conv2d(input_size,output_size,kernel_size,stride,padding)
        self.batch_norm = batch_norm
        self.bn = torch.nn.InstanceNorm2d(output_size)
        self.activation = activation
        self.relu = torch.nn.ReLU(True)
        self.lrelu = torch.nn.LeakyReLU(0.2,True)
        self.tanh = torch.nn.Tanh()
    def forward(self,x):
        if self.batch_norm:
            out = self.bn(self.conv(x))
        else:
            out = self.conv(x)

        if self.activation == 'relu':
            return self.relu(out)
        elif self.activation == 'lrelu':
            return self.lrelu(out)
        elif self.activation == 'tanh':
            return self.tanh(out)
        elif self.activation == 'no_act':
            return out


### De-Convolution Function

In this class we will create, we will need to bring back the convolution from the results previously processed. There will be two functions inside this class.

1. Initialization

2. Forward pass

In [12]:
class DeconvBlock(torch.nn.Module):
    def __init__(self,input_size,output_size,kernel_size=3,stride=2,padding=1,output_padding=1,activation='relu',batch_norm=True):
        super(DeconvBlock,self).__init__()
        self.deconv = torch.nn.ConvTranspose2d(input_size,output_size,kernel_size,stride,padding,output_padding)
        self.batch_norm = batch_norm
        self.bn = torch.nn.InstanceNorm2d(output_size)
        self.activation = activation
        self.relu = torch.nn.ReLU(True)
    def forward(self,x):
        if self.batch_norm:
            out = self.bn(self.deconv(x))
        else:
            out = self.deconv(x)
        if self.activation == 'relu':
            return self.relu(out)
        elif self.activation == 'lrelu':
            return self.lrelu(out)
        elif self.activation == 'tanh':
            return self.tanh(out)
        elif self.activation == 'no_act':
            return out


### Residual Learning Block

In this secton, we will create a __residual learning__ block or __Resnet__. This class will also contain two functions.

1. Initialization

2. Forward pass

In [13]:
class ResnetBlock(torch.nn.Module):
    def __init__(self,num_filter,kernel_size=3,stride=1,padding=0):
        super(ResnetBlock,self).__init__()
        conv1 = torch.nn.Conv2d(num_filter,num_filter,kernel_size,stride,padding)
        conv2 = torch.nn.Conv2d(num_filter,num_filter,kernel_size,stride,padding)
        bn = torch.nn.InstanceNorm2d(num_filter)
        relu = torch.nn.ReLU(True)
        pad = torch.nn.ReflectionPad2d(1)

        self.resnet_block = torch.nn.Sequential(pad, conv1, bn, relu, pad, conv2, bn)
    def forward(self,x):
        out = self.resnet_block(x)
        return out

## Creating the Generator

We will now create the __Generator__. The Generator will consist of the following:
1. Encoder
2. Transformer
3. Decoder

In addition, we will also be adding __weights__.

After creating the class, we will create the actuall Generator.

In [14]:
class Generator(torch.nn.Module):
    def __init__(self,input_dim,num_filter,output_dim,num_resnet):
        super(Generator,self).__init__()

        # Reflection padding
        self.pad = torch.nn.ReflectionPad2d(3)
        # Encoder
        self.conv1 = ConvBlock(input_dim,num_filter,kernel_size=7,stride=1,padding=0)
        self.conv2 = ConvBlock(num_filter,num_filter*2)
        self.conv3 = ConvBlock(num_filter*2,num_filter*4)
        # Resnet blocks
        self.resnet_blocks = []
        for i in range(num_resnet):
            self.resnet_blocks.append(ResnetBlock(num_filter*4))
        self.resnet_blocks = torch.nn.Sequential(*self.resnet_blocks)
        #Decoder
        self.deconv1 = DeconvBlock(num_filter*4,num_filter*2)
        self.deconv2 = DeconvBlock(num_filter*2,num_filter)
        self.deconv3 = ConvBlock(num_filter,output_dim,kernel_size=7,stride=1,padding=0,activation='tanh',batch_norm=False)

    def forward(self,x):
        # Encoder
        enc1 = self.conv1(self.pad(x))
        enc2 = self.conv2(enc1)
        enc3 = self.conv3(enc2)
        # Resnet blocks
        res = self.resnet_blocks(enc3)
        # Decoder
        dec1 = self.deconv1(res)
        dec2 = self.deconv2(dec1)
        out = self.deconv3(self.pad(dec2))
        return out

    def normal_weight_init(self,mean=0.0,std=0.02):
        for m in self.children():
            if isinstance(m,ConvBlock):
                torch.nn.init.normal_(m.conv.weight,mean,std)
            if isinstance(m,DeconvBlock):
                torch.nn.init.normal_(m.deconv.weight,mean,std)
            if isinstance(m,ResnetBlock):
                torch.nn.init.normal_(m.conv.weight,mean,std)
                torch.nn.init.constant_(m.conv.bias,0)


### Creating the Generator and Testing the Neural Network

Since we will be using a Cycle GAN, which requires 2 Neural Networks competing with each other, we will create 2 Generators as its requirement. We will also test it by calling it and view the results created earlier.

#### Generator A

In [16]:
# Load Generators
G_A = Generator(3, parameters['ngf'], 3, parameters['num_resnet']).cuda() # input_dim, num_filter, output_dim, num_resnet
G_A.load_state_dict(torch.load('Generator_A.pth'))
G_A.eval()

Generator(
  (pad): ReflectionPad2d((3, 3, 3, 3))
  (conv1): ConvBlock(
    (conv): Conv2d(3, 32, kernel_size=(7, 7), stride=(1, 1))
    (bn): InstanceNorm2d(32, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (conv2): ConvBlock(
    (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (conv3): ConvBlock(
    (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (resnet_blocks): Sequential(
    (0): ResnetB

#### Generator B

Repeating the previous step, but for Generator B.

In [18]:
G_B = Generator(3, parameters['ngf'], 3, parameters['num_resnet']).cuda()
G_B.load_state_dict(torch.load('Generator_B.pth'))
G_B.eval()

Generator(
  (pad): ReflectionPad2d((3, 3, 3, 3))
  (conv1): ConvBlock(
    (conv): Conv2d(3, 32, kernel_size=(7, 7), stride=(1, 1))
    (bn): InstanceNorm2d(32, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (conv2): ConvBlock(
    (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (conv3): ConvBlock(
    (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (relu): ReLU(inplace=True)
    (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
    (tanh): Tanh()
  )
  (resnet_blocks): Sequential(
    (0): ResnetB

## Creating the Discriminator

The Discriminator will consist of the following
1. Initialization
2. Forward Pass
3. Weights

After creating the class, we will create the actuall Discriminator.


In [19]:
class Discriminator(torch.nn.Module):
    def __init__(self,input_dim,num_filter,output_dim):
        super(Discriminator,self).__init__()
        conv1 = ConvBlock(input_dim,num_filter,kernel_size=4,stride=2,padding=1,activation='lrelu',batch_norm=False)
        conv2 = ConvBlock(num_filter,num_filter*2,kernel_size=4,stride=2,padding=1,activation='lrelu')
        conv3 = ConvBlock(num_filter*2,num_filter*4,kernel_size=4,stride=2,padding=1,activation='lrelu')
        conv4 = ConvBlock(num_filter*4,num_filter*8,kernel_size=4,stride=1,padding=1,activation='lrelu')
        conv5 = ConvBlock(num_filter*8,output_dim,kernel_size=4,stride=1,padding=1,activation='no_act',batch_norm=False)
        self.conv_blocks = torch.nn.Sequential(conv1, conv2, conv3, conv4, conv5)
    
    def forward(self,x):
        out = self.conv_blocks(x)
        return out

    def normal_weight_init(self,mean=0.0,std=0.02):
        for m in self.children():
            if isinstance(m,ConvBlock):
                torch.nn.init.normal_(m.conv.weight.data,mean,std)

### Creating the Discriminator and Testing the Neural Network

Similar with the Generator, which requires 2 Neural Networks. A cycleGAN also uses 2 Discriminators.

#### Discriminator A

In [20]:
# Load Discriminators
D_A = Discriminator(3, parameters['ndf'], 1).cuda() # input_dim, num_filter, output_dim
D_A.load_state_dict(torch.load('Discriminator_A.pth'))
D_A.eval()

Discriminator(
  (conv_blocks): Sequential(
    (0): ConvBlock(
      (conv): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    (1): ConvBlock(
      (conv): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    (2): ConvBlock(
      (conv): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(256, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    

#### Discriminator B

In [21]:
D_B = Discriminator(3, parameters['ndf'], 1).cuda()
D_B.load_state_dict(torch.load('Discriminator_B.pth'))
D_B.eval()

Discriminator(
  (conv_blocks): Sequential(
    (0): ConvBlock(
      (conv): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    (1): ConvBlock(
      (conv): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    (2): ConvBlock(
      (conv): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (bn): InstanceNorm2d(256, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (relu): ReLU(inplace=True)
      (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      (tanh): Tanh()
    )
    

## Training Block / DataLoaders

In this block, we will create the actual training block we will be using to train our model.

In [25]:
train_data_A = DatasetFromFolder(train_dir, subfolder='good', transform=transform,resize_scale=parameters['resize_scale'], crop_size=parameters['crop_size'], fliplr=parameters['fliplr'])
train_data_loader_A = torch.utils.data.DataLoader(dataset=train_data_A,batch_size=parameters['batch_size'], shuffle=True)
train_data_B = DatasetFromFolder(train_dir, subfolder='bad', transform=transform, resize_scale=parameters['resize_scale'], crop_size=parameters['crop_size'], fliplr=parameters['fliplr'])
train_data_loader_B = torch.utils.data.DataLoader(dataset=train_data_B, batch_size=parameters['batch_size'], shuffle=True)

#Load test data
test_data_A = DatasetFromFolder(test_dir, subfolder='good', transform=transform)
test_data_loader_A = torch.utils.data.DataLoader(dataset=test_data_A, batch_size=parameters['batch_size'], shuffle=False)
test_data_B = DatasetFromFolder(test_dir, subfolder='bad', transform=transform)
test_data_loader_B = torch.utils.data.DataLoader(dataset=test_data_B, batch_size=parameters['batch_size'], shuffle=False)

## Creating the Optimizer

In this block, we need to create the learning optimizers. We will have one optimizer for the Generator and two optimizers for the Discriminator. The Discriminator has two optimizers as this function checks the data generated by the Generator.

In [26]:
G_optimizer = torch.optim.Adam(itertools.chain(G_A.parameters(), G_B.parameters()), lr=parameters['lrG'], betas=(parameters['beta1'], parameters['beta2']))
G_optimizer.load_state_dict(torch.load('Generator_optimizer.pth'))

D_A_optimizer = torch.optim.Adam(D_A.parameters(), lr=parameters['lrD'], betas=(parameters['beta1'], parameters['beta2']))
D_A_optimizer.load_state_dict(torch.load('Discriminator_A_optimizer.pth'))

D_B_optimizer = torch.optim.Adam(D_B.parameters(), lr=parameters['lrD'], betas=(parameters['beta1'], parameters['beta2']))
D_B_optimizer.load_state_dict(torch.load('Discriminator_B_optimizer.pth'))

## Defining Losses

In order to make sure that there is learning, we will create cycle losses, which we will pass on at a later time.

In [27]:
MSE_Loss = torch.nn.MSELoss().cuda()
L1_Loss = torch.nn.L1Loss().cuda()

In [28]:
good_save_path = 'good_generated_images'
bad_save_path = 'bad_generated_images'
os.makedirs(good_save_path, exist_ok=True)
os.makedirs(bad_save_path, exist_ok=True)

In [35]:
test_real_A_data = train_data_A.__getitem__(8).unsqueeze(0)
test_real_B_data = train_data_B.__getitem__(6).unsqueeze(0)
i = 0
for epoch in range(200):
    # Show result for test image
    test_real_A = test_real_A_data.cuda()
    test_fake_B = G_A(test_real_A)
    test_recon_A = G_B(test_fake_B)

    test_real_B = test_real_B_data.cuda()
    test_fake_A = G_B(test_real_B)
    test_recon_B = G_A(test_fake_A)
    (bad_one, good_one), (good_two, bad_two) = [test_fake_B, test_fake_A], [test_recon_A, test_recon_B]
    
    save_image(good_one, good_save_path, i)
    i +=1
    save_image(good_two, good_save_path, i)
    i +=1
    save_image(bad_one, bad_save_path, i)
    i +=1
    save_image(bad_two, bad_save_path, i)
    i +=1

In [None]:
!zip -r cable-generated-images.zip good_generated_images/* bad_generated_images/*

In [None]:
!cp -r /kaggle/working/good_generated_images/* /kaggle/working/cable/train/good

In [None]:
!cp -r /kaggle/working/good_generated_images/* /kaggle/working/cable/test/good

In [None]:
!cp -r /kaggle/working/bad_generated_images/* /kaggle/working/cable/train/bad

In [None]:
!cp -r /kaggle/working/bad_generated_images/* /kaggle/working/cable/test/bad

In [43]:
data_dir = 'cable'
train_dir = os.path.join(data_dir,'train')
test_dir = os.path.join(data_dir,'test')

In [44]:
class ImageDataset(data.Dataset):
    def __init__(self, data_dir, transform=None):
        super(ImageDataset, self).__init__()
        self.data_dir = data_dir
        self.transform = transform
        self.data = []
        self.targets = []

        # Iterate over the data directory and load images
        for root, dirs, files in os.walk(data_dir):
            for file in files:
                image_path = os.path.join(root, file)
                image = Image.open(image_path)
                self.data.append(image)

                # Determine the class label based on the directory name
                class_label = os.path.basename(root)
                if class_label == "good":
                    self.targets.append(1)  # Assign class 1 for "good"
                elif class_label == "bad":
                    self.targets.append(0)  # Assign class 0 for "bad"

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        image = self.data[index]
        target = self.targets[index]

        if self.transform:
            image = self.transform(image)

        return image, target

In [45]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(32 * 16 * 16, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)

In [52]:
transform = transforms.Compose([
    transforms.RandomRotation(50),  # Rotate randomly between -50 and 50 degrees
    transforms.RandomResizedCrop(256, scale=(0.8, 1.0)),  # Random scale and crop to 256x256
    transforms.RandomHorizontalFlip(),  # Flip horizontally with a 50% chance
    transforms.ToTensor(),
])

# Create data loaders
train_dataset = ImageDataset(train_dir, transform=transform)
test_dataset = ImageDataset(test_dir, transform=transform)

# Define data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Set device for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the CNN model
model = CNN().to(device)


# Define loss function and optimizer
criterion = nn.BCELoss()
optimizer = Adam(model.parameters(), lr=0.002)

In [53]:
# Training loop
num_epochs = 500
model.train()

for epoch in range(num_epochs):
    for images, targets in train_loader:
        # Move data to device
        images = images.to(device)
        targets = targets.unsqueeze(1).to(device)
        
        # Convert targets to float
        targets = targets.float()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print training loss
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

Epoch [1/500], Loss: 0.6564
Epoch [1/500], Loss: 0.0916
Epoch [1/500], Loss: 7.2464
Epoch [1/500], Loss: 0.0011
Epoch [1/500], Loss: 0.3414
Epoch [1/500], Loss: 0.2940
Epoch [1/500], Loss: 0.2985
Epoch [1/500], Loss: 0.2617
Epoch [2/500], Loss: 0.2017
Epoch [2/500], Loss: 0.2402
Epoch [2/500], Loss: 0.4276
Epoch [2/500], Loss: 0.0111
Epoch [2/500], Loss: 0.1430
Epoch [2/500], Loss: 0.2937
Epoch [2/500], Loss: 0.1484
Epoch [2/500], Loss: 0.0401
Epoch [3/500], Loss: 0.0524
Epoch [3/500], Loss: 0.2312
Epoch [3/500], Loss: 0.4185
Epoch [3/500], Loss: 0.1493
Epoch [3/500], Loss: 0.3166
Epoch [3/500], Loss: 0.0948
Epoch [3/500], Loss: 0.0860
Epoch [3/500], Loss: 0.0567
Epoch [4/500], Loss: 0.3559
Epoch [4/500], Loss: 0.0217
Epoch [4/500], Loss: 0.1435
Epoch [4/500], Loss: 0.0108
Epoch [4/500], Loss: 0.4646
Epoch [4/500], Loss: 0.1483
Epoch [4/500], Loss: 0.1377
Epoch [4/500], Loss: 0.3500
Epoch [5/500], Loss: 0.3144
Epoch [5/500], Loss: 0.1136
Epoch [5/500], Loss: 0.1316
Epoch [5/500], Loss:

In [56]:
# Evaluation
model.eval()
with torch.no_grad():
    all_predictions = []
    all_targets = []
    for images, targets in test_loader:
        
        # Move data to device
        images = images.to(device)
        targets = targets.unsqueeze(1).to(device)

        # Forward pass
        outputs = model(images)
        predictions = torch.round(outputs)

        # Collect predictions and targets
        all_predictions.extend(predictions.cpu().numpy().flatten().tolist())
        all_targets.extend(targets.cpu().numpy().flatten().tolist())

    # Calculate metrics
    correct = sum(all_predictions[i] == all_targets[i] for i in range(len(all_predictions)))
    accuracy = correct / len(all_predictions)
    tp = sum(all_predictions[i] == 1 and all_targets[i] == 1 for i in range(len(all_predictions)))
    tn = sum(all_predictions[i] == 0 and all_targets[i] == 0 for i in range(len(all_predictions)))
    fp = sum(all_predictions[i] == 1 and all_targets[i] == 0 for i in range(len(all_predictions)))
    fn = sum(all_predictions[i] == 0 and all_targets[i] == 1 for i in range(len(all_predictions)))
    sensitivity = tp / (tp + fn)
    specificity = tn / (tn + fp)
    auc = roc_auc_score(all_targets, all_predictions)
    p_value = ttest_ind(all_predictions, all_targets).pvalue

In [57]:
# Print the evaluation results
print("Accuracy:", accuracy)
print("Sensitivity:", sensitivity)
print("Specificity:", specificity)
print("AUC:", auc)
print("p-value:", p_value)

Accuracy: 0.6214285714285714
Sensitivity: 1.0
Specificity: 0.35365853658536583
AUC: 0.6768292682926829
p-value: 1.9545323079916612e-11
