In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found beneath the Kaggle input directory.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for file_name in files:
        print(os.path.join(root, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/testdataset-misahub/Testing set/Images/vaPn7c5Xg6.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/bblyH80cLP.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/RMIodDhaCb.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/tyA53RVSvG.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/5cZ5EKCwD0.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/mTpiQFLYlb.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/kG8rqId8Cn.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/NxzSIITJNr.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/FCdUK6fqIW.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/I0OIIJ42a2.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/9MjZVAh6P8.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/dGi9A2TaNB.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/hht51RKGKd.jpg
/kaggle/input/testdataset-misahub/Testing set/Images/YrJp9uoyNS.jpg
/kaggle/input/testdataset-misahub/Testing set/Im

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from PIL import Image
import os
import random
import numpy as np
from sklearn.model_selection import train_test_split
from transformers import ViTImageProcessor, ViTForImageClassification
from tqdm import tqdm
import logging
from sklearn.metrics import balanced_accuracy_score
import pandas as pd

In [None]:
import os
import logging
import random
import torch
import numpy as np
from collections import defaultdict
import cv2
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import ViTImageProcessor, ViTForImageClassification
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
from transformers import get_scheduler


In [None]:
import logging

# Root logger: emit INFO and above as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


In [None]:
# Set random seed for reproducibility
SEED = 42

# Seed each random number generator used in this notebook with the same value:
# Python's stdlib RNG, NumPy's legacy global RNG, and PyTorch.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Seed every visible CUDA device as well when a GPU is present.
cuda_present = torch.cuda.is_available()
if cuda_present:
    torch.cuda.manual_seed_all(SEED)

In [None]:
# Define the dataset class
class VCEDataset(Dataset):
    """Map-style dataset that loads images from disk with PIL as RGB.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, index-aligned with ``image_paths``.
        transform: Optional callable invoked as ``transform(image)`` on the
            loaded PIL image.
        label_map: Optional mapping; when given (and non-empty) the raw label
            is replaced by ``label_map[label]``.
    """

    def __init__(self, image_paths, labels, transform=None, label_map=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.label_map = label_map

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``.

        Any exception raised while loading or transforming the sample is
        logged and ``None`` is returned instead of propagating.
        NOTE(review): a ``None`` sample must be filtered by the caller's
        ``collate_fn``; confirm the DataLoader in use can cope with it.
        """
        try:
            # Open inside a context manager so the underlying file handle is
            # closed deterministically, even if decoding/conversion fails.
            with Image.open(self.image_paths[idx]) as raw_image:
                image = raw_image.convert('RGB')
            label = self.labels[idx]

            if self.transform:
                image = self.transform(image)

            if self.label_map:
                label = self.label_map[label]

            return image, label
        except Exception as e:
            logging.error(f"Error loading image {self.image_paths[idx]}: {str(e)}")
            return None

In [None]:
# Set up the model and data
def _load_data_from_dir(data_dir, label_map=None):
    """Collect image paths and integer labels from a ``class/subdir/image`` tree.

    Args:
        data_dir: Root directory whose immediate sub-directories are classes.
        label_map: Optional existing ``{class_name: index}`` mapping to reuse
            (e.g. the training mapping when loading a validation directory).
            When omitted, one is built from the sorted class directories.

    Returns:
        Tuple ``(image_paths, labels, class_names, label_map, inv_label_map,
        class_counts)``.
    """
    # Only real directories are classes. Filtering *before* building the label
    # map keeps indices contiguous even if stray files sit next to the classes.
    found_classes = []
    for entry in sorted(os.listdir(data_dir)):
        entry_path = os.path.join(data_dir, entry)
        if os.path.isdir(entry_path):
            found_classes.append(entry)
        else:
            logging.warning(f"Skipping {entry_path} as it's not a directory")

    logging.info(f"Found {len(found_classes)} classes: {found_classes}")

    if label_map is None:
        label_map = {class_name: idx for idx, class_name in enumerate(found_classes)}
    class_names = sorted(label_map, key=label_map.get)
    inv_label_map = {v: k for k, v in label_map.items()}

    image_paths, labels, class_counts = [], [], []
    for class_name in found_classes:
        if class_name not in label_map:
            logging.warning(f"Skipping {class_name}: class not present in the provided label map")
            continue

        class_dir = os.path.join(data_dir, class_name)
        class_images = []
        # Images are expected one level below the class directory; sorting
        # makes the file order (and thus any later split) reproducible.
        for subdir in sorted(os.listdir(class_dir)):
            subdir_path = os.path.join(class_dir, subdir)
            if not os.path.isdir(subdir_path):
                continue
            class_images.extend(
                os.path.join(subdir_path, img)
                for img in sorted(os.listdir(subdir_path))
                if img.lower().endswith(('.png', '.jpg', '.jpeg'))
            )

        class_counts.append(len(class_images))
        image_paths.extend(class_images)
        labels.extend([label_map[class_name]] * len(class_images))

        logging.info(f"Found {len(class_images)} images for class {class_name}")

    return image_paths, labels, class_names, label_map, inv_label_map, class_counts


class _AugmentedDataset(Dataset):
    """Image dataset read with OpenCV and transformed with albumentations.

    With ``balanced=True`` (training) the dataset has ``target_samples`` items;
    item ``idx`` is a random image of class ``idx % num_classes``, and classes
    with fewer than ``samples_per_class // 2`` images get a stronger
    augmentation pipeline. With ``balanced=False`` (evaluation) every image is
    returned exactly once, in order, using only ``transform``.

    Defined at module level (not nested in a function) so instances can be
    pickled for DataLoader worker processes.
    """

    def __init__(self, image_paths, labels, transform, target_samples, balanced=True):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.target_samples = target_samples
        self.balanced = balanced

        # Map each label to the positions of its images.
        self.class_indices = defaultdict(list)
        for idx, label in enumerate(self.labels):
            self.class_indices[label].append(idx)

        # Iterate over the labels actually present, so a missing class can
        # never lead to sampling from an empty list.
        self.classes = sorted(self.class_indices)
        self.num_classes = len(self.classes)
        if self.balanced and self.num_classes == 0:
            raise ValueError("Cannot build a class-balanced dataset from zero images")

        # Per-class budget used to decide which classes count as small.
        min_samples = min((len(indices) for indices in self.class_indices.values()), default=0)
        self.samples_per_class = max(
            min_samples,
            self.target_samples // max(self.num_classes, 1)
        )

        # Stronger augmentations for underrepresented classes
        self.strong_transform = A.Compose([
            A.RandomResizedCrop(height=224, width=224, scale=(0.5, 1.0)),
            A.HorizontalFlip(p=0.7),
            A.VerticalFlip(p=0.3),
            A.RandomBrightnessContrast(p=0.5, brightness_limit=0.3, contrast_limit=0.3),
            A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=45, p=0.7),
            A.OneOf([
                A.GaussNoise(var_limit=(10.0, 50.0)),
                A.GaussianBlur(blur_limit=(3, 7)),
                A.MotionBlur(blur_limit=(3, 7)),
            ], p=0.5),
            A.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1, p=0.5),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ])

    def __len__(self):
        return self.target_samples if self.balanced else len(self.image_paths)

    def __getitem__(self, idx):
        if self.balanced:
            label = self.classes[idx % self.num_classes]
            candidates = self.class_indices[label]
            sample_idx = random.choice(candidates)
            use_strong = len(candidates) < self.samples_per_class // 2
        else:
            sample_idx = idx
            use_strong = False

        path = self.image_paths[sample_idx]
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Could not read image: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        pipeline = self.strong_transform if use_strong else self.transform
        image = pipeline(image=image)['image']
        return image, self.labels[sample_idx]


def setup_model_and_data(train_dir, val_dir=None, batch_size=32, target_samples=1000):
    """Build the ViT classifier plus training and validation DataLoaders.

    Args:
        train_dir: Training root laid out as ``class/subdir/image``.
        val_dir: Optional validation root with the same layout. When omitted,
            a stratified 20% split of the training images is held out.
        batch_size: Batch size for both loaders.
        target_samples: Number of (randomly drawn, class-balanced) samples that
            make up one training epoch.

    Returns:
        Tuple ``(model, processor, train_loader, val_loader, class_names,
        inv_label_map, class_weights)``. ``class_weights`` is a NumPy array of
        inverse-frequency weights normalised to sum to the number of classes.
    """
    model_name = "google/vit-base-patch16-224-in21k"
    # NOTE(review): the processor is returned but not used for preprocessing
    # here; confirm the mean/std used below match what the checkpoint expects.
    processor = ViTImageProcessor.from_pretrained(model_name)

    train_paths, train_labels, class_names, label_map, inv_label_map, _ = _load_data_from_dir(train_dir)

    if val_dir:
        # Reuse the training label map so both splits share class indices.
        val_paths, val_labels, _, _, _, _ = _load_data_from_dir(val_dir, label_map=label_map)
    else:
        train_paths, val_paths, train_labels, val_labels = train_test_split(
            train_paths, train_labels, test_size=0.2, stratify=train_labels, random_state=42
        )

    logging.info(f"Training images before augmentation: {len(train_paths)}")
    logging.info(f"Validation images: {len(val_paths)}")

    # Inverse-frequency class weights, normalised to sum to num_classes.
    num_classes = len(class_names)
    class_counts = np.bincount(train_labels, minlength=num_classes)
    class_weights = 1. / np.clip(class_counts, 1, None)  # Avoid division by zero
    class_weights = class_weights / np.sum(class_weights) * num_classes

    # Define augmentations
    train_transform = A.Compose([
        A.RandomResizedCrop(height=224, width=224),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=45, p=0.5),
        A.OneOf([
            A.GaussNoise(),
            A.GaussianBlur(),
            A.MotionBlur(),
        ], p=0.3),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])

    val_transform = A.Compose([
        A.Resize(height=224, width=224),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])

    # Training draws class-balanced random samples; validation visits every
    # image exactly once with the deterministic transform.
    train_dataset = _AugmentedDataset(train_paths, train_labels, train_transform, target_samples=target_samples)
    val_dataset = _AugmentedDataset(
        val_paths, val_labels, val_transform, target_samples=len(val_paths), balanced=False
    )

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

    # Initialize the model
    model = ViTForImageClassification.from_pretrained(model_name, num_labels=num_classes)

    return model, processor, train_loader, val_loader, class_names, inv_label_map, class_weights


In [None]:
from sklearn.metrics import confusion_matrix

In [None]:
def calculate_metrics(true_labels, pred_labels, num_classes):
    """Compute balanced accuracy and per-class accuracy, both in percent.

    Args:
        true_labels: Sequence of ground-truth class indices.
        pred_labels: Sequence of predicted class indices; values are clipped
            into ``[0, num_classes - 1]``.
        num_classes: Total number of classes.

    Returns:
        Tuple ``(balanced_acc, class_acc)``. ``class_acc`` always has length
        ``num_classes`` with entry ``i`` belonging to class ``i``; classes with
        no true samples are reported as ``nan``.
    """
    true_labels = np.asarray(true_labels)
    # Ensure predictions only include valid classes
    pred_labels = np.clip(np.asarray(pred_labels), 0, num_classes - 1)

    balanced_acc = balanced_accuracy_score(true_labels, pred_labels)

    # Fix the label set explicitly so the matrix is num_classes x num_classes
    # even when some class is absent from this batch of labels.
    conf_matrix = confusion_matrix(true_labels, pred_labels, labels=np.arange(num_classes))
    support = conf_matrix.sum(axis=1)

    # Divide only where a class has true samples; leave the rest as NaN
    # instead of triggering a 0/0 RuntimeWarning.
    class_acc = np.full(num_classes, np.nan, dtype=float)
    np.divide(conf_matrix.diagonal(), support, out=class_acc, where=support > 0)

    return balanced_acc * 100, class_acc * 100

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    """Multi-class focal loss: ``-alpha_t * (1 - p_t) ** gamma * log(p_t)``.

    ``p_t`` is the softmax probability assigned to each sample's target class.
    """

    def __init__(self, alpha=None, gamma=2, reduction='mean'):
        """
        Focal Loss for imbalanced classification tasks.
        Parameters:
        - alpha: Optional class weights (list, NumPy array or tensor indexed by
          class) or a single scalar applied to every sample.
        - gamma: Focusing parameter to down-weight easy examples.
        - reduction: 'mean' or 'sum' reduce over samples; any other value
          (e.g. 'none') returns the per-sample losses.
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the loss.

        Args:
            inputs: Logits with the class dimension last.
            targets: Integer class indices shaped like ``inputs`` without its
                last dimension.

        Returns:
            A scalar for 'mean'/'sum', otherwise a tensor shaped like ``targets``.
        """
        log_probs = F.log_softmax(inputs, dim=-1)

        # Keep only the target-class term; without this gather the loss would
        # not depend on the targets at all.
        target_log_probs = log_probs.gather(-1, targets.unsqueeze(-1)).squeeze(-1)
        target_probs = target_log_probs.exp()
        focal_loss = -((1 - target_probs) ** self.gamma) * target_log_probs

        if self.alpha is not None:
            # as_tensor accepts lists, NumPy arrays, scalars and tensors, and
            # does not copy when alpha already is a matching tensor.
            alpha = torch.as_tensor(self.alpha, dtype=inputs.dtype, device=inputs.device)
            if alpha.dim() == 0:
                focal_loss = focal_loss * alpha
            else:
                focal_loss = focal_loss * alpha[targets]

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


In [None]:
def _balanced_val_accuracy(model, val_loader, device):
    """Return the balanced accuracy (percent) of ``model`` over ``val_loader``."""
    model.eval()
    all_true, all_pred = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            logits = model(inputs.to(device)).logits
            all_pred.extend(logits.argmax(dim=1).cpu().tolist())
            all_true.extend(labels.tolist())
    if not all_true:
        return 0.0
    return balanced_accuracy_score(all_true, all_pred) * 100


def train(model, train_loader, val_loader, class_weights, num_epochs, lr, weight_decay,
          checkpoint_path='best_model.pth', patience=None):
    """Fine-tune ``model`` and keep the weights with the best validation score.

    Args:
        model: Model whose forward output exposes ``.logits``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches, or
            ``None`` to skip validation (the latest weights are then saved
            after every epoch).
        class_weights: Per-class weights for the cross-entropy loss.
        num_epochs: Maximum number of epochs.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.
        checkpoint_path: File the best ``state_dict`` is written to.
        patience: Optional number of consecutive non-improving epochs after
            which training stops early; ``None`` disables early stopping.

    Returns:
        The model, loaded with the best checkpoint written during this call
        (or as trained if no checkpoint was written).
    """
    # Select device: GPU if available, otherwise CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

    # The schedule is sized in optimizer steps (batches), with 10% warmup.
    total_steps = len(train_loader) * num_epochs
    scheduler = get_scheduler(
        "linear", optimizer=optimizer,
        num_warmup_steps=int(0.1 * total_steps),
        num_training_steps=total_steps
    )

    class_weights_tensor = torch.as_tensor(class_weights, dtype=torch.float32, device=device)
    criterion = torch.nn.CrossEntropyLoss(weight=class_weights_tensor)

    best_val_accuracy = -1.0
    patience_counter = 0
    checkpoint_saved = False

    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0
        num_batches = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs).logits  # Use .logits for ViT output
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()
            # Step once per batch to match total_steps above; stepping once per
            # epoch would leave the learning rate stuck in the warmup phase.
            scheduler.step()

            running_loss += loss.item()
            num_batches += 1

        mean_loss = running_loss / max(num_batches, 1)

        if val_loader is None:
            logging.info(f"Epoch {epoch + 1}/{num_epochs} - train loss: {mean_loss:.4f}")
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            continue

        val_accuracy = _balanced_val_accuracy(model, val_loader, device)
        logging.info(
            f"Epoch {epoch + 1}/{num_epochs} - train loss: {mean_loss:.4f} - "
            f"val balanced accuracy: {val_accuracy:.2f}%"
        )

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1
            if patience is not None and patience_counter >= patience:
                logging.info(f"Early stopping after {epoch + 1} epochs")
                break

    # Hand back the best weights rather than whatever the last epoch produced.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return model


In [None]:
def generate_test_predictions(model, test_loader, device, class_names, test_image_paths):
    """Run the model over ``test_loader`` and tabulate the predictions.

    Args:
        model: Classifier whose forward output exposes ``.logits``.
        test_loader: Loader yielding ``(inputs, _)`` batches; the second item
            is ignored.
        device: Device the input batches are moved to.
        class_names: List of class names, indexed by predicted class id.
        test_image_paths: Image paths, paired positionally with the predictions.

    Returns:
        DataFrame with an ``image_path`` column, one probability column per
        class name, and a ``predicted_class`` column.
    """
    model.eval()
    predicted_ids = []
    probability_rows = []
    with torch.no_grad():
        for batch, _ in tqdm(test_loader, desc="Generating predictions"):
            logits = model(batch.to(device)).logits
            batch_probs = torch.softmax(logits, dim=1)
            batch_ids = torch.max(batch_probs, 1)[1]
            predicted_ids.extend(batch_ids.cpu().numpy())
            probability_rows.extend(batch_probs.cpu().numpy())

    # One record per image: path, per-class probabilities, winning class name.
    records = [
        [path] + list(row) + [class_names[winner]]
        for path, row, winner in zip(test_image_paths, probability_rows, predicted_ids)
    ]
    header = ['image_path'] + class_names + ['predicted_class']
    df = pd.DataFrame(records, columns=header)

    # Replace 'Unknown' with the class having maximum probability
    most_likely = df.loc[:, class_names].idxmax(axis=1)
    is_unknown = df['predicted_class'] == 'Unknown'
    df.loc[is_unknown, 'predicted_class'] = most_likely

    return df

In [None]:
def main():
    """Train the classifier, predict on the test images and write a CSV.

    Steps: build model and loaders, train, restore the best checkpoint if this
    run produced one, run inference over the test directory, and save the
    result to ``test_predictions.csv``. Any exception is logged and re-raised.
    """
    import time  # local import: only needed to tell fresh checkpoints from stale ones

    train_dir = "/kaggle/input/misahubdataset/Dataset/training"
    val_dir = "/kaggle/input/misahubdataset/Dataset/validation"
    test_dir = "/kaggle/input/testdataset-misahub/Testing set/Images"
    checkpoint_path = 'best_model.pth'
    batch_size = 64
    num_epochs = 30
    learning_rate = 2e-5
    weight_decay = 1e-5
    target_samples = 10000  # Set this to your desired total number of training samples

    try:
        model, processor, train_loader, val_loader, class_names, inv_label_map, class_weights = setup_model_and_data(train_dir, val_dir, batch_size, target_samples)

        # Train the model
        train_started = time.time()
        best_model = train(model, train_loader, val_loader, class_weights, num_epochs, learning_rate, weight_decay)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Restore the checkpoint only if this run wrote it; otherwise keep the
        # model returned by train() instead of failing on a missing file or
        # silently loading weights left over from an earlier run.
        if os.path.exists(checkpoint_path) and os.path.getmtime(checkpoint_path) >= train_started:
            best_model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        else:
            logging.warning(
                f"No checkpoint from this run at {checkpoint_path}; using the model returned by train()"
            )
        best_model.to(device)

        # Setup test data (sorted so the output row order is reproducible)
        test_image_paths = sorted(
            os.path.join(test_dir, img)
            for img in os.listdir(test_dir)
            if img.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not test_image_paths:
            raise FileNotFoundError(f"No test images found in {test_dir}")

        transform_test = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        # Labels are placeholders; only the images are used at inference time.
        test_dataset = VCEDataset(test_image_paths, [0] * len(test_image_paths), transform=transform_test)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

        # Generate predictions
        df_predictions = generate_test_predictions(best_model, test_loader, device, class_names, test_image_paths)

        # Save predictions to CSV
        output_path = 'test_predictions.csv'
        df_predictions.to_csv(output_path, index=False)
        print(f"Predictions saved to {output_path}")

    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        raise

In [None]:
import warnings
# Suppress these warning categories globally, in the same order as before.
for _ignored_category in (RuntimeWarning, FutureWarning, UserWarning):
    warnings.filterwarnings("ignore", category=_ignored_category)
del _ignored_category

In [None]:
# Entry point: run the full pipeline only when this module is executed
# directly (i.e. its __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()