In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.utils import save_image
from tqdm import tqdm
import itertools

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionGate(nn.Module):
    def __init__(self, F_g, F_l, F_int):
        super(AttentionGate, self).__init__()
        self.W_g = nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True)
        self.W_x = nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True)
        
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.Sigmoid()
        )
        
        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi

In [3]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        # Downsample
        self.downsample = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # Upsample
        self.upsample = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )
        
        # Attention Gates
        self.att1 = AttentionGate(F_g=128, F_l=128, F_int=64)
        self.att2 = AttentionGate(F_g=64, F_l=64, F_int=32)
        self.att3 = AttentionGate(F_g=32, F_l=32, F_int=16)

    def forward(self, x):
        # Encoder
        d1 = self.downsample[0:2](x) # 32
        d2 = self.downsample[2:4](d1) # 64
        d3 = self.downsample[4:6](d2) # 128
        d4 = self.downsample[6:8](d3) # 256
        
        # Decoder with attention
        u3 = self.upsample[0:2](d4) 
        u3 = self.att1(g=u3, x=d3)
        u2 = self.upsample[2:4](u3) 
        u2 = self.att2(g=u2, x=d2)
        u1 = self.upsample[4:6](u2)
        out = self.upsample[6:8](u1)
        
        return out


In [4]:
import torch.nn as nn

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=3, stride=1, padding=1),  # Output a single scalar
            nn.Sigmoid()  # Output in [0, 1] range (for binary classification)
        )

    def forward(self, x):
        return self.model(x)


In [5]:
!ls ../input/Dataset/images

ls: cannot access '../input/Dataset/images': No such file or directory


In [6]:
# Define transformations for data preprocessing
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images to a consistent size
    transforms.Grayscale(),          # Convert images to grayscale
    transforms.ToTensor(),           # Convert images to PyTorch tensors
    transforms.Normalize((0.5,), (0.5,))  # Normalize images to the range [-1, 1]
])




In [7]:

# Initialize hyperparameters
batch_size = 4
learning_rate = 0.0002
num_epochs = 3
lambda_cycle = 10  # Weight for cycle consistency loss



In [8]:
import os
import cv2
import numpy as np
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor

class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = self._load_image_paths()

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = self._read_image(img_path)

        if self.transform:
            img = self.transform(img)

        return img

    def _load_image_paths(self):
        image_paths = []
        for root, _, files in os.walk(self.root_dir):
            for file in files:
                if file.endswith('.jpg') or file.endswith('.png'):
                    image_paths.append(os.path.join(root, file))
        return image_paths

    def _read_image(self, img_path):
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (256, 256))
        img = img.astype(np.float32) / 255.0  # Convert to float32 and normalize to range [0, 1]
        return img


data_dir = "/kaggle/input/ct-to-mri-cgan/Dataset/images"
transform = ToTensor()  # Convert images to PyTorch tensors

# Create custom datasets
train_dataset_A = CustomImageDataset(root_dir=os.path.join(data_dir, 'trainA'), transform=transform)
train_dataset_B = CustomImageDataset(root_dir=os.path.join(data_dir, 'trainB'), transform=transform)
test_dataset_A = CustomImageDataset(root_dir=os.path.join(data_dir, 'testA'), transform=transform)
test_dataset_B = CustomImageDataset(root_dir=os.path.join(data_dir, 'testB'), transform=transform)


In [9]:
!ls "/kaggle/input/ct-to-mri-cgan/Dataset/images"

testA  testB  trainA  trainB


In [10]:
print("Length of train_dataset_A:", len(train_dataset_A))
print("Length of train_dataset_B:", len(train_dataset_B))
print("Length of test_dataset_A:", len(test_dataset_A))
print("Length of test_dataset_B:", len(test_dataset_B))


Length of train_dataset_A: 1742
Length of train_dataset_B: 1744
Length of test_dataset_A: 744
Length of test_dataset_B: 744


In [11]:
# Define data loaders
train_loader_A = DataLoader(dataset=train_dataset_A, batch_size=batch_size, shuffle=True)
train_loader_B = DataLoader(dataset=train_dataset_B, batch_size=batch_size, shuffle=True)
test_loader_A = DataLoader(dataset=test_dataset_A, batch_size=batch_size, shuffle=False)
test_loader_B = DataLoader(dataset=test_dataset_B, batch_size=batch_size, shuffle=False)

In [12]:
# Initialize generator and discriminator
G_AB = Generator()
G_BA = Generator()
D_A = Discriminator()
D_B = Discriminator()

In [13]:
# Print Generator model structure
print("Generator Model:")
print(G_AB)

# Print Discriminator model structure
print("\nDiscriminator Model:")
print(D_A)

Generator Model:
Generator(
  (downsample): Sequential(
    (0): Conv2d(3, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.2, inplace=True)
    (4): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.2, inplace=True)
    (6): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): LeakyReLU(negative_slope=0.2, inplace=True)
  )
  (upsample): Sequential(
    (0): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.2, inplace=True)
    (4): ConvTranspose2d(64, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.2, 

In [14]:
# Define loss functions
criterion_GAN = nn.MSELoss()
criterion_cycle = nn.L1Loss()

In [15]:
# Initialize optimizers
optimizer_G = optim.AdamW(itertools.chain(G_AB.parameters(), G_BA.parameters()), lr=learning_rate)
optimizer_D_A = optim.AdamW(D_A.parameters(), lr=learning_rate)
optimizer_D_B = optim.AdamW(D_B.parameters(), lr=learning_rate)

In [16]:
import os

# Define the directory to save images
output_dir = '/kaggle/working/output_images'
os.makedirs(output_dir, exist_ok=True)  # Create the directory if it doesn't exist



In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the generator and discriminator models to the selected device
G_AB.to(device)
G_BA.to(device)
D_A.to(device)
D_B.to(device)


Discriminator(
  (model): Sequential(
    (0): Conv2d(3, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.2, inplace=True)
    (4): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.2, inplace=True)
    (6): Conv2d(512, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): Sigmoid()
  )
)

In [18]:
import os
import torch
import pickle
# Define paths for saving model weights and entire models
model_dir = '/kaggle/working/saved_models'
os.makedirs(model_dir, exist_ok=True)


# Define paths for saving model weights and entire models
model_dir = '/kaggle/working/saved_models'
os.makedirs(model_dir, exist_ok=True)


In [19]:
from tqdm.auto import tqdm

In [24]:
# Initialize the best loss with a high value
best_loss = float('inf')
import os

# Open a log file in write mode
with open(os.path.join(model_dir, '/kaggle/working/training_log.txt'), 'w') as log_file:
    for epoch in range(num_epochs):
        epoch_loss = 0  # Initialize loss accumulator for the epoch

        for i, (real_A, real_B) in enumerate(zip(train_loader_A, train_loader_B)):
            real_A = real_A.to(device)
            real_B = real_B.to(device)

            # -------------------
            #  Train Generators
            # -------------------
            fake_B = G_AB(real_A)
            rec_A = G_BA(fake_B)
            fake_A = G_BA(real_B)
            rec_B = G_AB(fake_A)

            # Generator losses
            loss_GAN_AB = criterion_GAN(D_B(fake_B), torch.ones_like(D_B(fake_B)))
            loss_GAN_BA = criterion_GAN(D_A(fake_A), torch.ones_like(D_A(fake_A)))
            loss_cycle_A = criterion_cycle(rec_A, real_A)
            loss_cycle_B = criterion_cycle(rec_B, real_B)
            
            # Total generator loss
            loss_G = loss_GAN_AB + loss_GAN_BA + lambda_cycle * (loss_cycle_A + loss_cycle_B)

            # Backward and optimize
            optimizer_G.zero_grad()
            loss_G.backward()
            optimizer_G.step()

            # ---------------------
            #  Train Discriminators
            # ---------------------
            loss_D_A_real = criterion_GAN(D_A(real_A), torch.ones_like(D_A(real_A)))
            loss_D_A_fake = criterion_GAN(D_A(fake_A.detach()), torch.zeros_like(D_A(fake_A)))
            loss_D_A = (loss_D_A_real + loss_D_A_fake) / 2

            loss_D_B_real = criterion_GAN(D_B(real_B), torch.ones_like(D_B(real_B)))
            loss_D_B_fake = criterion_GAN(D_B(fake_B.detach()), torch.zeros_like(D_B(fake_B)))
            loss_D_B = (loss_D_B_real + loss_D_B_fake) / 2

            optimizer_D_A.zero_grad()
            optimizer_D_B.zero_grad()
            loss_D_A.backward()
            loss_D_B.backward()
            optimizer_D_A.step()
            optimizer_D_B.step()

            # Accumulate losses for the epoch
            epoch_loss += loss_G.item() + loss_D_A.item() + loss_D_B.item()

        # Average the loss over the number of batches
        epoch_loss /= len(train_loader_A)

        # Check if this epoch's loss is the best; if yes, save the model
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            torch.save(G_AB.state_dict(), os.path.join(model_dir, 'best_generator_AB_weights.pth'))
            torch.save(G_BA.state_dict(), os.path.join(model_dir, 'best_generator_BA_weights.pth'))
            torch.save(D_A.state_dict(), os.path.join(model_dir, 'best_discriminator_A_weights.pth'))
            torch.save(D_B.state_dict(), os.path.join(model_dir, 'best_discriminator_B_weights.pth'))
            log_file.write(f'Epoch [{epoch+1}/{num_epochs}], New best average loss: {best_loss}, Models saved!\n')

        # Periodic logging and image saving (every 5 epochs)
        if epoch % 5 == 0:
            save_image(fake_B, os.path.join(output_dir, f'fake_B_{epoch}.png'))
            save_image(fake_A, os.path.join(output_dir, f'fake_A_{epoch}.png'))
            log_file.write(f"Epoch [{epoch+1}/{num_epochs}], Generator Loss: {loss_G.item()}, Discriminator A Loss: {loss_D_A.item()}, Discriminator B Loss: {loss_D_B.item()}, GAN Loss AB: {loss_GAN_AB.item()}, GAN Loss BA: {loss_GAN_BA.item()}, Cycle Loss A: {loss_cycle_A.item()}, Cycle Loss B: {loss_cycle_B.item()}\n")

    log_file.write("Training completed!\n")


KeyboardInterrupt: 

In [None]:
import os
import torch
import pickle

# Define paths for saving model weights and entire models
model_dir = 'saved_models'
os.makedirs(model_dir, exist_ok=True)


# Define file paths for saving the state dicts
generator_AB_weights_path = os.path.join(model_dir, 'generator_AB_weights.pth')
generator_BA_weights_path = os.path.join(model_dir, 'generator_BA_weights.pth')
discriminator_A_weights_path = os.path.join(model_dir, 'discriminator_A_weights.pth')
discriminator_B_weights_path = os.path.join(model_dir, 'discriminator_B_weights.pth')

# Define file paths for saving the entire models (for pickling)
generator_AB_model_path = os.path.join(model_dir, 'generator_AB_model.pkl')
generator_BA_model_path = os.path.join(model_dir, 'generator_BA_model.pkl')
discriminator_A_model_path = os.path.join(model_dir, 'discriminator_A_model.pkl')
discriminator_B_model_path = os.path.join(model_dir, 'discriminator_B_model.pkl')

# Training loop (omitted for brevity)

# Save the model weights
torch.save(G_AB.state_dict(), generator_AB_weights_path)
torch.save(G_BA.state_dict(), generator_BA_weights_path)
torch.save(D_A.state_dict(), discriminator_A_weights_path)
torch.save(D_B.state_dict(), discriminator_B_weights_path)

# Save the entire models using pickle
with open(generator_AB_model_path, 'wb') as f:
    pickle.dump(G_AB, f)

with open(generator_BA_model_path, 'wb') as f:
    pickle.dump(G_BA, f)

with open(discriminator_A_model_path, 'wb') as f:
    pickle.dump(D_A, f)

with open(discriminator_B_model_path, 'wb') as f:
    pickle.dump(D_B, f)

print(f"Models and weights saved to {model_dir}:")
print(f"Generator AB weights -> {generator_AB_weights_path}")
print(f"Generator BA weights -> {generator_BA_weights_path}")
print(f"Discriminator A weights -> {discriminator_A_weights_path}")
print(f"Discriminator B weights -> {discriminator_B_weights_path}")
print(f"Generator AB model -> {generator_AB_model_path}")
print(f"Generator BA model -> {generator_BA_model_path}")
print(f"Discriminator A model -> {discriminator_A_model_path}")
print(f"Discriminator B model -> {discriminator_B_model_path}")


In [None]:
import matplotlib.pyplot as plt

# Set generator models to evaluation mode
G_AB.eval()
G_BA.eval()

# Test loop
with torch.no_grad():
    for i, (real_A, real_B) in enumerate(zip(test_loader_A, test_loader_B)):
        # Move real_A and real_B to the device
        real_A = real_A.to(device)
        real_B = real_B.to(device)

        # Generate fake images
        fake_B = G_AB(real_A[0].unsqueeze(0))  # Add batch dimension
        fake_A = G_BA(real_B[0].unsqueeze(0))  # Add batch dimension

        # Save and print generated images
        save_image(fake_B, os.path.join(output_dir, f'test_fake_B_{i}.png'))
        save_image(fake_A, os.path.join(output_dir, f'test_fake_A_{i}.png'))
        
        print(f"Test Image {i}:")
        print("Real A:")
        plt.imshow(real_A[0].permute(1, 2, 0).cpu().numpy())
        plt.show()
        print("Real B:")
        plt.imshow(real_B[0].permute(1, 2, 0).cpu().numpy())
        plt.show()
        print("Generated B from A:")
        plt.imshow(fake_B[0].permute(1, 2, 0).cpu().numpy())
        plt.show()
        print("Generated A from B:")
        plt.imshow(fake_A[0].permute(1, 2, 0).cpu().numpy())
        plt.show()

        if i == 24:  # Stop after processing 25 images
            break

In [None]:
import os
# Initialize models with suffixes
G_AB_new = Generator()
G_BA_new = Generator()
D_A_new = Discriminator()
D_B_new = Discriminator()

# Path to your saved model weights
model_dir = '/kaggle/input/cyclegan100e/saved_models/'

# Load the weights into the new model instances
G_AB_new.load_state_dict(torch.load(os.path.join(model_dir, 'best_generator_AB_weights.pth')))
G_BA_new.load_state_dict(torch.load(os.path.join(model_dir, 'best_generator_BA_weights.pth')))
D_A_new.load_state_dict(torch.load(os.path.join(model_dir, 'best_discriminator_A_weights.pth')))
D_B_new.load_state_dict(torch.load(os.path.join(model_dir, 'best_discriminator_B_weights.pth')))

# Move models to GPU if CUDA is available
if torch.cuda.is_available():
    G_AB_new.cuda()
    G_BA_new.cuda()
    D_A_new.cuda()
    D_B_new.cuda()

print("Models with new suffixes loaded successfully and ready for use!")


In [None]:
import matplotlib.pyplot as plt 
def perform_inference(G_AB, G_BA, test_loader_A, test_loader_B, device, output_dir):
    """
    Perform inference using provided generator models on test data.
    Args:
    - G_AB: Generator model from domain A to B.
    - G_BA: Generator model from domain B to A.
    - test_loader_A: DataLoader for domain A test images.
    - test_loader_B: DataLoader for domain B test images.
    - device: Device to run the inference on (e.g., 'cuda' or 'cpu').
    - output_dir: Directory to save generated images.
    """
    # Set generator models to evaluation mode
    G_AB.eval()
    G_BA.eval()

    # Test loop with no gradients needed for inference
    with torch.no_grad():
        for i, (real_A, real_B) in enumerate(zip(test_loader_A, test_loader_B)):
            # Move real_A and real_B to the device
            real_A = real_A.to(device)
            real_B = real_B.to(device)

            # Generate fake images
            fake_B = G_AB(real_A[0].unsqueeze(0))  # Add batch dimension
            fake_A = G_BA(real_B[0].unsqueeze(0))  # Add batch dimension

            # Save and print generated images
            save_image(fake_B, os.path.join(output_dir, f'test_fake_B_{i}.png'))
            save_image(fake_A, os.path.join(output_dir, f'test_fake_A_{i}.png'))
            
            print(f"Test Image {i}:")
            print("Real A:")
            plt.imshow(real_A[0].permute(1, 2, 0).cpu().numpy())
            plt.show()
            print("Real B:")
            plt.imshow(real_B[0].permute(1, 2, 0).cpu().numpy())
            plt.show()
            print("Generated B from A:")
            plt.imshow(fake_B[0].permute(1, 2, 0).cpu().numpy())
            plt.show()
            print("Generated A from B:")
            plt.imshow(fake_A[0].permute(1, 2, 0).cpu().numpy())
            plt.show()

            if i == 24:  # Stop after processing 25 images
                break

# Example of how to call the function
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
image_dir = '/kaggle/working/best_output_images'
os.makedirs(image_dir, exist_ok=True)
perform_inference(G_AB_new, G_BA_new, test_loader_A, test_loader_B, device, image_dir)