In [2]:
%pip install torchcam

Defaulting to user installation because normal site-packages is not writeable
Collecting torchcam
  Using cached torchcam-0.4.0-py3-none-any.whl.metadata (31 kB)
Using cached torchcam-0.4.0-py3-none-any.whl (46 kB)
Installing collected packages: torchcam
Successfully installed torchcam-0.4.0
Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import transforms, datasets
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
from PIL import Image
from torchcam.methods import GradCAM
from torchcam.utils import overlay_mask
import onnx
import subprocess
import tensorflow as tf
from tqdm import tqdm  # For progress bar
from torch.utils.tensorboard import SummaryWriter  # For TensorBoard logging

# Reproducibility: give Python's, NumPy's and PyTorch's global RNGs the same fixed seed
import random
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# -----------------------------
# 1. Define the Modified U-Net Model for Classification
# -----------------------------

class DoubleConv(nn.Module):
    """Two stacked 3x3 Conv -> BatchNorm -> ReLU stages.

    The convolutions use padding=1, so the spatial size is unchanged; only the
    channel count goes from ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Build the layers in execution order. The Sequential indices (0..5)
        # end up in state_dict keys and in layer paths such as
        # 'double_conv.1', so the ordering must not change.
        stages = []
        for stage_in in (in_channels, out_channels):
            stages.append(nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace=True))
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both convolution stages and return the result."""
        return self.double_conv(x)

class UNetClassifier(nn.Module):
    """U-Net style encoder topped with a global-pooling linear classifier.

    Four DoubleConv + 2x2 max-pool stages (64, 128, 256, 512 channels) feed a
    1024-channel bottleneck. The bottleneck features are average-pooled to
    1x1, flattened and mapped to ``num_classes`` logits. There is no decoder
    path; only the encoder half of the U-Net is used.
    """

    def __init__(self, in_channels=3, num_classes=3):
        super().__init__()

        # Encoder stages. Attribute names are kept because they appear in
        # state_dict keys and in layer paths like 'enc4.double_conv.1'.
        self.enc1, self.pool1 = DoubleConv(in_channels, 64), nn.MaxPool2d(2)
        self.enc2, self.pool2 = DoubleConv(64, 128), nn.MaxPool2d(2)
        self.enc3, self.pool3 = DoubleConv(128, 256), nn.MaxPool2d(2)
        self.enc4, self.pool4 = DoubleConv(256, 512), nn.MaxPool2d(2)

        self.bottleneck = DoubleConv(512, 1024)

        # Classification head: global average pool -> flatten -> linear logits
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        encoder_stages = (
            (self.enc1, self.pool1),
            (self.enc2, self.pool2),
            (self.enc3, self.pool3),
            (self.enc4, self.pool4),
        )
        features = x
        # Each stage convolves and then halves the spatial resolution.
        for encode, downsample in encoder_stages:
            features = downsample(encode(features))

        return self.classifier(self.bottleneck(features))

# -----------------------------
# 2. Data Loading and Preprocessing
# -----------------------------

def get_image_dataloader(image_folder, batch_size=16, valid_split=0.2, num_workers=4):
    """
    Build train/validation DataLoaders from a class-per-subdirectory image folder.

    The same preprocessing (resize to 224x224, tensor conversion, ImageNet
    mean/std normalization) is applied to both splits. The split is a random
    partition of the ImageFolder dataset.

    Args:
        image_folder (str): Dataset root; each class lives in its own subdirectory.
        batch_size (int): Batch size used by both loaders.
        valid_split (float): Fraction of the samples reserved for validation.
        num_workers (int): Worker processes per DataLoader.

    Returns:
        (train_loader, val_loader, class_names) where ``class_names`` is the
        list of class names reported by ImageFolder.
    """
    preprocessing = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet statistics
                             std=[0.229, 0.224, 0.225]),
    ])

    full_dataset = datasets.ImageFolder(root=image_folder, transform=preprocessing)
    class_names = full_dataset.classes
    print(f"Number of classes: {len(class_names)}")
    print(f"Classes: {class_names}")

    # The validation split receives whatever is left after truncating the
    # training size, so the two sizes always add up to the full dataset.
    total_samples = len(full_dataset)
    train_size = int((1 - valid_split) * total_samples)
    split_sizes = [train_size, total_samples - train_size]
    train_dataset, val_dataset = random_split(full_dataset, split_sizes)

    print(f"Number of training samples: {len(train_dataset)}")
    print(f"Number of validation samples: {len(val_dataset)}")

    # Only the training loader shuffles; validation order stays fixed.
    shared_kwargs = dict(batch_size=batch_size, num_workers=num_workers)
    train_loader = DataLoader(train_dataset, shuffle=True, **shared_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **shared_kwargs)

    return train_loader, val_loader, class_names

# -----------------------------
# 3. Define Visualization Functions
# -----------------------------

def plot_training_history(train_losses, val_losses, save_path='training_history.png'):
    """Draw the per-epoch training and validation loss curves and save them to ``save_path``."""
    fig, ax = plt.subplots(figsize=(10, 5))

    curves = (
        (train_losses, 'Training Loss', 'blue'),
        (val_losses, 'Validation Loss', 'orange'),
    )
    for series, curve_label, curve_color in curves:
        ax.plot(series, label=curve_label, color=curve_color)

    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title('Training and Validation Loss')
    ax.legend()
    ax.grid(True)

    fig.savefig(save_path)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    print(f"Saved {save_path}")

def plot_confusion_matrix(model, val_loader, device, class_names, save_path='confusion_matrix.png'):
    """Evaluate the model on ``val_loader``, save a confusion-matrix heatmap and print a report.

    Args:
        model (nn.Module): Classifier producing one logit per class.
        val_loader (DataLoader): Yields ``(inputs, labels)`` batches.
        device (torch.device): Device the model lives on.
        class_names (list[str]): Class names, indexed by label id.
        save_path (str): Where the heatmap image is written.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs.to(device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Pin the label set to every known class. Without this, a class that is
    # absent from the validation split shrinks the matrix (so the tick labels
    # no longer line up) and makes classification_report reject target_names.
    label_ids = list(range(len(class_names)))

    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix')
    fig.savefig(save_path)
    plt.close(fig)
    print(f"Saved {save_path}")

    print("\nClassification Report:")
    # zero_division=0 reports 0 (instead of warning) for classes with no
    # predictions or no true samples.
    print(classification_report(all_labels, all_preds, labels=label_ids,
                                target_names=class_names, zero_division=0))

# ImageNet statistics matching the Normalize transform used when loading data;
# needed to map normalized tensors back to displayable RGB.
_IMAGENET_MEAN = np.array([0.485, 0.456, 0.406])
_IMAGENET_STD = np.array([0.229, 0.224, 0.225])


def _denormalize_image(image_tensor):
    """Undo ImageNet normalization; return an HxWx3 float array clipped to [0, 1]."""
    image_np = image_tensor.detach().cpu().permute(1, 2, 0).numpy()
    # Clip before any uint8 conversion: out-of-range values would wrap around.
    return np.clip(image_np * _IMAGENET_STD + _IMAGENET_MEAN, 0, 1)


def _random_sample_with_cam(model, val_loader, device, cam_extractor):
    """Pick a random validation sample and compute its prediction and CAM overlay.

    Returns:
        (image_np, label, pred_class, overlay): the denormalized image as a
        float array in [0, 1], the true label id, the predicted label id and
        the PIL image with the activation map blended on top.
    """
    # Index the dataset directly. The validation loader is not shuffled, so
    # next(iter(loader)) would always yield the same first batch (and would
    # build a fresh iterator for every sample).
    dataset = val_loader.dataset
    image_tensor, label = dataset[random.randrange(len(dataset))]
    image_tensor = image_tensor.to(device)

    # Grad-CAM backpropagates the class score through the network, so the
    # forward pass must run with autograd enabled (never under no_grad).
    with torch.enable_grad():
        output = model(image_tensor.unsqueeze(0))
        pred_class = output.argmax(dim=1).item()
        # torchcam returns one activation map per target layer; a single
        # target layer is configured, so take the first entry.
        activation_map = cam_extractor(pred_class, output)[0]

    image_np = _denormalize_image(image_tensor)
    cam_np = activation_map.squeeze(0).detach().cpu().numpy().astype(np.float32)

    # overlay_mask expects PIL images; a float32 2-D array becomes a mode 'F'
    # image, which overlay_mask resizes to the input resolution.
    overlay = overlay_mask(
        Image.fromarray(np.uint8(255 * image_np)),
        Image.fromarray(cam_np),
        alpha=0.5,
    )
    return image_np, int(label), pred_class, overlay


def visualize_predictions(model, val_loader, device, class_names, cam_extractor, num_samples=5, save_path='prediction_visualization.png'):
    """Save a grid of random validation images next to their CAM overlays.

    Each row shows the input image (titled with the true class) and the CAM
    overlay (titled with the predicted class).

    Args:
        model (nn.Module): Trained classifier the extractor is hooked onto.
        val_loader (DataLoader): Loader whose dataset the samples are drawn from.
        device (torch.device): Device the model lives on.
        class_names (list[str]): Class names, indexed by label id.
        cam_extractor: torchcam extractor (e.g. GradCAM) built on ``model``.
        num_samples (int): Number of rows to draw.
        save_path (str): Where the figure is written.
    """
    model.eval()
    # squeeze=False keeps ``axes`` two-dimensional even for num_samples == 1.
    fig, axes = plt.subplots(num_samples, 2, figsize=(10, 5*num_samples), squeeze=False)

    for row in range(num_samples):
        image_np, label, pred_class, overlay = _random_sample_with_cam(
            model, val_loader, device, cam_extractor)

        axes[row, 0].imshow(image_np)
        axes[row, 0].set_title(f'Input Image\nTrue: {class_names[label]}')
        axes[row, 0].axis('off')

        axes[row, 1].imshow(overlay)
        axes[row, 1].set_title(f'Predicted: {class_names[pred_class]}')
        axes[row, 1].axis('off')

    fig.tight_layout()
    fig.savefig(save_path)
    plt.close(fig)
    print(f"Saved {save_path}")

def generate_cam_visualization(model, val_loader, device, class_names, cam_extractor, num_samples=5, save_path='cam_visualization.png'):
    """Save a column of CAM overlays for random validation images.

    Args:
        model (nn.Module): Trained classifier the extractor is hooked onto.
        val_loader (DataLoader): Loader whose dataset the samples are drawn from.
        device (torch.device): Device the model lives on.
        class_names (list[str]): Class names, indexed by label id.
        cam_extractor: torchcam extractor (e.g. GradCAM) built on ``model``.
        num_samples (int): Number of overlays to draw.
        save_path (str): Where the figure is written.
    """
    model.eval()
    # squeeze=False keeps ``axes`` indexable even for num_samples == 1.
    fig, axes = plt.subplots(num_samples, 1, figsize=(5, 5*num_samples), squeeze=False)

    for row in range(num_samples):
        _, _, pred_class, overlay = _random_sample_with_cam(
            model, val_loader, device, cam_extractor)

        axes[row, 0].imshow(overlay)
        axes[row, 0].set_title(f'Class Activation Map for {class_names[pred_class]}')
        axes[row, 0].axis('off')

    fig.tight_layout()
    fig.savefig(save_path)
    plt.close(fig)
    print(f"Saved {save_path}")

def generate_all_visualizations(model, train_losses, val_losses, val_loader, device, class_names, target_layer='enc4.double_conv.1'):
    """Produce the loss plot, confusion matrix and both CAM figures.

    Args:
        model (nn.Module): Trained classifier.
        train_losses (list[float]): Per-epoch training losses.
        val_losses (list[float]): Per-epoch validation losses.
        val_loader (DataLoader): Validation data.
        device (torch.device): Device the model lives on.
        class_names (list[str]): Class names, indexed by label id.
        target_layer (str): Name of the module (as listed by
            ``model.named_modules()``) whose activations Grad-CAM uses.
    """
    print("Generating training history plot...")
    plot_training_history(train_losses, val_losses, save_path='training_history.png')

    print("Generating confusion matrix and classification report...")
    plot_confusion_matrix(model, val_loader, device, class_names, save_path='confusion_matrix.png')

    # Create the extractor only after the no-grad evaluation above: it attaches
    # hooks to the model that are meant for gradient-enabled forward passes.
    cam_extractor = GradCAM(model, target_layer=target_layer)
    try:
        print("Generating prediction visualizations with CAM...")
        visualize_predictions(model, val_loader, device, class_names, cam_extractor, num_samples=5, save_path='prediction_visualization.png')

        print("Generating CAM visualizations...")
        generate_cam_visualization(model, val_loader, device, class_names, cam_extractor, num_samples=5, save_path='cam_visualization.png')
    finally:
        # Detach the hooks so later no-grad inference or model export on the
        # same model is not affected by a leftover extractor.
        cam_extractor.remove_hooks()

# -----------------------------
# 4. Training Function with Progress Bar
# -----------------------------

def _resolve_class_names(loader):
    """Return the ``classes`` list of the loader's dataset, unwrapping ``.dataset`` wrappers; None if absent."""
    dataset = getattr(loader, "dataset", None)
    while dataset is not None:
        classes = getattr(dataset, "classes", None)
        if classes is not None:
            return list(classes)
        # e.g. a Subset produced by random_split keeps the original dataset in ``.dataset``
        dataset = getattr(dataset, "dataset", None)
    return None


def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=25, accumulation_steps=4, class_names=None):
    """
    Train the model with gradient accumulation, track losses and generate visualizations.

    Args:
        model (nn.Module): The neural network model.
        train_loader (DataLoader): DataLoader for training data.
        val_loader (DataLoader): DataLoader for validation data.
        criterion (loss): Loss function; assumed to return the mean loss over
            the batch (as nn.CrossEntropyLoss does by default).
        optimizer (optimizer): Optimizer.
        device (torch.device): Device to train on.
        num_epochs (int): Number of training epochs.
        accumulation_steps (int): Number of batches whose gradients are
            accumulated before each optimizer step.
        class_names (list[str] | None): Class names used for the final
            visualizations. When omitted they are looked up on the validation
            dataset's ``classes`` attribute; if none can be found the
            visualizations are skipped.

    Returns:
        train_losses, val_losses: per-epoch average per-sample losses.

    Raises:
        ValueError: If ``accumulation_steps`` is smaller than 1.
    """
    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be >= 1")
    if class_names is None:
        class_names = _resolve_class_names(val_loader)

    train_losses = []
    val_losses = []
    num_batches = len(train_loader)

    # Initialize TensorBoard writer (logs go to the default runs/ directory)
    writer = SummaryWriter()
    try:
        for epoch in range(num_epochs):
            model.train()
            running_loss = 0.0
            samples_seen = 0
            progress_bar = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch+1}/{num_epochs}")

            # Clear gradients once per accumulation window, NOT once per batch:
            # zeroing inside the loop would discard everything accumulated so far.
            optimizer.zero_grad()

            for i, (inputs, labels) in progress_bar:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                # Scale so the accumulated gradient averages over the window.
                (loss / accumulation_steps).backward()

                # Step at the end of each window, and on the final batch so a
                # partial trailing window is not lost.
                if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
                    optimizer.step()
                    optimizer.zero_grad()

                # Weight the batch-mean loss by the batch size so the running
                # total is a per-sample sum, matching the validation accounting.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                samples_seen += batch_size

                avg_loss = running_loss / samples_seen
                progress_bar.set_postfix(loss=avg_loss)

                # Log to TensorBoard
                writer.add_scalar('Training Loss', avg_loss, epoch * num_batches + i)

            epoch_train_loss = running_loss / max(samples_seen, 1)
            train_losses.append(epoch_train_loss)

            # Validation phase
            model.eval()
            val_running_loss = 0.0
            val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    val_running_loss += loss.item() * inputs.size(0)
                    val_samples += inputs.size(0)

            epoch_val_loss = val_running_loss / max(val_samples, 1)
            val_losses.append(epoch_val_loss)

            print(f'Epoch {epoch+1}/{num_epochs} | Train Loss: {epoch_train_loss:.4f} | Val Loss: {epoch_val_loss:.4f}')

            # Log validation loss to TensorBoard
            writer.add_scalar('Validation Loss', epoch_val_loss, epoch)

            # Clear cache every 5 epochs to manage GPU memory
            if (epoch + 1) % 5 == 0 and torch.cuda.is_available():
                torch.cuda.empty_cache()
    finally:
        # Always flush and close the TensorBoard log, even if training fails.
        writer.close()

    # Generate all visualizations
    if class_names is None:
        print("Skipping visualizations: class names were not provided and could not be read from the validation dataset.")
    else:
        generate_all_visualizations(model, train_losses, val_losses, val_loader, device, class_names)

    return train_losses, val_losses

# -----------------------------
# 5. Main Function
# -----------------------------

def main(image_folder="/home/idrone2/Tea_pest/Tea_TJ"):
    """Train the classifier on an image folder and export it to .pth, ONNX, SavedModel and TFLite.

    Args:
        image_folder (str): Dataset root organised as ``<root>/<class>/<image>``.
            Defaults to the path this notebook was developed with; pass your own.

    Raises:
        ValueError: If the directory is missing, unreadable, or contains no
            images inside its class subdirectories.
    """
    import sys  # local import: needed only to launch the converter with the current interpreter

    # Verify the dataset directory exists
    if not os.path.isdir(image_folder):
        raise ValueError(f"The specified directory does not exist: {image_folder}")

    # Define valid image extensions
    valid_extensions = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".tiff")

    try:
        top_level_entries = sorted(os.listdir(image_folder))
    except OSError as e:
        raise ValueError(f"Error accessing the directory {image_folder}: {e}") from e

    # ImageFolder reads images from class subdirectories, so the sanity check
    # must look inside them; files sitting directly in the root are not used.
    image_paths = []
    for entry in top_level_entries:
        class_dir = os.path.join(image_folder, entry)
        if not os.path.isdir(class_dir):
            continue
        for dirpath, _dirnames, filenames in os.walk(class_dir):
            image_paths.extend(
                os.path.join(dirpath, f) for f in sorted(filenames)
                if f.lower().endswith(valid_extensions)
            )

    # Debugging: Print the number of images found and some sample filenames
    print(f"Number of images found: {len(image_paths)}")
    if len(image_paths) == 0:
        print("Files in the directory:")
        for f in top_level_entries:
            print(f)
        raise ValueError("No images found in the specified directory. Please check the path and file extensions.")

    print("Sample image paths:")
    for path in image_paths[:5]:
        print(path)

    # Select the compute device (this name was previously never defined)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Create DataLoaders
    train_loader, val_loader, class_names = get_image_dataloader(image_folder=image_folder, batch_size=16, valid_split=0.2, num_workers=4)
    num_classes = len(class_names)

    # Initialize model
    model = UNetClassifier(in_channels=3, num_classes=num_classes).to(device)
    print(model)

    # Print all layer names to help select a target layer for Grad-CAM
    print("\nModel Layers:")
    for name, _module in model.named_modules():
        print(name)

    # No Grad-CAM extractor is created here. It would be unused, and its hooks
    # would stay attached to the model throughout training, validation and export.

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    num_epochs = 25
    train_losses, val_losses = train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=num_epochs, accumulation_steps=4)

    # Save the model in PyTorch format (.pth)
    torch.save(model.state_dict(), 'unet_classifier.pth')
    print("Model saved as unet_classifier.pth")

    # Export the model to ONNX format (.onnx); export in eval mode so BatchNorm
    # uses its running statistics.
    model.eval()
    dummy_input = torch.randn(1, 3, 224, 224, device=device)
    onnx_model_path = "unet_classifier.onnx"
    torch.onnx.export(model, dummy_input, onnx_model_path,
                      input_names=['input'], output_names=['output'],
                      dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
                      opset_version=11)
    print(f"Model exported to ONNX format at {onnx_model_path}")

    # Convert ONNX model to TensorFlow SavedModel
    tf_model_path = "unet_classifier_tf"
    os.makedirs(tf_model_path, exist_ok=True)

    # NOTE(review): tf2onnx converts TensorFlow -> ONNX; confirm that this
    # invocation really performs ONNX -> SavedModel. A dedicated ONNX-to-TF
    # converter may be required instead.
    # Argument list + sys.executable: no shell parsing, and the converter runs
    # in the same environment as this kernel.
    command_args = [sys.executable, "-m", "tf2onnx.convert",
                    "--onnx", onnx_model_path, "--saved-model", tf_model_path]
    command = " ".join(command_args)
    print(f"Converting ONNX to TensorFlow SavedModel with command: {command}")
    result = subprocess.run(command_args)

    saved_model_ok = result.returncode == 0
    if saved_model_ok:
        print(f"Model converted to TensorFlow SavedModel format at {tf_model_path}")
    else:
        print("Failed to convert ONNX to TensorFlow SavedModel.")

    # Convert TensorFlow SavedModel to TensorFlow Lite (.tflite); pointless to
    # attempt when the SavedModel was never produced.
    tflite_ok = False
    if saved_model_ok:
        try:
            converter = tf.lite.TFLiteConverter.from_saved_model(tf_model_path)
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
            tflite_model = converter.convert()

            # Save the TFLite model
            with open("unet_classifier.tflite", "wb") as f:
                f.write(tflite_model)
            print("Model converted to TensorFlow Lite format at unet_classifier.tflite")
            tflite_ok = True
        except Exception as e:
            print(f"Failed to convert TensorFlow SavedModel to TFLite: {e}")
    else:
        print("Skipping TFLite conversion because no SavedModel was produced.")

    if saved_model_ok and tflite_ok:
        print("All tasks completed successfully.")
    else:
        print("Finished with errors: the PyTorch and ONNX models were saved, but the TensorFlow/TFLite conversion did not complete.")

# Entry point: run the full pipeline when this code is executed directly rather than imported.
if __name__ == "__main__":
    main()


2024-10-16 18:02:37.363375: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-10-16 18:02:37.370712: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-16 18:02:37.379124: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-16 18:02:37.381615: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-16 18:02:37.388205: I tensorflow/core/platform/cpu_feature_guar

ValueError: The specified directory does not exist: /home/idrone2/Tea_pest/Tea_TJ/jassid