In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image
import cv2
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import models, transforms



In [None]:
# Custom Dataset for loading images and extracting patches based on nuclear density
class HistologyDataset(Dataset):
    """Image-level dataset that returns the patches of each image passing a hue filter.

    The CSV at ``csv_path`` is read with pandas; its first column is joined onto
    ``image_dir`` to form the image paths and its second column holds the class
    labels. Labels are mapped to integer indices in the sorted order produced by
    ``np.unique`` (see ``label_to_idx``).

    Indexing returns ``(patches, patch_labels)`` for ONE image: two lists of equal
    length, one entry per sliding-window patch accepted by ``is_nucleus_dense``.
    The list length depends on how many windows pass the filter, so it can differ
    between images and can be zero.
    NOTE(review): batching such items presumably requires either identical patch
    counts per image or a custom ``collate_fn`` -- confirm against the DataLoader.

    Args:
        image_dir: Directory containing the image files named in the CSV.
        csv_path: Path of the CSV with file names (column 0) and labels (column 1).
        transform: Optional callable applied to every accepted patch.
        patch_size: Side length, in pixels, of the square sliding window.
        overlap: Fraction of ``patch_size`` shared by neighbouring windows; must be < 1.
        threshold: Hue value a pixel must exceed to count towards the density.
        min_blue_density: Minimum fraction of counted pixels for a patch to be kept.

    Raises:
        ValueError: If ``patch_size`` is not positive or ``overlap`` is >= 1.
    """

    def __init__(self, image_dir, csv_path, transform=None, patch_size=299, overlap=0.5, threshold=1.587, min_blue_density=0.02):
        # Reject window geometry that can never produce a usable sliding window
        # before touching the file system.
        if patch_size <= 0:
            raise ValueError(f"patch_size must be positive, got {patch_size}")
        if overlap >= 1:
            raise ValueError(f"overlap must be smaller than 1, got {overlap}")

        self.image_dir = image_dir
        self.transform = transform
        self.patch_size = patch_size
        self.overlap = overlap
        self.threshold = threshold
        self.min_blue_density = min_blue_density

        # Load CSV containing file names and labels
        self.data = pd.read_csv(csv_path)
        self.image_paths = [os.path.join(image_dir, fname) for fname in self.data.iloc[:, 0]]
        raw_labels = self.data.iloc[:, 1].values

        # Map every distinct label (string or otherwise) to a stable integer index.
        self.label_to_idx = {label: idx for idx, label in enumerate(np.unique(raw_labels))}
        self.labels = np.array([self.label_to_idx[label] for label in raw_labels], dtype=np.int64)

    def __len__(self):
        """Return the number of images (not patches) listed in the CSV."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(patches, patch_labels)`` for it.

        Patches are NumPy arrays when no transform is set, otherwise whatever
        the transform returns. Every entry of ``patch_labels`` is the image's
        integer label.
        """
        image_path = self.image_paths[idx]
        label = self.labels[idx]

        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the array.
        with Image.open(image_path) as pil_image:
            image = np.array(pil_image.convert('RGB'))

        # Extract patches from the image
        patches, patch_labels = self.extract_patches(image, label)

        # Apply transformations if provided
        if self.transform is not None:
            patches = [self.transform(patch) for patch in patches]

        return patches, patch_labels

    def extract_patches(self, image, label):
        """Slide a square window over ``image`` and keep the windows passing the filter.

        Args:
            image: Array whose first two axes are height and width.
            label: Label repeated once per accepted patch.

        Returns:
            ``(patches, patch_labels)``: two lists of equal length. Both are empty
            when the image is smaller than the window or no window is accepted.
        """
        patches = []
        patch_labels = []
        height, width = image.shape[:2]
        # int() truncates, so a small patch_size combined with a large overlap
        # would give a stride of 0 and make range() raise; advance by at least
        # one pixel instead.
        stride = max(1, int(self.patch_size * (1 - self.overlap)))

        # Only windows that fit entirely inside the image are visited.
        for y in range(0, height - self.patch_size + 1, stride):
            for x in range(0, width - self.patch_size + 1, stride):
                patch = image[y:y+self.patch_size, x:x+self.patch_size]
                if self.is_nucleus_dense(patch):
                    patches.append(patch)
                    patch_labels.append(label)

        return patches, patch_labels

    def is_nucleus_dense(self, patch):
        """Return whether enough pixels of ``patch`` exceed the hue threshold.

        The patch is converted from RGB to HSV and channel 0 (hue) is compared
        with ``self.threshold``; despite the attribute names, no blue channel is
        read. The patch is accepted when the fraction of pixels above the
        threshold is greater than ``self.min_blue_density``.

        NOTE(review): for 8-bit input OpenCV documents hue as stored in the
        0..179 range, so the default threshold of 1.587 would accept almost any
        pixel with a non-zero hue -- confirm the threshold was meant for this
        channel and scale.
        """
        hsv_patch = cv2.cvtColor(patch, cv2.COLOR_RGB2HSV)
        hue = hsv_patch[:, :, 0]
        # Mean of the boolean mask == (pixels above threshold) / (pixel count).
        blue_density = np.mean(hue > self.threshold)
        return blue_density > self.min_blue_density

# transformations
# Augmentation steps, applied in order to each patch handed over by the dataset.
patch_augmentations = [
    transforms.ToPILImage(),            # array patch -> PIL image for the random ops
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(180),
    transforms.ToTensor(),              # back to a tensor for the model
]
transform = transforms.Compose(patch_augmentations)


In [None]:

class ModifiedInceptionV3(nn.Module):
    """torchvision Inception v3 with its classifier replaced by a two-layer head.

    The backbone's final ``fc`` layer is swapped for
    ``Flatten -> Linear(in_features, 1024) -> ReLU -> Dropout(0.5) -> Linear(1024, num_classes)``.

    Args:
        num_classes: Number of output logits.
        pretrained: When True (default), load ImageNet weights for the backbone;
            when False, the backbone is randomly initialised.
    """

    def __init__(self, num_classes=4, pretrained=True):
        super().__init__()
        self.inception = self._build_backbone(pretrained)
        self.inception.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # in_features is read from the original fc layer before it is replaced.
        in_features = self.inception.fc.in_features
        self.inception.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, num_classes)
        )

    @staticmethod
    def _build_backbone(pretrained):
        """Create the Inception v3 backbone using the non-deprecated API when available."""
        weights_enum = getattr(models, "Inception_V3_Weights", None)
        if weights_enum is None:
            # Older torchvision releases only know the boolean flag.
            return models.inception_v3(pretrained=pretrained)
        # IMAGENET1K_V1 is the checkpoint the legacy ``pretrained=True`` flag maps to.
        weights = weights_enum.IMAGENET1K_V1 if pretrained else None
        return models.inception_v3(weights=weights)

    def forward(self, x):
        """Return only the primary logits for the batch ``x``."""
        # The inception_v3 model can return two outputs (logits and auxiliary
        # logits) packed in a tuple; only the primary logits are retained.
        outputs = self.inception(x)
        if isinstance(outputs, tuple):
            outputs = outputs[0]
        return outputs



In [None]:
# Training loop
def _prepare_batch(patches, labels, device):
    """Merge per-patch groups into one batch and move it to ``device``.

    Returns ``(None, None)`` when ``patches`` is an empty list/tuple so the
    caller can skip the batch instead of failing inside ``torch.cat``.
    """
    if isinstance(patches, (list, tuple)):
        if len(patches) == 0:
            return None, None
        # Concatenate first, then transfer once instead of once per patch group.
        patches = torch.cat(list(patches), dim=0)
    if isinstance(labels, (list, tuple)):
        labels = torch.cat(list(labels), dim=0)
    # Labels are moved to the device unconditionally (not only when they
    # arrived as a list) and cast to long as required for classification.
    labels = torch.as_tensor(labels).to(device).long()
    return patches.to(device), labels


def _run_epoch(model, loader, criterion, device, desc, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns:
        ``(mean_loss_per_batch, accuracy_percent)``; both are ``nan`` when no
        batch was processed.
    """
    training = optimizer is not None
    loss_sum = 0.0
    correct = 0
    total = 0
    num_batches = 0

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(training):
        for patches, labels in tqdm(loader, desc=desc):
            patches, labels = _prepare_batch(patches, labels, device)
            if patches is None:
                continue

            if training:
                optimizer.zero_grad()
            outputs = model(patches)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

    mean_loss = loss_sum / num_batches if num_batches else float("nan")
    accuracy = 100 * correct / total if total else float("nan")
    return mean_loss, accuracy


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Each loader batch is a ``(patches, labels)`` pair. ``patches`` and ``labels``
    may be lists/tuples of tensors, which are concatenated along dimension 0, or
    tensors that are already batched. Per-epoch loss (mean over batches) and
    accuracy are printed for both phases.

    The computation device is taken from the model's parameters, so the function
    does not depend on a module-level ``device`` variable.

    Args:
        model: Network to optimise; left in eval mode after the final epoch.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping ``model``'s parameters.
        num_epochs: Number of train/validate rounds.

    Returns:
        None.
    """
    first_param = next(iter(model.parameters()), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss, train_acc = _run_epoch(
            model, train_loader, criterion, device,
            desc=f"Epoch {epoch+1}/{num_epochs} - Training",
            optimizer=optimizer,
        )
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")

        # Validation phase
        model.eval()
        val_loss, val_acc = _run_epoch(
            model, val_loader, criterion, device,
            desc=f"Epoch {epoch+1}/{num_epochs} - Validation",
        )
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")


# Majority Voting for Final Image Classification
def majority_voting(patch_predictions):
    """Return the class that occurs most often in ``patch_predictions``.

    Ties are resolved in favour of the smallest class value, because the
    distinct values are sorted and the first maximum count wins.
    """
    classes, votes = np.unique(patch_predictions, return_counts=True)
    return classes[votes.argmax()]


In [None]:
# Directory paths
# The dataset root can be overridden with the BACH_PHOTOS_DIR environment
# variable; when it is unset the original machine-specific location is used.
photos_dir = os.environ.get(
    "BACH_PHOTOS_DIR",
    r"C:\Users\vamsv\Downloads\ICIAR2018_BACH_Challenge\ICIAR2018_BACH_Challenge\Photos",
)
image_dir = os.path.join(photos_dir, "images")
csv_path = os.path.join(photos_dir, "microscopy_ground_truth.csv")

# Seed the random number generators so the split, the augmentation and the
# initialisation of the new classifier layers are repeatable across runs.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

dataset = HistologyDataset(image_dir=image_dir, csv_path=csv_path, transform=transform)
# Second view of the same images without random augmentation, so validation
# metrics are measured on unmodified patches.
eval_dataset = HistologyDataset(image_dir=image_dir, csv_path=csv_path, transform=transforms.ToTensor())

train_size = int(0.75 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_split = random_split(
    dataset, [train_size, val_size], generator=torch.Generator().manual_seed(SEED)
)
# Reuse the held-out indices, but read them through the augmentation-free view.
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ModifiedInceptionV3(num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)

train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50)