In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import pandas as pd

In [None]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Your PyTorch code here


In [None]:
# Define class names
class_names = ['glioma_tumor','meningioma_tumor','no_tumor','pituitary_tumor']
class_names_label = {class_name: i for i, class_name in enumerate(class_names)}
nb_classes = len(class_names)

IMAGE_SIZE = (32,32)
BATCH_SIZE = 32

In [None]:
# Define transformations (data preprocessing)
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load datasets
train_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Colab Notebooks/ccatract detection using VAEs/train', transform=transform)
test_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Colab Notebooks/ccatract detection using VAEs/test', transform=transform)


In [None]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Check dataset size
n_train = len(train_dataset)
n_test = len(test_dataset)

print(f"Number of training examples: {n_train}")
print(f"Number of testing examples: {n_test}")
print(f"Each image is of size: {IMAGE_SIZE}")

In [None]:
# Visualizing the dataset
def display_random_image(loader, class_names):
    """
    Display a random image from the dataset and its corresponding label.
    """
    # Get a batch of data
    dataiter = iter(loader)
    images, labels = next(dataiter)  # Use Python's built-in next() function

    index = np.random.randint(len(labels))  # Get a random index
    image = images[index].numpy().transpose(1, 2, 0)  # Transpose for correct display

    # Unnormalize the image for correct visualization (reverse of normalization)
    image = image * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
    image = np.clip(image, 0, 1)  # Clip to ensure valid pixel range

    plt.figure()
    plt.imshow(image)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.title(f'Image #{index} : {class_names[labels[index]]}')
    plt.show()

# Call the function
display_random_image(train_loader, class_names)


In [None]:
# Plotting dataset distribution
train_labels = [label for _, label in train_dataset]
test_labels = [label for _, label in test_dataset]
_, train_counts = np.unique(train_labels, return_counts=True)
_, test_counts = np.unique(test_labels, return_counts=True)
pd.DataFrame({'train': train_counts,
              'test': test_counts}, index=class_names).plot.bar()
plt.show()

In [None]:
def display_examples(class_names, loader):
    num_examples = 30
    dataiter = iter(loader)
    images, labels = next(dataiter)

    plt.figure(figsize=(10, 10))
    for i in range(min(num_examples, len(images))):
        plt.subplot(6, 5, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)

        # Convert tensor to numpy and clip values to [0, 1] range for imshow
        image = images[i].numpy().transpose(1, 2, 0)
        image = np.clip(image, 0, 1)  # Clipping values to the valid range

        plt.imshow(image)
        plt.xlabel(class_names[labels[i].item()])
    plt.show()


In [None]:
display_examples(class_names, train_loader)

In [None]:
import os
os.environ['TORCH_USE_CUDA_DSA'] = "1"

# Your PyTorch code here


In [None]:
!pip install torch==2.0.0+cu118 torchvision==0.15.0+cu118 torchaudio==2.0.0+cu118 -f https://download.pytorch.org/whl/cu118/torch_stable.html


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os
import numpy as np

# Define Custom Dataset
class ImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Collect all image paths and labels
        for label in os.listdir(root_dir):
            label_dir = os.path.join(root_dir, label)
            if os.path.isdir(label_dir):
                for img_name in os.listdir(label_dir):
                    self.image_paths.append(os.path.join(label_dir, img_name))
                    self.labels.append(label)

        self.labels = np.array(self.labels)
        self.unique_labels = list(set(self.labels))
        self.label_to_index = {label: idx for idx, label in enumerate(self.unique_labels)}

        # Convert labels to indices
        self.labels = np.array([self.label_to_index[label] for label in self.labels])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# Define Transforms
transform = transforms.Compose([
    transforms.Resize((32, 32)),  # Resize images to 32x32 (adjust if needed)
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Define VAE Model
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)

        # Calculate output dimension after convolutions
        self.fc1_input_dim = self._conv2d_output_dim(32, 4, 2, 1, 32)  # Assuming input images are 32x32
        self.fc1_input_dim = self._conv2d_output_dim(64, 4, 2, 1, self.fc1_input_dim)
        self.fc1_input_dim = self._conv2d_output_dim(128, 4, 2, 1, self.fc1_input_dim)

        print(f'VAE fc1_input_dim: {self.fc1_input_dim}')  # Debug statement

        # Adjust the input dimension for fc1
        self.fc1 = nn.Linear(128 * self.fc1_input_dim * self.fc1_input_dim, 256)
        self.fc2 = nn.Linear(256, 20)
        self.fc3 = nn.Linear(256, 20)
        self.fc4 = nn.Linear(20, 256)
        self.fc5 = nn.Linear(256, 128 * self.fc1_input_dim * self.fc1_input_dim)

        self.deconv1 = nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1)
        self.deconv2 = nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1)
        self.deconv3 = nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1)

    def _conv2d_output_dim(self, in_channels, kernel_size, stride, padding, input_dim):
        return (input_dim - kernel_size + 2 * padding) // stride + 1

    def encode(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        print(f'Encoded x shape: {x.shape}')  # Debug statement
        x = x.view(-1, 128 * self.fc1_input_dim * self.fc1_input_dim)
        h = torch.relu(self.fc1(x))
        return self.fc2(h), self.fc3(h)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = torch.relu(self.fc4(z))
        h = torch.relu(self.fc5(h))
        h = h.view(-1, 128, self.fc1_input_dim, self.fc1_input_dim)
        print(f'Decoded h shape: {h.shape}')  # Debug statement
        h = torch.relu(self.deconv1(h))
        h = torch.relu(self.deconv2(h))
        return torch.sigmoid(self.deconv3(h))

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# Define Loss Function for VAE
def loss_function(recon_x, x, mu, logvar):
    BCE = nn.functional.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

# Training the VAE
def train_vae(model, train_loader, epochs=20, lr=1e-3):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    model.train()

    for epoch in range(epochs):
        train_loss = 0
        for batch_idx, (data, _) in enumerate(train_loader):
            data = data.cuda()  # Move data to GPU
            optimizer.zero_grad()
            recon_batch, mu, logvar = model(data)
            loss = loss_function(recon_batch, data, mu, logvar)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()

        print(f'Epoch {epoch + 1}/{epochs}, Loss: {train_loss / len(train_loader.dataset)}')

# Define Classifier Model
class Classifier(nn.Module):
    def __init__(self, nb_classes):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(20, 128)
        self.fc2 = nn.Linear(128, nb_classes)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# Training the Classifier
def train_classifier(classifier, vae, train_loader, epochs=10, lr=1e-3):
    optimizer = optim.Adam(classifier.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    classifier.train()

    for epoch in range(epochs):
        classifier_loss = 0
        for data, target in train_loader:
            data, target = data.cuda(), target.cuda()  # Move data and target to GPU
            if target.min() < 0 or target.max() >= nb_classes:
                print(f'Invalid target labels: {target}')  # Debug statement
            optimizer.zero_grad()
            mu, _ = vae.encode(data)
            output = classifier(mu)
            loss = criterion(output, target)
            loss.backward()
            classifier_loss += loss.item()
            optimizer.step()

        print(f'Epoch {epoch + 1}/{epochs}, Classification Loss: {classifier_loss / len(train_loader.dataset)}')

# Evaluate the Classifier
def evaluate_classifier(classifier, vae, test_loader):
    classifier.eval()
    vae.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.cuda(), target.cuda()  # Move data and target to GPU
            mu, _ = vae.encode(data)
            output = classifier(mu)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    print(f'Accuracy: {100 * correct / total:.2f}%')

# Main script
if __name__ == "__main__":
    # Set paths to your dataset directories
    train_dir = 'path/to/your/train_data'  # User should replace this
    test_dir = 'path/to/your/test_data'  # User should replace this

    # Create datasets
    train_dataset = ImageDataset(root_dir=train_dir, transform=transform)
    test_dataset = ImageDataset(root_dir=test_dir, transform=transform)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    # Instantiate and train the VAE
    vae = VAE().cuda()
    train_vae(vae, train_loader)



In [None]:
import torch
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt

# Define your transforms (same as used during training)
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Define class names (should match the training classes)
class_names = ['glioma_tumor','meningioma_tumor','no_tumor','pituitary_tumor']  # Replace with actual class names

# Define the VAE and Classifier models
vae = VAE().cuda()
classifier = Classifier(nb_classes=len(class_names)).cuda()  # Ensure nb_classes is set correctly

# Load pre-trained model weights if you have them
# vae.load_state_dict(torch.load('path_to_vae_weights.pth'))
# classifier.load_state_dict(torch.load('path_to_classifier_weights.pth'))

# Function to load and preprocess a single image
def load_and_preprocess_image(image_path, transform):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)  # Add batch dimension
    return image

# Function to predict class label of a single image
def predict_image(image_path, vae, classifier, transform, class_names):
    # Load and preprocess the image
    image = load_and_preprocess_image(image_path, transform)

    # Ensure the models are in evaluation mode
    vae.eval()
    classifier.eval()

    with torch.no_grad():
        # Move image to the same device as the models
        image = image.cuda()

        # Encode the image using VAE
        mu, _ = vae.encode(image)

        # Classify the latent representation
        output = classifier(mu)

        # Get the predicted class
        _, predicted = torch.max(output.data, 1)
        predicted_class = class_names[predicted.item()]

    return predicted_class

# Function to display an image
def display_image(image_path, predicted_class):
    image = Image.open(image_path)
    plt.imshow(image)
    plt.title(f'Predicted Class: {predicted_class}')
    plt.axis('off')  # Hide axis
    plt.show()

# Path to the image you want to classify
image_path = '/content/drive/MyDrive/Colab Notebooks/ccatract detection using VAEs/test/retina_disease/Retina_052.png'

# Perform the prediction
predicted_class = predict_image(image_path, vae, classifier, transform, class_names)

# Display the image along with the prediction
display_image(image_path, predicted_class)
