In [None]:
!ls /kaggle/input/alzheimers-adni

In [None]:
import os 
import cv2
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import keras 
from keras.callbacks import EarlyStopping,ModelCheckpoint
import tensorflow as tf
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from tqdm import tqdm
from imblearn.over_sampling import SMOTE
from tensorflow.keras import layers, models

**preparing data**

In [None]:
import os
import pandas as pd
from tqdm import tqdm

images = []
labels = []

train_path = "/kaggle/input/alzheimers-adni/Alzheimers-ADNI/train"

for subfolder in tqdm(os.listdir(train_path)):
    subfolder_path = os.path.join(train_path, subfolder)
    
    
    if os.path.isfile(subfolder_path):
        images.append(subfolder_path)
        labels.append(subfolder)  
        continue
    
    
    for folder in os.listdir(subfolder_path):
        subfolder_path2 = os.path.join(subfolder_path, folder)

        if os.path.isfile(subfolder_path2):  
           
            images.append(subfolder_path2)
            labels.append(subfolder)  
            continue

        
        if os.path.isdir(subfolder_path2):
            for image_filename in os.listdir(subfolder_path2):
                image_path = os.path.join(subfolder_path2, image_filename)
                if os.path.isfile(image_path):  
                    images.append(image_path)
                    labels.append(folder)  

train_df = pd.DataFrame({'image': images, 'label': labels})
train_df


In [None]:
import os
import pandas as pd
import random
from tqdm import tqdm

images = []
labels = []

train_path = "/kaggle/input/alzheimers-adni/Alzheimers-ADNI/test"

for subfolder in tqdm(os.listdir(train_path)):
    subfolder_path = os.path.join(train_path, subfolder)
    
    
    if os.path.isfile(subfolder_path):
        images.append(subfolder_path)
        labels.append(subfolder)  
        continue
    
    
    for folder in os.listdir(subfolder_path):
        subfolder_path2 = os.path.join(subfolder_path, folder)

        if os.path.isfile(subfolder_path2):  
            
            images.append(subfolder_path2)
            labels.append(subfolder)  
            continue

        
        if os.path.isdir(subfolder_path2):
            for image_filename in os.listdir(subfolder_path2):
                image_path = os.path.join(subfolder_path2, image_filename)
                if os.path.isfile(image_path):  
                    images.append(image_path)
                    labels.append(folder)  


test_df = pd.DataFrame({'image': images, 'label': labels})


test_df = test_df.sample(frac=1, random_state=42).reset_index(drop=True)

# split into 50% test and 50% validation
split_index = len(test_df) // 2
validation_df = test_df.iloc[:split_index].reset_index(drop=True)
test_df = test_df.iloc[split_index:].reset_index(drop=True)


print(f"Test Set: {len(test_df)} samples")
print(f"Validation Set: {len(validation_df)} samples")

print(test_df)
print(validation_df)

In [None]:
# convert categorical labels to integers
label_map = {'Final AD JPEG': 0, 'Final CN JPEG': 1, 'Final EMCI JPEG': 2, 'Final LMCI JPEG': 3, 'Final MCI JPEG': 4}
train_df['label'] = train_df['label'].map(label_map)
validation_df['label'] = validation_df['label'].map(label_map)
test_df['label'] = test_df['label'].map(label_map)

In [None]:
train_df['label'] = train_df['label'].astype(int)
validation_df['label'] = validation_df['label'].astype(int)
test_df['label'] = test_df['label'].astype(int)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import numpy as np

class BrainDataset(Dataset):
    def __init__(self, dataframe, image_size=(224, 224), transform=None):
        self.dataframe = dataframe
        self.image_size = image_size
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx]['image']
        label = self.dataframe.iloc[idx]['label']

        
        image = Image.open(img_path).convert('RGB')  # Ensure RGB format
        image = image.resize(self.image_size)

        if self.transform:
            image = self.transform(image)
 

        # Convert label to a tensor
        label = torch.tensor(label, dtype=torch.long)

        return image, label

In [None]:
from torchvision import transforms

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize using ImageNet stats
])

In [None]:
# Create datasets
train_dataset = BrainDataset(train_df, transform=transform)
val_dataset = BrainDataset(validation_df, transform=transform)
test_dataset = BrainDataset(test_df, transform=transform)

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

**Implementing ResNet-based Model with a Bias-aware Pruning Technique**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# Define the 3x3 convolution function
def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)

# Define the BasicBlock
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, cfg=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, cfg, stride)
        self.bn1 = nn.BatchNorm2d(cfg)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(cfg, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = F.relu(out)
        return out

# Define the ResNet model for brain images
class ResNet_Brain(nn.Module):
    def __init__(self, block, layers, cfg=None, num_classes=5):
        super(ResNet_Brain, self).__init__()
        self.inplanes = 64  # Increased initial channels for brain images
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)  
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        # Residual layers
        self.layer1 = self._make_layer(block, 64, layers[0], cfg=cfg[0:layers[0]])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, cfg=cfg[layers[0]:layers[0] + layers[1]])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, cfg=cfg[layers[0] + layers[1]:layers[0] + layers[1] + layers[2]])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, cfg=cfg[layers[0] + layers[1] + layers[2]:layers[0] + layers[1] + layers[2] + layers[3]])
        
        # Global average pooling and fully connected layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # Adaptive pooling for variable input sizes
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1, cfg=None):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion)
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, cfg=cfg[0]))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes, cfg=cfg[i]))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

# Define a function to create the ResNet model for brain images
def resnet18_brain(cfg=None, num_classes=5):
    return ResNet_Brain(BasicBlock, [2, 2, 2, 2], cfg=cfg, num_classes=num_classes)


cfg = [64] * 8 + [128] * 8 + [256] * 8 + [512] * 8  # Example configuration
model = resnet18_brain(cfg=cfg, num_classes=5)
print(model)


#input_tensor = torch.randn(1, 3, 224, 224)  
#output = model(input_tensor)
#print(output.shape)  

In [None]:
criterion = nn.CrossEntropyLoss()  # Loss function for classification
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Optimizer


# Define the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
#spliting bias-aligning and bias-conflicting samples
def get_S_bc_S_ba2(train_loader, model, device):
    S_bc = set()
    S_ba = set()
    unknown = set()

    model.eval()
    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(train_loader):  # Unpack only images and labels
            images, labels = images.to(device), labels.to(device)
            logits = model(images)
            outputs = torch.sigmoid(logits)
            predicted_labels = outputs.argmax(dim=1)

            for idx, label in enumerate(labels):
                sample_index = batch_idx * train_loader.batch_size + idx  # Calculate sample index
                if predicted_labels[idx] == label.item():
                    S_ba.add(sample_index)
                elif predicted_labels[idx] != label.item():
                    S_bc.add(sample_index)
                else:
                    unknown.add(sample_index)

    return S_bc, S_ba

In [None]:
#bias calculation
def get_conv2d_output(model, S, train_loader, device):
    # Initialize a dictionary to store the sum of filter activations for each layer
    filter_activations_sum = {}

    # Define the hook function to collect filter activations for each layer
    def getActivation(layer_name):
        def hook(self, input, output):
            nonlocal filter_activations_sum
            if layer_name in filter_activations_sum:
                filter_activations_sum[layer_name] += output.mean(dim=(2, 3)).detach().squeeze()
            else:
                filter_activations_sum[layer_name] = output.mean(dim=(2, 3)).detach().squeeze()
        return hook

    hooks = []  # List to store hooks for later removal

    # Attach hooks to the convolutional layers in each BasicBlock
    for i in range(1, 5):  # Loop through layer1, layer2, layer3, layer4
        layer = getattr(model, f'layer{i}')
        for j, block in enumerate(layer):  # Loop through each BasicBlock in the layer
            # Attach hook to conv1 in the BasicBlock
            layer_name = f"layer{i}_block{j}_conv1"
            handler = block.conv1.register_forward_hook(getActivation(layer_name))
            hooks.append(handler)

            # Attach hook to conv2 in the BasicBlock
            layer_name = f"layer{i}_block{j}_conv2"
            handler = block.conv2.register_forward_hook(getActivation(layer_name))
            hooks.append(handler)

    # Process samples in S
    for sample_idx in S:
        # Load an image and label from the dataset
        image, label = train_loader.dataset[sample_idx]  # Assuming dataset returns (image, label)
        image = image.to(device)
        image = image.unsqueeze(0)  # Add batch dimension
        # Forward pass through the model
        with torch.no_grad():
            output = model(image)

    # Calculate the average filter activations for each layer
    average_filter_activations = {layer: filter_activations_sum[layer] / len(S) for layer in filter_activations_sum}

    # Remove all the hooks after the forward pass is completed
    for hook in hooks:
        hook.remove()

    return average_filter_activations

In [None]:
import torch
import numpy as np
from torch.utils.data import DataLoader

# Define the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Hyperparameters
num_epochs = 40
patience = 5  # Number of epochs to wait for improvement before stopping
best_val_loss = np.inf  # Initialize best validation loss to infinity
epochs_without_improvement = 0  # Counter for epochs without improvement
pruning_ratio = 0.1  # Prune 10% of the filters
finetune_epochs = 10  # Number of epochs for fine-tuning

# Function to evaluate the model on the test set
def evaluate_test_accuracy(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    test_loss = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    test_accuracy = 100 * correct / total
    test_loss = test_loss / len(test_loader)
    return test_loss, test_accuracy

# Early stopping function
def early_stopping(val_loss, patience, epochs_without_improvement, best_val_loss):
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_without_improvement = 0
        # Save the best model
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= patience:
            print(f"Early stopping triggered after {epochs_without_improvement} epochs without improvement.")
            return True, best_val_loss, epochs_without_improvement
    return False, best_val_loss, epochs_without_improvement

model = model.to(device)
# Training loop with early stopping
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    # Training phase
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)  # Move data to the device
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Compute accuracy
        _, predicted = torch.max(outputs, 1)  # Get the class with the highest score
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    train_accuracy = 100 * correct / total
    train_loss = running_loss / len(train_loader)

    # Validation phase
    model.eval()
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)  # Move data to the device
            outputs = model(images)
            val_loss = criterion(outputs, labels)
            val_running_loss += val_loss.item()

            # Compute accuracy
            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)

    val_accuracy = 100 * val_correct / val_total
    val_loss = val_running_loss / len(val_loader)

    # Test evaluation phase
    test_loss, test_accuracy = evaluate_test_accuracy(model, test_loader, device)

    # Print metrics
    print(f"Epoch [{epoch + 1}/{num_epochs}], "
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%, "
          f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

    # Early stopping check
    stop_training, best_val_loss, epochs_without_improvement = early_stopping(
        val_loss, patience, epochs_without_improvement, best_val_loss
    )
    if stop_training:
        break

print("Training complete. Best model saved to 'best_model.pth'.")

# Load the best model
model.load_state_dict(torch.load('best_model.pth'))
model.to(device)  # Move the model to the device

In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader

# Define the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Function to evaluate the model on the test set
def evaluate_test_accuracy(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    test_loss = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    test_accuracy = 100 * correct / total
    test_loss = test_loss / len(test_loader)
    return test_loss, test_accuracy

# Define the function to copy matching weights
def copy_matching_weights(pruned_model, original_model):
    pruned_state_dict = pruned_model.state_dict()
    original_state_dict = original_model.state_dict()

    for name, param in original_state_dict.items():
        if name in pruned_state_dict:
            if pruned_state_dict[name].shape == param.shape:
                pruned_state_dict[name].copy_(param)
            else:
                print(f"Skipping {name} due to shape mismatch: {param.shape} vs {pruned_state_dict[name].shape}")
        else:
            print(f"Skipping {name} as it is not in the pruned model")

    pruned_model.load_state_dict(pruned_state_dict, strict=False)

# Pruning Phase
print("Starting Pruning...")
model = model.to(device)
pruning_ratio = 0.5  # Prune 50% of the filters

# Get bias-aligned and bias-conflicting samples
S_bc, S_ba = get_S_bc_S_ba2(train_loader, model, device)

# Calculate bias scores
bias_alligned = get_conv2d_output(model, S_ba, train_loader, device)
bias_conflicting = get_conv2d_output(model, S_bc, train_loader, device)
bias_score = {key: bias_alligned[key] - bias_conflicting[key] for key in bias_alligned}

# Calculate threshold for pruning
total_channel = sum([n.shape[0] for n in bias_score.values()])
feature_s = torch.cat([n.flatten() for n in bias_score.values()])
y, i = torch.sort(feature_s, descending=True)
thre_index = int(total_channel * pruning_ratio)
thre = y[thre_index]

# Prune the model
cfg1 = []
cfg_mask = []
pruned = 0
for i, feature_copy in enumerate(bias_score.values()):
    mask = feature_copy.gt(thre).float()  # Create a mask for pruning
    if torch.sum(mask) == 0:
        cfg1.append(len(feature_copy))
        cfg_mask.append(torch.ones(len(feature_copy)).float())
    else:
        pruned += mask.shape[0] - torch.sum(mask)
        cfg1.append(int(torch.sum(mask)))
        cfg_mask.append(mask.clone())

# Save the pruned model
pruned_model = resnet18_brain(cfg=cfg1)  # Replace with your model class
copy_matching_weights(pruned_model, model)  # Copy matching weights
pruned_model.to(device)  # Move the pruned model to the device

# Evaluate test accuracy after pruning
test_loss, test_accuracy = evaluate_test_accuracy(pruned_model, test_loader, device)
print(f"Test Accuracy after Pruning: {test_accuracy:.2f}%")

torch.save({'cfg': cfg1, 'state_dict': pruned_model.state_dict()}, 'pruned_model.pth.tar')
print("Pruning complete. Pruned model saved to 'pruned_model.pth.tar'.")

# Fine-Tuning Phase
print("Starting Fine-Tuning...")
pruned_model.load_state_dict(torch.load('pruned_model.pth.tar')['state_dict'])
pruned_model.to(device)

# Define loss and optimizer for fine-tuning
tune_criterion = nn.CrossEntropyLoss()  # Use standard cross-entropy loss for fine-tuning
finetune_optimizer = torch.optim.Adam(pruned_model.parameters(), lr=1e-4, weight_decay=5e-4)

# Fine-tuning loop
finetune_epochs = 10  # Number of epochs for fine-tuning
for epoch in range(finetune_epochs):
    pruned_model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = pruned_model(images)
        loss = tune_criterion(outputs, labels)

        finetune_optimizer.zero_grad()
        loss.backward()
        finetune_optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    train_accuracy = 100 * correct / total
    train_loss = running_loss / len(train_loader)

    # Evaluate test accuracy after each fine-tuning epoch
    test_loss, test_accuracy = evaluate_test_accuracy(pruned_model, test_loader, device)
    print(f"Fine-Tuning Epoch [{epoch + 1}/{finetune_epochs}], "
          f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
          f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

# Save the fine-tuned model
torch.save({
    'cfg': cfg1,  # Save the configuration
    'state_dict': pruned_model.state_dict(),  # Save the model's state_dict
}, 'fine_tuned_model.pth')
print("Fine-Tuning complete. Fine-tuned model saved to 'fine_tuned_model.pth'.")

**Preparing data for other models**

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np

image_size = (224, 224)
batch_size = 32

# function to add Gaussian noise
def add_gaussian_noise(image):
    noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=0.05, dtype=tf.float32)
    return tf.clip_by_value(image + noise, 0.0, 1.0)  # Ensures values remain between [0,1]

# data augmentation for training set
train_datagen = ImageDataGenerator(
    preprocessing_function=lambda x: add_gaussian_noise(tf.keras.applications.vgg16.preprocess_input(x)),
    rescale=1./255,
    rotation_range=30,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2]
)


test_val_datagen = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.vgg16.preprocess_input,
    rescale=1./255
)

# Train dataset with augmentation
train_generator = train_datagen.flow_from_dataframe(
    train_df,
    x_col='image',
    y_col='label',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=True
)

# Validation dataset (no augmentation)
val_generator = test_val_datagen.flow_from_dataframe(
    validation_df,
    x_col='image',
    y_col='label',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=True
)

# Test dataset (no augmentation)
test_generator = test_val_datagen.flow_from_dataframe(
    test_df,
    x_col='image',
    y_col='label',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=False
)

train_generator

In [None]:
import numpy as np
from collections import Counter


class_indices = train_generator.classes  
class_labels = list(train_generator.class_indices.keys())  

class_counts = Counter(class_indices)

for class_index, count in class_counts.items():
    print(f"Class '{class_labels[class_index]}' has {count} samples")

image_size = (224,224)
batch_size = 32
datagen = ImageDataGenerator(
    preprocessing_function= tf.keras.applications.vgg16.preprocess_input,
    rescale=1./255,
    horizontal_flip=True
)
train_generator = datagen.flow_from_dataframe(
    train_df,
    x_col='image',
    y_col='label',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=True
)
test_generator = datagen.flow_from_dataframe(
    test_df,
    x_col='image',
    y_col='label',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    color_mode='rgb',
    shuffle=False
)
val_generator = datagen.flow_from_dataframe(
    validation_df,
    x_col='image',
    y_col='label',
    color_mode='rgb',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=True
)

train_generator

class_num=list(train_generator.class_indices.keys())
class_num

**baseline CNN Model**

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)),
    layers.BatchNormalization(),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Conv2D(256, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.Conv2D(256, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.5),

    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.5),
    layers.Dense(128, activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.5),
    
    layers.Dense(5, activation='softmax')
])

model.summary()

In [None]:
tf.keras.utils.plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True,show_dtype=True,dpi=120)

early_stopping_cb =EarlyStopping(patience=10, restore_best_weights=True)
model.compile(optimizer ='adam', loss='categorical_crossentropy', metrics=['accuracy'])
history = model.fit(train_generator, epochs=50, validation_data=val_generator, callbacks=[early_stopping_cb])

loss,accuracy=model.evaluate(test_generator)
print("loss : ",loss)
print("accuracy : ",accuracy)

ef=pd.DataFrame(history.history)
ef[['loss','val_loss']].plot()
ef[['accuracy','val_accuracy']].plot()

ef=pd.DataFrame(history.history)
ef[['loss','val_loss']].plot()
ef[['accuracy','val_accuracy']].plot()

**2D Convolutional Neural Network with Dual Attention Module**

In [None]:
class ChannelAttentionModule(layers.Layer):
    def __init__(self, filters, reduction_ratio=8):
        super(ChannelAttentionModule, self).__init__()
        self.filters = filters
        self.reduction_ratio = reduction_ratio

    def build(self, input_shape):
        # shared MLP (Multi-Layer Perceptron)
        self.mlp = models.Sequential([
            layers.Dense(self.filters // self.reduction_ratio, activation='relu'),
            layers.Dense(self.filters, activation=None)
        ])

    def call(self, inputs):
        # global Average Pooling
        avg_pool = tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)
        avg_pool = self.mlp(avg_pool)

        # global Max Pooling
        max_pool = tf.reduce_max(inputs, axis=[1, 2], keepdims=True)
        max_pool = self.mlp(max_pool)

        # combine Avg and Max Pooling
        channel_attention = tf.sigmoid(avg_pool + max_pool)

        # apply Channel Attention
        output = inputs * channel_attention
        return output

In [None]:
class SpatialAttentionModule(layers.Layer):
    def __init__(self):
        super(SpatialAttentionModule, self).__init__()

    def build(self, input_shape):
        # convolutional layer to generate spatial attention map
        self.conv = layers.Conv2D(1, kernel_size=7, padding='same', activation='sigmoid')

    def call(self, inputs):
        # average Pooling along the channel axis
        avg_pool = tf.reduce_mean(inputs, axis=-1, keepdims=True)

        # max Pooling along the channel axis
        max_pool = tf.reduce_max(inputs, axis=-1, keepdims=True)

        # concatenate Avg and Max Pooling
        concat = tf.concat([avg_pool, max_pool], axis=-1)

        # generate Spatial Attention Map
        spatial_attention = self.conv(concat)

        # apply Spatial Attention
        output = inputs * spatial_attention
        return output

In [None]:
class DualAttentionModule2D(layers.Layer):
    def __init__(self, filters, reduction_ratio=8):
        super(DualAttentionModule2D, self).__init__()
        self.channel_attention = ChannelAttentionModule(filters, reduction_ratio)
        self.spatial_attention = SpatialAttentionModule()

    def call(self, inputs):
        # apply Channel Attention
        x = self.channel_attention(inputs)

        # apply Spatial Attention
        x = self.spatial_attention(x)

        return x

In [None]:
class DualAttentionModule2D(layers.Layer):
    def __init__(self, filters):
        super(DualAttentionModule2D, self).__init__()
        self.filters = filters

    def build(self, input_shape):
        # spatial Attention
        self.spatial_attention = layers.Conv2D(1, kernel_size=1, activation='sigmoid')

        # channel Attention
        self.channel_attention = layers.GlobalAveragePooling2D()
        self.channel_fc1 = layers.Dense(self.filters // 8, activation='relu')
        self.channel_fc2 = layers.Dense(self.filters, activation='sigmoid')

    def call(self, inputs):
        # spatial Attention
        spatial_attention_map = self.spatial_attention(inputs)
        spatial_output = inputs * spatial_attention_map

        # channel Attention
        channel_attention_map = self.channel_attention(inputs)
        channel_attention_map = self.channel_fc1(channel_attention_map)
        channel_attention_map = self.channel_fc2(channel_attention_map)
        channel_attention_map = tf.reshape(channel_attention_map, [-1, 1, 1, self.filters])
        channel_output = inputs * channel_attention_map

        # combine Spatial and Channel Attention
        output = spatial_output + channel_output
        return output

In [None]:
def residual_block_2d(x, filters, kernel_size=3, stride=1):
    # Shortcut connection
    shortcut = x

    # First 2D Convolution
    x = layers.Conv2D(filters, kernel_size, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    # Second 2D Convolution
    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)

    # Add shortcut connection
    if shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, kernel_size=1, strides=stride, padding='same')(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Add()([x, shortcut])
    x = layers.ReLU()(x)
    return x

In [None]:
def build_2d_dam_model(input_shape=(224, 224, 3), num_classes=5):
    inputs = layers.Input(shape=input_shape)

    # Initial 2D Convolution
    x = layers.Conv2D(8, kernel_size=3, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    # Residual Blocks with Dual Attention Modules
    filters_list = [16, 32, 64]
    for filters in filters_list:
        x = residual_block_2d(x, filters)
        x = DualAttentionModule2D(filters)(x)

    # Global Average Pooling
    x = layers.GlobalAveragePooling2D()(x)

    # Fully Connected Layers
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)

    # Output Layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    # Create the model
    model = models.Model(inputs, outputs)
    return model

In [None]:
model = build_2d_dam_model(input_shape=(224, 224, 3), num_classes=5)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

tf.keras.utils.plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True,show_dtype=True,dpi=120)

early_stopping_cb =EarlyStopping(patience=10, restore_best_weights=True)
model.compile(optimizer ='adam', loss='categorical_crossentropy', metrics=['accuracy'])
history = model.fit(train_generator, epochs=50, validation_data=val_generator, callbacks=[early_stopping_cb])