In [1]:
!pip install wandb



In [2]:
!pip install torchsummary

Collecting torchsummary
  Downloading torchsummary-1.5.1-py3-none-any.whl.metadata (296 bytes)
Downloading torchsummary-1.5.1-py3-none-any.whl (2.8 kB)
Installing collected packages: torchsummary
Successfully installed torchsummary-1.5.1


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os
import torch.nn.functional as F
import wandb
from torchsummary import summary

In [4]:
class Encoder(nn.Module):
    def __init__(self, in_channels=3, out_channels=16, latent_dim=200):
        super().__init__()
        self.out_channels = out_channels

        self.net = nn.Sequential( #  597x449
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),  # (600, 450)
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),  
            nn.ReLU(),
            nn.Conv2d(out_channels, 2 * out_channels, kernel_size=3, stride=2, padding=1),  # (298, 225)
            nn.ReLU(),
            nn.Conv2d(2 * out_channels, 2 * out_channels, kernel_size=3, padding=1),  
            nn.ReLU(),
            nn.Conv2d(2 * out_channels, 4 * out_channels, kernel_size=3, stride=2, padding=1),  # (150, 113)
            nn.ReLU(),
            nn.Conv2d(4 * out_channels, 4 * out_channels, kernel_size=3, padding=1),  
            nn.ReLU(),
        )
        
        self.flatten_size = 4 * out_channels * 150 * 113

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flatten_size, latent_dim),
            nn.ReLU(),
        )

    def forward(self, x):
        x = x.view(-1, 3, 597, 449)
        x = self.net(x)
        x = self.fc(x)
        return x


class Decoder(nn.Module):
    def __init__(self, in_channels=3, out_channels=16, latent_dim=200):
        super().__init__()
        self.out_channels = out_channels

        self.fc = nn.Sequential(
            nn.Linear(latent_dim, 4 * out_channels * 150 * 113),
            nn.ReLU(),
        )

        self.conv = nn.Sequential(
            nn.ConvTranspose2d(4 * out_channels, 4 * out_channels, kernel_size=3, padding=1),  # (150, 113)
            nn.ReLU(),
            nn.ConvTranspose2d(4 * out_channels, 2 * out_channels, kernel_size=3, stride=2, padding=1, output_padding=0),  # (300, 225)
            nn.ReLU(),
            nn.ConvTranspose2d(2 * out_channels, 2 * out_channels, kernel_size=3, padding=1),  
            nn.ReLU(),
            nn.ConvTranspose2d(2 * out_channels, out_channels, kernel_size=3, stride=2, padding=1, output_padding=0),  # (600, 450)
            nn.ReLU(),
            nn.ConvTranspose2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(out_channels, in_channels, kernel_size=3, padding=1),  
            nn.Sigmoid(),  # Normalization output [0, 1]
        )

    def forward(self, x):
        x = self.fc(x)
        x = x.view(-1, 4 * self.out_channels, 150, 113)  # Відновлюємо форму для згорткових шарів
        x = self.conv(x)
        return x


class Autoencoder(nn.Module):
    def __init__(self, encoder, decoder, device="cuda"):
        super().__init__()
        self.encoder = encoder
        self.encoder.to(device)
        self.decoder = decoder
        self.decoder.to(device)

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


In [5]:
# Function to calculate the correlation coefficient
def correlation_coefficient(x, y):
    x_mean = np.mean(x)
    y_mean = np.mean(y)
    
    numerator = np.sum((x - x_mean) * (y - y_mean))
    denominator = np.sqrt(np.sum((x - x_mean)**2) * np.sum((y - y_mean)**2))
    
    return numerator / denominator

# Function to calculate the threshold
def calculate_threshold(correlation_coeffs):
    mu_c = np.mean(correlation_coeffs)
    sigma_c = np.std(correlation_coeffs)
    threshold = mu_c - 0.5 * sigma_c
    return threshold

In [6]:
!wandb login 492953ddcda0576b6e6ebf89860aed0ccd177efe

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


In [40]:
run_name = f"Autoencoder all examples out_channels=8, latent_dim=32 batch_size=8"

In [41]:
wandb.init(
    # set the wandb project where this run will be logged
    project="Skin cancer BASE model",

    # track hyperparameters and run metadata
    config={
    "epochs" : 10,
    "batch_size" : 8,
    "learning_rate" : 0.002,
    },
    name=run_name
)

0,1
epoch,▁▂▃▃▄▅▆▆▇█
loss,█▄▂▁▂▃▃▄▁▅
psnr,█▆▃▄▅▁▇▂▃▂

0,1
epoch,10.0
loss,0.56348
psnr,-0.06377


In [42]:
# Class for loading images from a folder
class CustomDataset(Dataset):
    def __init__(self, folder_path, transform=None, max_images=None):
        self.image_paths = [os.path.join(folder_path, fname) for fname in os.listdir(folder_path) if fname.endswith('.jpg') or fname.endswith('.png')]
        if max_images is not None:
            self.image_paths = self.image_paths[:max_images]  # Limit the number of images to max_images
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_name = self.image_paths[idx]
        image = Image.open(img_name).convert('RGB')
        image = np.array(image)
        if self.transform:
            # image = self.transform(image)
            image = self.transform(image=image)['image']
        return image

In [27]:
import torch
import torch.nn.functional as F
import numpy as np
import wandb
from skimage.metrics import structural_similarity as ssim
import math

def psnr(target, prediction):
    """
    Calculate the PSNR between the target and prediction.
    """
    mse = torch.mean((target - prediction) ** 2)
    if mse == 0:
        return float('inf')  # No error, images are identical
    max_pixel = 1.0  # Assuming pixel values are normalized between [0, 1]
    psnr_value = 20 * torch.log10(max_pixel / torch.sqrt(mse))
    return psnr_value

from tqdm import tqdm

def train_autoencoder(model, dataloader, criterion, optimizer, scheduler, device, epochs):
    """
    Train the autoencoder with batch support.
    """
    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        total_psnr = 0.0

        # Use tqdm for progress tracking
        with tqdm(dataloader, desc=f"Epoch {epoch + 1}/{epochs}", unit="batch") as pbar:
            for batch_idx, data in enumerate(pbar):
                # Move data to the specified device (GPU/CPU)
                data = data.to(device)
                # print(data.min(), data.max())  # Should be 0 and 1


                # Zero the gradients
                optimizer.zero_grad()

                # Perform the forward pass (prediction)
                output = model(data)
                
                # Calculate the loss
                loss = criterion(output, data)
                loss.backward()
                optimizer.step()

                # Aggregate metrics
                train_loss += loss.item()
                psnr_value = psnr(data, output)
                total_psnr += psnr_value.item()

                # Update tqdm progress bar with current loss and PSNR
                pbar.set_postfix(loss=loss.item(), psnr=psnr_value.item())

        # Calculate average loss and PSNR for the epoch
        average_loss = train_loss / len(dataloader)
        average_psnr = total_psnr / len(dataloader)

        # Log metrics to the console
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {average_loss:.4f}, PSNR: {average_psnr:.2f}")
        wandb.log({"epoch": epoch + 1, "loss": average_loss, "psnr": average_psnr})

        # Update the learning rate scheduler
        scheduler.step(average_loss)

    return model




# def train_autoencoder(model, dataloader, criterion, optimizer, scheduler, device, epochs):
#     """
#     Function to train the autoencoder.
    
#     Args:
#         model: The autoencoder model.
#         dataloader: The data loader.
#         criterion: The loss function.
#         optimizer: The optimizer.
#         scheduler: The learning rate scheduler.
#         device: The device (CPU or GPU).
#         epochs: The number of epochs.

#     Returns:
#         The model after training.
#     """
#     for epoch in range(epochs):
#         model.train()
#         train_loss = 0.0
#         total_psnr = 0.0
#         total_ssim = 0.0
        
#         for data in dataloader:
#             data = data.to(device)
            
            
#             optimizer.zero_grad()
#             output = model(data)
            
#             # Resizing to avoid shape mismatch error
#             data_resized = F.interpolate(data, size=(597, 449), mode='bicubic', align_corners=False)
            
#             loss = criterion(output, data_resized)
#             loss.backward()
#             print(loss)
#             optimizer.step()

#             train_loss += loss.item()

#             # Calculate PSNR and SSIM
#             psnr_value = psnr(data_resized, output)
#             total_psnr += psnr_value.item()

#             # ssim_value = ssim(
#             # data_resized.cpu().numpy().transpose(0, 2, 3, 1), 
#             # output.cpu().detach().numpy().transpose(0, 2, 3, 1), 
#             # win_size=7,  # Explicitly set the window size
#             # channel_axis=-1  # Indicate that the last dimension is the channel axis
#             # )
#             # total_ssim += np.mean(ssim_value)

#         # Average loss, PSNR, and SSIM for the epoch
#         average_loss = train_loss / len(dataloader)
#         average_psnr = total_psnr / len(dataloader)
#         # average_ssim = total_ssim / len(dataloader)
        
#         # # Output results
#         # print(f"Epoch [{epoch+1}/{epochs}], Loss: {average_loss}, PSNR: {average_psnr}, SSIM: {average_ssim}")
        
#         # # Logging the metrics to wandb
#         # wandb.log({"epoch": epoch + 1, "loss": average_loss, "psnr": average_psnr, "ssim": average_ssim})

#         # Output results
#         print(f"Epoch [{epoch+1}/{epochs}], Loss: {average_loss}, PSNR: {average_psnr}")
        
#         # Logging the metrics to wandb
#         wandb.log({"epoch": epoch + 1, "loss": average_loss, "psnr": average_psnr})
        
        
#         # Update learning rate
#         scheduler.step(average_loss)

#     return model

In [28]:
encoder = Encoder(in_channels=3, out_channels=8, latent_dim=16)
decoder = Decoder(in_channels=3, out_channels=8, latent_dim=16)
autoencoder_model = Autoencoder(encoder, decoder)

In [29]:
import torch
print(torch.cuda.is_available())  # Should return True if GPU is available
print(torch.cuda.current_device())  # Should return the index of the current GPU


True
0


In [30]:
summary(autoencoder_model, (3, 597, 449))  # 597x449

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 8, 597, 449]             224
              ReLU-2          [-1, 8, 597, 449]               0
            Conv2d-3          [-1, 8, 597, 449]             584
              ReLU-4          [-1, 8, 597, 449]               0
            Conv2d-5         [-1, 16, 299, 225]           1,168
              ReLU-6         [-1, 16, 299, 225]               0
            Conv2d-7         [-1, 16, 299, 225]           2,320
              ReLU-8         [-1, 16, 299, 225]               0
            Conv2d-9         [-1, 32, 150, 113]           4,640
             ReLU-10         [-1, 32, 150, 113]               0
           Conv2d-11         [-1, 32, 150, 113]           9,248
             ReLU-12         [-1, 32, 150, 113]               0
          Flatten-13               [-1, 542400]               0
           Linear-14                   

In [43]:
# Data preparation
image_folder = "/kaggle/input/ham1000-segmentation-and-classification/images"  # Path to the folder containing images
import albumentations as A
import cv2
from albumentations.pytorch import ToTensorV2

transform = A.Compose([
    A.Resize(height=597, width=449),
    A.Blur(blur_limit=3, p=0.5),  
    A.ShiftScaleRotate(shift_limit=0.1, rotate_limit=15, p=0.5, border_mode=cv2.BORDER_CONSTANT), 
    A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.05, p=0.5),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])


dataset = CustomDataset(image_folder, transform)
dataloader = DataLoader(dataset, batch_size=wandb.config.batch_size, shuffle=True)

# # Creating and training the autoencoder
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder = autoencoder_model.to(device)
optimizer = optim.Adam(autoencoder.parameters(), lr=wandb.config.learning_rate)
criterion = nn.BCEWithLogitsLoss() 
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2, factor=0.5)

# Calling the training function
autoencoder = train_autoencoder(
    model=autoencoder,
    dataloader=dataloader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    device=device,
    epochs=wandb.config.epochs
)

# Saving the model
torch.save(autoencoder.state_dict(), "cnn_autoencoder.pth")
wandb.save("cnn_autoencoder.pth")

Epoch 1/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.82batch/s, loss=0.534, psnr=-0.35]   


Epoch [1/10], Loss: 0.5618, PSNR: -0.01


Epoch 2/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.83batch/s, loss=0.642, psnr=-0.0791]


Epoch [2/10], Loss: 0.5613, PSNR: -0.01


Epoch 3/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.82batch/s, loss=0.537, psnr=-0.563]   


Epoch [3/10], Loss: 0.5601, PSNR: 0.01


Epoch 4/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.82batch/s, loss=0.632, psnr=-0.91]   


Epoch [4/10], Loss: 0.5605, PSNR: 0.01


Epoch 5/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.83batch/s, loss=0.553, psnr=-1.32]   


Epoch [5/10], Loss: 0.5619, PSNR: -0.02


Epoch 6/10: 100%|██████████| 1252/1252 [04:20<00:00,  4.81batch/s, loss=0.511, psnr=-0.56]   


Epoch [6/10], Loss: 0.5622, PSNR: -0.03


Epoch 7/10: 100%|██████████| 1252/1252 [04:18<00:00,  4.84batch/s, loss=0.558, psnr=1.55]    


Epoch [7/10], Loss: 0.5598, PSNR: 0.01


Epoch 8/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.83batch/s, loss=0.566, psnr=-0.604]  


Epoch [8/10], Loss: 0.5624, PSNR: -0.00


Epoch 9/10: 100%|██████████| 1252/1252 [04:19<00:00,  4.83batch/s, loss=0.55, psnr=0.284]    


Epoch [9/10], Loss: 0.5603, PSNR: 0.01


Epoch 10/10: 100%|██████████| 1252/1252 [04:18<00:00,  4.84batch/s, loss=0.522, psnr=2.02]   


Epoch [10/10], Loss: 0.5622, PSNR: -0.02


['/kaggle/working/wandb/run-20250122_111009-flz2owgx/files/cnn_autoencoder.pth']

In [None]:
# # Отримання вихідного зображення після реконструкції
# test_image = Image.open("/kaggle/input/dddddsd/photo_5_2025-01-05_14-16-42.jpg").convert('RGB')
# test_image = transform(test_image).unsqueeze(0).to(device)
# output_image = autoencoder(test_image).detach().cpu().numpy()

# # Обчислення коефіцієнту кореляції
# test_image_resized = transforms.functional.resize(test_image, size=(600, 456), interpolation=transforms.InterpolationMode.BICUBIC)
# input_pixels = test_image_resized.squeeze(0).cpu().numpy().flatten()

# reconstructed_pixels = output_image.flatten()
# corr_value = correlation_coefficient(input_pixels, reconstructed_pixels)
# print(f"Correlation Coefficient: {corr_value}")

# # Масив кореляцій для класу (приклад)
# correlation_coeffs = np.random.rand(100)  # Масив кореляцій для класу

# # Обчислення порогу для цього класу
# threshold = calculate_threshold(correlation_coeffs)
# print(f"Threshold: {threshold}")

# # Перевірка чи зображення належить тому ж класу
# if corr_value > threshold:
#     print("Зображення належить тому ж класу.")
# else:
#     print("Зображення не належить тому ж класу.")