In [None]:
# Mount Google Drive into the Colab runtime so the dataset folders are reachable.
from google.colab import drive
# NOTE(review): later cells use relative './drive/...' paths, which presumably
# rely on the working directory being /content — confirm.
drive.mount('/content/drive')

Mounted at /content/drive


In [46]:
# List the dataset folders available under the project's data directory.
!ls ./drive/MyDrive/CS5242_project/personal_testing/data

CAM16_100cls_10mask  pRCC_nolabel  WBC_1  WBC_10  WBC_100  WBC_50


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from tqdm import tqdm
from PIL import Image
import os

# Training-time preprocessing: fixed 224x224 resize, random horizontal flip
# augmentation, tensor conversion, then per-channel normalization with the
# commonly used ImageNet channel statistics.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the WBC dataset (one sub-folder per class, as required by ImageFolder).
# Local (non-Colab) alternative: data_dir = "./data/WBC_1/train/data"
data_dir = "./drive/MyDrive/CS5242_project/personal_testing/data/WBC_1/train/data"
wbc_dataset = datasets.ImageFolder(root=data_dir, transform=transform)
wbc_loader = DataLoader(wbc_dataset, batch_size=32, shuffle=True)

# Set device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the ResNet-18 model without pretraining.
# The classification head is replaced BEFORE moving the model to `device`;
# doing it the other way round leaves the freshly created `fc` layer on the
# CPU while the rest of the network lives on the GPU.
num_classes = len(wbc_dataset.classes)
model = models.resnet18(pretrained=False)
model.fc = nn.Linear(model.fc.in_features, num_classes)
model = model.to(device)

# Define a custom dataset class for unlabeled data
class UnlabeledDataset(Dataset):
    """Dataset that yields images (without labels) from one flat directory.

    Args:
        data_dir: Directory that is scanned (non-recursively) for image files.
        transform: Optional callable applied to each loaded RGB PIL image.

    Only files whose lower-cased name ends in .png, .jpg, .jpeg, .gif or .bmp
    are included. Paths are sorted so that index -> file is reproducible
    across runs and machines (``os.listdir`` returns entries in arbitrary order).
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.image_files = sorted(
            os.path.join(data_dir, file)
            for file in os.listdir(data_dir)
            if file.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))
        )
        self.transform = transform

    def __len__(self):
        """Return the number of image files found in ``data_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it, transformed if requested."""
        img_path = self.image_files[idx]
        # The context manager guarantees the underlying file handle is closed;
        # convert() returns an independent in-memory copy that outlives it.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image

# Unlabeled pRCC images, batched and shuffled; the same `transform` pipeline
# used for the labeled data is applied to every image.
prcc_data_dir = "./drive/MyDrive/CS5242_project/personal_testing/data/pRCC_nolabel"
prcc_dataset = UnlabeledDataset(data_dir=prcc_data_dir, transform=transform)
prcc_loader = DataLoader(
    dataset=prcc_dataset,
    batch_size=32,
    shuffle=True,
    pin_memory=True,
)

# Define the denoising autoencoder model
class DenoisingAutoencoder(nn.Module):
    """Small convolutional autoencoder that reconstructs an image from a noisy copy.

    Attributes:
        encoder: Two conv+ReLU layers mapping (B, 3, H, W) to
            (B, 128, ceil(H/2), ceil(W/2)).
        decoder: Transposed conv + conv mapping the encoding back to 3 channels
            at (approximately) the input resolution.

    ``forward`` returns a tensor with exactly the same shape as its input, so
    it can be compared to an image batch with an element-wise loss such as MSE.
    """

    def __init__(self):
        super(DenoisingAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(inplace=True)
        )
        # Mirror of the encoder: the stride-2 transposed convolution doubles
        # the spatial size (output_padding=1 makes even sizes round-trip
        # exactly), then a 3x3 convolution projects back to 3 channels.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x):
        """Encode ``x`` and decode it back to a reconstruction shaped like ``x``."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        # For odd H or W the decoder overshoots by one pixel; crop so the
        # reconstruction always matches the input shape.
        return decoded[..., :x.shape[-2], :x.shape[-1]]

# Initialize the denoising autoencoder model
autoencoder = DenoisingAutoencoder().to(device)

# Set up criterion (loss function) and optimizer for pretraining
criterion_autoencoder = nn.MSELoss()
optimizer_autoencoder = optim.Adam(autoencoder.parameters(), lr=0.001)

# Pretrain the denoising autoencoder on the unlabeled dataset.
# Denoising objective: the network sees a noise-corrupted batch and is
# penalized (MSE) for deviating from the CLEAN batch. The model output must
# therefore have the same shape as the input images.
num_epochs_autoencoder = 1  # You can adjust this
for epoch in range(num_epochs_autoencoder):
    autoencoder.train()
    total_loss = 0.0
    for inputs in tqdm(prcc_loader, desc=f'Pretraining Epoch {epoch + 1}/{num_epochs_autoencoder}'):
        inputs = inputs.to(device)
        # Corrupt the batch with zero-mean Gaussian noise (std 0.1).
        noisy_inputs = inputs + 0.1 * torch.randn_like(inputs)
        reconstructed = autoencoder(noisy_inputs)
        # Target is the clean batch, not the noisy one; otherwise the model
        # would only learn to copy its (noisy) input.
        loss = criterion_autoencoder(reconstructed, inputs)
        optimizer_autoencoder.zero_grad()
        loss.backward()
        optimizer_autoencoder.step()
        total_loss += loss.item()
    average_loss = total_loss / len(prcc_loader)
    print(f'Pretraining Epoch [{epoch + 1}/{num_epochs_autoencoder}], Loss: {average_loss:.4f}')

# Keep a handle on the pretrained encoder layers of the denoising autoencoder.
resnet_encoder = autoencoder.encoder

# Fetching Cam16 Dataset (one sub-folder per class, as required by ImageFolder)
cam16_data_dir = "./drive/MyDrive/CS5242_project/personal_testing/data/CAM16_100cls_10mask/train/data"
cam16_dataset = datasets.ImageFolder(root=cam16_data_dir, transform=transform)
cam16_loader = DataLoader(cam16_dataset, batch_size=32, shuffle=True)

# NOTE(review): this only registers the encoder as an extra sub-module named
# `encoder`. torchvision's ResNet.forward never calls it, so the autoencoder
# pretraining currently has no effect on the classifier's predictions. Its
# layer shapes (3->64->128 channels) also do not match ResNet-18's stem, so
# wiring it in requires a deliberate architecture change.
model.encoder = resnet_encoder

# Size the classification head from the dataset instead of hard-coding it, so
# the head always matches the label range CrossEntropyLoss will receive.
num_cam16_classes = len(cam16_dataset.classes)
model.fc = nn.Linear(model.fc.in_features, num_cam16_classes).to(device)

criterion = nn.CrossEntropyLoss().to(device)
# Created after the head swap so the new `fc` parameters are optimized.
optimizer = optim.Adam(model.parameters(), lr=0.001)

# """
# Supervised training stage on the CAM16 loader.
num_epochs_cam16 = 10  # number of full passes over cam16_loader
for epoch in range(num_epochs_cam16):
    model.train()  # training mode (affects BatchNorm layers)
    batch_losses = []

    progress = tqdm(cam16_loader, desc=f'Epoch {epoch + 1}/{num_epochs_cam16}')
    for inputs, labels in progress:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear stale gradients, then run forward -> loss -> backward -> update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    # Mean of the per-batch losses for this epoch.
    total_loss = sum(batch_losses)
    average_loss = total_loss / len(cam16_loader)
    print(f'Epoch [{epoch + 1}/{num_epochs_cam16}], Loss: {average_loss:.4f}')

#torch.save(model.state_dict(), "./drive/MyDrive/CS5242_project/personal_testing/pretrained_cam16_model.pth")

# Swap in a fresh head sized for the WBC classes (`num_classes`).
# NOTE: the parameters of this new layer are not tracked by the `optimizer`
# object created before this point; an optimizer used afterwards has to be
# built from model.parameters() again to train the new head.
model.fc = nn.Linear(model.fc.in_features, num_classes).to(device)
# """
# Fine-tuning stage on the WBC loader.
#
# `model.fc` was replaced after the existing `optimizer` was constructed, so
# that optimizer does not hold the new head's parameters (the head would stay
# at its random initialization) and it still carries Adam moment estimates
# from the previous training stage. Build a fresh optimizer over the current
# parameters before attaching the learning-rate scheduler.
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10  # As needed

# Single-cycle learning rate strategy: one scheduler step per optimizer step,
# so total_steps is (batches per epoch) * (epochs).
total_iterations = len(wbc_loader) * num_epochs
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, total_steps=total_iterations)

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for inputs, labels in tqdm(wbc_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()  # OneCycleLR is stepped per batch, after optimizer.step()

        # Running statistics (detached so no autograd graph is kept alive)
        total_loss += loss.item()
        predicted = outputs.detach().argmax(dim=1)
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

    average_loss = total_loss / len(wbc_loader)
    accuracy = (correct_predictions / total_samples) * 100
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}, Accuracy: {accuracy:.2f}%')

print("Training finished!")

# Deterministic preprocessing for evaluation: same resize and normalization
# as training, but no random flip.
eval_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# WBC validation split, iterated in a fixed order (shuffle=False).
# Local (non-Colab) alternative: eval_data_dir = "./data/WBC_100/val/data"
eval_data_dir = "./drive/MyDrive/CS5242_project/personal_testing/data/WBC_100/val/data"
eval_dataset = datasets.ImageFolder(root=eval_data_dir, transform=eval_transform)
eval_loader = DataLoader(
    eval_dataset,
    batch_size=32,
    shuffle=False,
    pin_memory=True,
)

# Save only the model weights
#torch.save(model.state_dict(), './drive/MyDrive/CS5242_project/personal_testing/resnet18_WBC100.pth')

# Top-1 accuracy over the whole evaluation loader.
model.eval()  # inference mode for BatchNorm layers
total_correct, total_samples = 0, 0

with torch.no_grad():  # no gradients needed while scoring
    for inputs, labels in tqdm(eval_loader, desc="Evaluating"):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        # Index of the highest-scoring class for every sample in the batch.
        predicted = torch.max(outputs, 1)[1]
        matches = predicted == labels
        total_correct += matches.sum().item()
        total_samples += labels.shape[0]

final_accuracy = (total_correct / total_samples) * 100
print(f'Final Accuracy on the evaluation dataset: {final_accuracy:.2f}%')
