In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab VM; the dataset path used later in this
# notebook lives under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import mobilenet_v2
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import torch.nn.functional as F

In [None]:
# Number of target classes; used as the default head size of the models below.
num_classes = 3

In [None]:
# MobileNetV2 backbone followed by a 1x1 convolution projecting 1280 -> 768 channels
class MobileNetV2FeatureExtractor(nn.Module):
    """Convolutional feature extractor built on torchvision's MobileNetV2.

    The input is passed through the MobileNetV2 ``features`` stack and then
    through a 1x1 convolution that maps the 1280 backbone channels to 768.
    """

    def __init__(self):
        super().__init__()
        # Full MobileNetV2 loaded with pretrained weights. The whole model is
        # kept as a submodule, but ``forward`` only uses its ``features`` part.
        self.mobilenet = mobilenet_v2(pretrained=True)
        self.features = self.mobilenet.features
        # Pointwise projection 1280 -> 768 channels (spatial size unchanged).
        self.conv_out = nn.Conv2d(1280, 768, kernel_size=1)

    def forward(self, x):
        """Return the projected feature map, shape (batch, 768, H', W').

        For 224x224 inputs the backbone output is presumably 7x7 spatially
        (as the original comments state) -- verify for other input sizes.
        """
        backbone_maps = self.features(x)
        return self.conv_out(backbone_maps)


In [None]:
import torch

def check_gpu_availability():
    """Print whether CUDA is available and, per visible GPU, its name and memory use.

    Memory figures are the values reported by ``torch.cuda.memory_allocated``
    and ``torch.cuda.memory_reserved`` for each device, converted to GB.
    Returns nothing; all results are printed.
    """
    cuda_available = torch.cuda.is_available()
    print(f"CUDA Available: {cuda_available}")

    # Guard clause: nothing more to report without CUDA.
    if not cuda_available:
        print("No GPU devices found.")
        return

    num_gpus = torch.cuda.device_count()
    print(f"Number of GPUs Available: {num_gpus}")

    bytes_per_gb = 1024 ** 3
    for i in range(num_gpus):
        gpu_name = torch.cuda.get_device_name(i)
        gpu_memory_allocated = torch.cuda.memory_allocated(i) / bytes_per_gb
        gpu_memory_cached = torch.cuda.memory_reserved(i) / bytes_per_gb

        print(f"\nGPU {i}:")
        print(f"  Name: {gpu_name}")
        print(f"  Memory Allocated: {gpu_memory_allocated:.2f} GB")
        print(f"  Memory Cached: {gpu_memory_cached:.2f} GB")


if __name__ == "__main__":
    check_gpu_availability()


CUDA Available: True
Number of GPUs Available: 1

GPU 0:
  Name: Tesla T4
  Memory Allocated: 0.00 GB
  Memory Cached: 0.00 GB


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def window_partition(x, window_size):
    """Split a (B, C, H, W) tensor into non-overlapping square windows.

    Returns a tensor of shape (B * nH * nW, C, window_size, window_size),
    where nH = H // window_size and nW = W // window_size. Windows are ordered
    by batch, then window row, then window column.
    """
    batch, channels, height, width = x.shape
    n_rows = height // window_size
    n_cols = width // window_size
    # (B, C, nH, ws, nW, ws): expose the window grid as separate axes.
    grid = x.view(batch, channels, n_rows, window_size, n_cols, window_size)
    # Bring the grid axes next to the batch axis, then merge them into it.
    grid = grid.permute(0, 2, 4, 1, 3, 5).contiguous()
    return grid.view(-1, channels, window_size, window_size)

def window_reverse(windows, window_size, H, W):
    """Reassemble windows produced by ``window_partition`` into (B, C, H, W).

    Args:
        windows: tensor of shape (B * nH * nW, C, window_size, window_size).
        window_size: side length of each square window.
        H, W: spatial size of the tensor to rebuild; both must be multiples
            of ``window_size``.

    Returns:
        Tensor of shape (B, C, H, W).

    Raises:
        ValueError: if ``H`` or ``W`` is not a positive multiple of
            ``window_size``.
    """
    if H < window_size or W < window_size or H % window_size or W % window_size:
        raise ValueError(
            f"H={H} and W={W} must be positive multiples of window_size={window_size}"
        )
    n_rows, n_cols = H // window_size, W // window_size
    channels = windows.size(1)
    # Integer arithmetic throughout: the batch size is the window count divided
    # by the number of windows per image (no float round-trip).
    B = windows.shape[0] // (n_rows * n_cols)
    x = windows.view(B, n_rows, n_cols, channels, window_size, window_size)
    # Interleave grid and in-window axes back into full rows and columns.
    x = x.permute(0, 3, 1, 4, 2, 5).contiguous().view(B, channels, H, W)
    return x

class WindowAttention(nn.Module):
    """Multi-head self-attention applied independently inside each window.

    The (B, C, H, W) input is cut into ``window_size`` x ``window_size``
    windows, every window is treated as a sequence of ``window_size ** 2``
    tokens with ``C`` features, and the attended windows are stitched back to
    (B, C, H, W). ``C`` must equal ``hidden_dim``.
    """

    def __init__(self, hidden_dim, window_size, heads):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.window_size = window_size
        self.heads = heads

        # Default (sequence-first) layout: inputs are (seq_len, batch, embed_dim).
        self.attention = nn.MultiheadAttention(hidden_dim, heads)

    def forward(self, x):
        _, channels, height, width = x.shape
        ws = self.window_size

        # (num_windows * B, C, ws, ws)
        patches = window_partition(x, ws)

        # Flatten each window to a token sequence: (ws * ws, num_windows * B, C)
        tokens = patches.view(patches.size(0), channels, -1).permute(2, 0, 1)

        # Self-attention: query, key and value are all the window tokens.
        attended, _ = self.attention(tokens, tokens, tokens)

        # Back to (num_windows * B, C, ws, ws)
        attended = attended.permute(1, 2, 0).view(-1, channels, ws, ws)

        # Undo the partition to recover (B, C, H, W)
        return window_reverse(attended, ws, height, width)

class SwinTransformerStage4(nn.Module):
    """Swin-style classification stage: window attention, shifted window
    attention, an MLP with a residual connection, then pooling and a linear head.

    Args:
        hidden_dim: channel count of the incoming feature map (and token width).
        window_size: side length of the attention windows; the feature map's
            height and width must be multiples of it.
        heads: number of attention heads.
        num_classes: size of the classification output.
        shift_size: cyclic shift (in pixels) applied before the second
            attention block; ``0`` disables shifting.
        dropout_rate: dropout probability used inside the MLP. When ``None``
            (the default) the notebook-level ``dropout_rate`` variable is used
            if it exists, otherwise ``0.5``. This keeps the previous behaviour
            while removing the hard dependency on a global defined in a later
            cell.
    """

    def __init__(self, hidden_dim=768, window_size=7, heads=12, num_classes=num_classes,
                 shift_size=3, dropout_rate=None):
        super(SwinTransformerStage4, self).__init__()
        if dropout_rate is None:
            # Backward compatible fallback to the module-level setting.
            dropout_rate = globals().get("dropout_rate", 0.5)

        self.hidden_dim = hidden_dim
        self.window_size = window_size
        self.heads = heads
        self.shift_size = shift_size
        self.dropout_rate = dropout_rate

        # Define the window-based attention block
        self.window_attention = WindowAttention(hidden_dim, window_size, heads)

        # Define the shifted window-based attention block
        self.shifted_window_attention = WindowAttention(hidden_dim, window_size, heads)

        # Feed-forward MLP
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 4),
            nn.GELU(),
            nn.Dropout(p=dropout_rate),  # dropout after the activation
            nn.Linear(hidden_dim * 4, hidden_dim),
            nn.Dropout(p=dropout_rate)   # dropout after the second Linear layer
        )

        # Classification head
        self.norm = nn.LayerNorm(hidden_dim)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Map a (B, C, H, W) feature map to class logits of shape (B, num_classes)."""
        # Apply window-based attention
        x = self.window_attention(x)

        # Apply shifted window-based attention
        if self.shift_size > 0:
            # Cyclic shift, attend, then shift back. No attention mask is
            # applied, so tokens wrapped around by the roll attend to each other.
            x_shifted = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(2, 3))
            x_shifted = self.shifted_window_attention(x_shifted)
            x = torch.roll(x_shifted, shifts=(self.shift_size, self.shift_size), dims=(2, 3))
        else:
            # NOTE(review): with shift_size == 0 the first attention block is
            # applied a second time (shifted_window_attention is unused) --
            # kept as in the original; confirm this is intended.
            x = self.window_attention(x)

        # Feed-forward block on tokens: (B, C, H, W) -> (B, H*W, C)
        tokens = x.flatten(2).transpose(1, 2)
        tokens = self.mlp(tokens) + tokens  # residual connection

        # Global average pooling over tokens, then normalise and classify
        pooled = tokens.mean(dim=1)
        pooled = self.norm(pooled)
        return self.fc(pooled)


In [None]:
# MobileNetV2 feature extractor chained with the Swin-style classification stage
class MobileNetSwin(nn.Module):
    """End-to-end classifier: MobileNetV2 features followed by SwinTransformerStage4.

    Args:
        num_classes: number of output logits produced by the Swin stage.
    """

    def __init__(self, num_classes=num_classes):
        super().__init__()
        # Backbone producing a 768-channel feature map.
        self.mobilenet = MobileNetV2FeatureExtractor()
        # Attention stage + classification head operating on that feature map.
        self.swin = SwinTransformerStage4(hidden_dim=768, heads=12, num_classes=num_classes)

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.swin(self.mobilenet(x))

In [None]:
# # Data preparation for CIFAR-10
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),  # Resize to match MobileNet input
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
# ])

# train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)
# test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform, download=True)

# # Use only a random subset of the dataset (5000 training / 1000 test images)
# subset_indices = torch.randperm(len(train_dataset))[:5000]  # Randomly select 5000 training indices
# subset_indices_test = torch.randperm(len(test_dataset))[:1000]  # Randomly select 1000 test indices
# train_subset = Subset(train_dataset, subset_indices)       # Create the 5000-image training subset
# test_subset = Subset(test_dataset, subset_indices_test)    # Create the 1000-image test subset

# train_loader = DataLoader(dataset=train_subset, batch_size=32, shuffle=True)
# test_loader = DataLoader(dataset=test_subset, batch_size=32, shuffle=False)


In [None]:
# pip install tensorflow

In [None]:
import tensorflow

In [None]:
import numpy as np

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Root folder of the image dataset on the mounted Drive
dataset_path = '/content/drive/MyDrive/Augmented Dataset/Augmented_sugarcane_disease_deficiency_dataset'

# Generator without augmentation or rescaling; batches are produced with integer dtype
datagen = ImageDataGenerator(dtype='int')

# Directory iterator yielding (images, one-hot labels) batches.
# batch_size=4573 is deliberately large: the following cell takes the first
# batch as the training split and the next batch as the test split.
sugarcane_disease_deficiency_dataset = datagen.flow_from_directory(
    directory=dataset_path,
    batch_size=4573,
    target_size=(224, 224),
    shuffle=True,
    class_mode='categorical',
)


Found 5716 images belonging to 3 classes.


In [None]:
num_classes = 3
input_shape = (224, 224, 3)

# The generator was created with batch_size=4573 (of 5716 images found), so
# the first batch is used as the training split and the second batch -- the
# remaining images of the same shuffled pass -- as the test split.
x_train, y_train = next(sugarcane_disease_deficiency_dataset)
x_test, y_test = next(sugarcane_disease_deficiency_dataset)

# Convert the one-hot labels returned WITH each image batch to class indices of
# shape (N, 1). Taking them from the same batch guarantees image/label
# alignment and avoids indexing the generator again (`dataset[0]`,
# `dataset[1]`), which re-reads and re-decodes every image a second time and
# depends on the iterator's internal shuffle order.
y_train = np.argmax(y_train, axis=1).reshape(-1, 1)
y_test = np.argmax(y_test, axis=1).reshape(-1, 1)

# train_loader = DataLoader(dataset=train_subset, batch_size=32, shuffle=True)
# test_loader = DataLoader(dataset=test_subset, batch_size=32, shuffle=False)


print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# # Example data
# x_train = np.random.rand(1000, 224, 224, 3).astype(np.float32)
# y_train = np.random.randint(0, 10, size=(1000, 1)).astype(np.int64)
# x_test = np.random.rand(250, 224, 224, 3).astype(np.float32)
# y_test = np.random.randint(0, 10, size=(250, 1)).astype(np.int64)

# Convert numpy arrays to PyTorch tensors.
# The Keras generator was created without `rescale`, so pixel values are in
# [0, 255]. The Normalize(mean=0.5, std=0.5) transform applied later assumes
# inputs in [0, 1]; scale here (in place, on the freshly created copy) so the
# normalised inputs land in [-1, 1].
x_train_tensor = torch.tensor(x_train, dtype=torch.float32).permute(0, 3, 1, 2).div_(255.0)  # (N, C, H, W)
# reshape(-1) flattens (N, 1) labels to (N,) and, unlike squeeze(), keeps a
# 1-D tensor even when N == 1.
y_train_tensor = torch.tensor(y_train, dtype=torch.long).reshape(-1)
x_test_tensor = torch.tensor(x_test, dtype=torch.float32).permute(0, 3, 1, 2).div_(255.0)  # (N, C, H, W)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).reshape(-1)

# In-memory dataset pairing samples with labels
class CustomDataset(Dataset):
    """Index-aligned (sample, label) dataset with an optional per-sample transform.

    Args:
        x_data: indexable collection of samples.
        y_data: indexable collection of labels, aligned with ``x_data``.
        transform: optional callable applied to each sample on access.
    """

    def __init__(self, x_data, y_data, transform=None):
        self.transform = transform
        self.x_data = x_data
        self.y_data = y_data

    def __len__(self):
        # The dataset size is defined by the samples, not the labels.
        return len(self.x_data)

    def __getitem__(self, idx):
        item = self.x_data[idx]
        target = self.y_data[idx]
        # Labels are returned untouched; only the sample is transformed.
        item = self.transform(item) if self.transform else item
        return item, target

# Per-sample preprocessing: resize to 224x224, then normalise each channel with mean 0.5 / std 0.5
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
])

# Wrap the tensors in datasets that apply the transform on access
train_dataset = CustomDataset(x_train_tensor, y_train_tensor, transform=transform)
test_dataset = CustomDataset(x_test_tensor, y_test_tensor, transform=transform)

# Batch the data: training batches are reshuffled, test order is fixed
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Sanity check: print the shapes of the first training batch only
for inputs, labels in train_loader:
    print(f"Batch shape: {inputs.shape}")
    print(f"Labels shape: {labels.shape}")
    break


In [None]:
# Dropout probability read by SwinTransformerStage4 when it is instantiated;
# must be defined before the model is built in the next cell.
dropout_rate = 0.5

In [None]:
# Build the network on the GPU, then create the loss and the optimizer
model = MobileNetSwin(num_classes=num_classes)
model = model.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), weight_decay=1e-3, lr=1e-6)

In [None]:
import torch
import matplotlib.pyplot as plt
import time
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Define the model, criterion, optimizer, and data loaders
# model = ... (Your model definition)
# criterion = ... (Your loss function, e.g., nn.CrossEntropyLoss())
# optimizer = ... (Your optimizer, e.g., torch.optim.SGD or Adam)
# train_loader = ... (DataLoader for training data)
# test_loader = ... (DataLoader for testing/validation data)

num_epochs = 50
train_losses = []  # mean training loss per epoch (for plotting)
train_accuracies = []  # training accuracy (%) per epoch
val_losses = []  # mean validation loss per epoch (for plotting)
val_accuracies = []  # validation accuracy (%) per epoch
epoch_times = []  # wall-clock seconds per epoch


def _run_epoch(loader, training):
    """Run one full pass over ``loader`` with the notebook's model.

    Uses the notebook-level ``model``, ``criterion`` and ``optimizer``.

    Args:
        loader: DataLoader yielding (inputs, labels) batches.
        training: when True the model is put in train mode and the optimizer
            is stepped on every batch; when False the model is put in eval
            mode and gradients are disabled.

    Returns:
        (mean of the per-batch losses, accuracy in percent) for the pass.
    """
    # Follow the device the model lives on instead of hard-coding CUDA.
    device = next(model.parameters()).device
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()

            # Predicted class = index of the largest logit
            _, predicted = torch.max(outputs, 1)
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    return loss_sum / len(loader), 100 * n_correct / n_seen


for epoch in range(num_epochs):
    start_time = time.time()  # Start time for the epoch

    # --- Training Phase ---
    avg_train_loss, train_accuracy = _run_epoch(train_loader, training=True)
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    # --- Validation Phase (evaluated on test_loader) ---
    avg_val_loss, val_accuracy = _run_epoch(test_loader, training=False)
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    # Time for the epoch
    epoch_time = time.time() - start_time
    epoch_times.append(epoch_time)

    # Print training and validation results for the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
          f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%, "
          f"Time: {epoch_time:.2f} sec")


In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay,
    precision_score, recall_score, f1_score,
    matthews_corrcoef, accuracy_score
)

# Evaluate the trained model on the test loader and collect labels/predictions
model.eval()
all_labels = []
all_predictions = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.cuda()  # Move to GPU
        labels = labels.cuda()
        outputs = model(inputs)
        predicted = torch.max(outputs, 1)[1]  # index of the largest logit
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Convert the accumulated lists to numpy arrays
all_labels = np.array(all_labels)
all_predictions = np.array(all_predictions)

# Accuracy: share of exact matches, in percent
n_matches = np.sum(all_labels == all_predictions)
accuracy = 100 * n_matches / len(all_labels)
print(f"Test Accuracy: {accuracy:.2f}%")

# Support-weighted precision / recall / F1, plus MCC
precision = precision_score(all_labels, all_predictions, average='weighted')
recall = recall_score(all_labels, all_predictions, average='weighted')
f1 = f1_score(all_labels, all_predictions, average='weighted')
mcc = matthews_corrcoef(all_labels, all_predictions)

# Per-class sensitivity (recall of each class)
sensitivity = recall_score(all_labels, all_predictions, average=None)

# Report the metrics
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Matthews Correlation Coefficient (MCC): {mcc:.4f}")
print(f"Sensitivity per class: {sensitivity}")

from sklearn.metrics import recall_score

# Overall sensitivity under the three averaging schemes
# (`sensitivity` is reused and ends up holding the weighted value)
sensitivity = recall_score(all_labels, all_predictions, average='micro')
print(f"Overall Sensitivity (micro): {sensitivity:.4f}")
sensitivity = recall_score(all_labels, all_predictions, average='macro')
print(f"Overall Sensitivity (macro): {sensitivity:.4f}")
sensitivity = recall_score(all_labels, all_predictions, average='weighted')
print(f"Overall Sensitivity (weighted): {sensitivity:.4f}")


# # Compute confusion matrix
# cm = confusion_matrix(all_labels, all_predictions)

# # Normalize the confusion matrix by row (i.e., by the number of samples in each true class)
# cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100  # Convert to percentage

# # Plot the normalized confusion matrix
# disp = ConfusionMatrixDisplay(confusion_matrix=cm_percentage, display_labels=range(cm.shape[0]))

# plt.figure(figsize=(10, 8))
# disp.plot(cmap=plt.cm.Blues)
# plt.title('Confusion Matrix (Percentage)')
# plt.ylabel('True label')
# plt.xlabel('Predicted label')

# To ensure percentages are displayed with 2 decimal points
# plt.gca().images[-1].colorbar.set_ticks(np.arange(0, 101, 10))  # Set colorbar ticks to percentage values
# plt.gca().images[-1].colorbar.set_label('Percentage (%)')

plt.show()

# Calculate Cross-Correlation Coefficient
def calculate_cross_correlation(labels, predictions):
    """Normalised cross-correlation at lag 0 between two 1-D series.

    Both series are mean-centred; their dot product is divided by
    ``std(labels) * std(predictions) * n`` (i.e. the Pearson correlation
    coefficient).

    Args:
        labels: 1-D array-like of numbers.
        predictions: 1-D array-like of numbers, same length as ``labels``.

    Returns:
        The coefficient as a float, or NaN when it is undefined (empty input,
        or either series is constant, e.g. a model predicting a single class).

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    labels = np.asarray(labels, dtype=float)
    predictions = np.asarray(predictions, dtype=float)
    if labels.shape != predictions.shape:
        raise ValueError(
            f"labels and predictions must have the same shape, got {labels.shape} and {predictions.shape}"
        )
    if labels.size == 0:
        return np.float64(np.nan)

    # Zero-mean both series
    labels = labels - np.mean(labels)
    predictions = predictions - np.mean(predictions)

    # A constant series has zero spread: the coefficient is undefined, so
    # return NaN explicitly rather than dividing 0 by 0.
    denominator = np.std(labels) * np.std(predictions) * labels.size
    if denominator == 0:
        return np.float64(np.nan)

    # Lag-0 cross-correlation of equal-length series is their dot product
    return np.float64(np.dot(labels, predictions) / denominator)

cross_corr_coeff = calculate_cross_correlation(all_labels, all_predictions)
print(f"Cross-Correlation Coefficient (at lag 0): {cross_corr_coeff:.4f}")

# Side-by-side panels: loss curves (left) and accuracy curves (right)
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 6))

# Loss per epoch
loss_ax.plot(train_losses, label='Training Loss')
loss_ax.plot(val_losses, label='Validation Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Loss per Epoch')
loss_ax.legend()

# Accuracy per epoch
acc_ax.plot(train_accuracies, label='Training Accuracy', color='orange')
acc_ax.plot(val_accuracies, label='Validation Accuracy', color='blue')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.set_title('Accuracy per Epoch')
acc_ax.legend()

fig.tight_layout()
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Assuming all_labels and all_predictions are numpy arrays containing the true and predicted labels

# Compute confusion matrix
cm = confusion_matrix(all_labels, all_predictions)

# Normalize by row (number of samples in each true class) and convert to
# percent. A class that was predicted but never occurs among the true labels
# has a zero row total; keep that row at 0 instead of producing NaN.
row_totals = cm.sum(axis=1, keepdims=True)
cm_percentage = np.divide(
    cm * 100.0,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0,
)

# Plot the normalized confusion matrix
fig, ax = plt.subplots(figsize=(10, 8))

# Draw only the heatmap; the cell annotations are added below with a % sign,
# so the default value texts are not created in the first place.
disp = ConfusionMatrixDisplay(confusion_matrix=cm_percentage, display_labels=range(cm.shape[0]))
disp.plot(cmap=plt.cm.Blues, ax=ax, include_values=False)

# Fix the colour scale to the full 0-100 % range so it matches the colour-bar
# ticks set below regardless of the values present in the matrix.
disp.im_.set_clim(0, 100)

# Add custom annotations with the percentage symbol in each box; use white
# text on the dark (high-percentage) cells so the numbers stay readable.
for i in range(cm_percentage.shape[0]):
    for j in range(cm_percentage.shape[1]):
        ax.text(j, i, f'{cm_percentage[i, j]:.1f}%',
                ha="center", va="center",
                color="white" if cm_percentage[i, j] > 50 else "black")

# Set title and labels
ax.set_title('Normalized Confusion Matrix (in %)')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')

# Customize the colorbar to display percentage values
cbar = disp.im_.colorbar
cbar.set_ticks(np.arange(0, 101, 10))  # ticks every 10 %
cbar.set_label('Percentage (%)')

# Show the plot
plt.show()
