In [None]:
#importing necessary libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
import random
import gc
import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms, models
import timm
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from sklearn.model_selection import StratifiedKFold

# For data augmentation
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
# Reproducibility: seed every random number generator the notebook relies on.
def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and put cuDNN in deterministic mode.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Same seed for each library-level generator.
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)

    # Trade cuDNN autotuning for repeatable kernel selection.
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False


set_seed(42)

In [None]:
# Device-agnostic setup: use the GPU when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Dataset file paths, all derived from a single root directory.
DATA_ROOT = '/kaggle/input/soil-classification/soil_classification-2025'
TRAIN_DIR = f'{DATA_ROOT}/train'
TEST_DIR = f'{DATA_ROOT}/test'
TRAIN_CSV = f'{DATA_ROOT}/train_labels.csv'
TEST_CSV = f'{DATA_ROOT}/test_ids.csv'

In [None]:
# Dataset exploration: load the training label table and the test id table.
train_df, test_df = (pd.read_csv(csv_path) for csv_path in (TRAIN_CSV, TEST_CSV))

In [None]:
# Shape and first rows of the training table.
print(f"Train data shape: {train_df.shape}")
print("\nSample of training data:", train_df.head(), sep="\n")

# Number of training rows per soil type.
print("\nClass distribution in training data:")
class_dist = train_df['soil_type'].value_counts()
print(class_dist)

In [None]:
# data preprocessing
# Soil-type name -> integer class index used as the training target.
soil_types = {
    'Alluvial soil': 0,
    'Black Soil': 1,
    'Clay soil': 2,
    'Red soil': 3
}
train_df['label'] = train_df['soil_type'].map(soil_types)

# Series.map() yields NaN for any value that is not a key of the mapping (e.g. a
# different capitalisation). Fail here with a clear message instead of letting
# NaN labels reach the fold split or the loss function.
_unmapped_soil_types = train_df.loc[train_df['label'].isna(), 'soil_type'].unique()
if len(_unmapped_soil_types) > 0:
    raise ValueError(
        f"soil_type values without a class index in soil_types: {list(_unmapped_soil_types)}"
    )

In [None]:
def check_image_sizes(df, img_dir, sample_size=100):
    """Print and return size statistics for the first images listed in ``df``.

    Args:
        df: DataFrame with an ``image_id`` column holding file names.
        img_dir: Directory that contains the image files.
        sample_size: Maximum number of leading rows to inspect (default 100).

    Returns:
        DataFrame with one row per inspected image and columns
        ``width`` and ``height``.
    """
    n_images = min(sample_size, len(df))

    sizes = []
    for image_id in df['image_id'].iloc[:n_images]:
        # The context manager closes the file handle as soon as the size is read,
        # so scanning many images does not leak open files.
        with Image.open(os.path.join(img_dir, image_id)) as img:
            sizes.append(img.size)

    sizes_df = pd.DataFrame(sizes, columns=['width', 'height'])
    print("Image size statistics:")
    print(sizes_df.describe())
    return sizes_df

In [None]:
# Data augmentation so the model sees diverse variants of each training image.
def get_train_transforms(img_size=384):
    """Build the albumentations pipeline used for training images.

    Args:
        img_size: Side length, in pixels, of the square image after resizing.

    Returns:
        An ``A.Compose`` that resizes, randomly augments, normalizes and
        converts the image to a tensor, in that order.
    """
    # Resize first, then random flips and a rotation of up to 30 degrees.
    geometric = [
        A.Resize(img_size, img_size),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Rotate(limit=30, p=0.7),
    ]

    # Random colour / brightness perturbations.
    photometric = [
        A.RandomBrightnessContrast(p=0.7),
        A.HueSaturationValue(p=0.5),
    ]

    # With probability 0.3 apply exactly one of the noise / blur transforms.
    degradation = A.OneOf([
        A.GaussNoise(),
        A.GaussianBlur(),
        A.MotionBlur(),
    ], p=0.3)

    # Randomly mask out up to 8 patches of at most 32x32 pixels.
    occlusion = A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.5)

    # Channel statistics are presumably the ImageNet values expected by the
    # pretrained backbone -- confirm if the backbone changes.
    to_model_input = [
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]

    return A.Compose([*geometric, *photometric, degradation, occlusion, *to_model_input])

In [None]:
# Deterministic preprocessing for validation / test data (no random augmentation).
def get_valid_transforms(img_size=384):
    """Build the albumentations pipeline used for validation and test images.

    Args:
        img_size: Side length, in pixels, of the square image after resizing.

    Returns:
        An ``A.Compose`` that resizes, normalizes and converts to a tensor.
    """
    steps = [A.Resize(img_size, img_size)]
    steps.append(A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))
    steps.append(ToTensorV2())
    return A.Compose(steps)

In [None]:
# Dataset wrapper that loads soil images (and labels) from disk.
class SoilDataset(Dataset):
    """Map-style dataset yielding transformed images and, unless in test mode, labels.

    Args:
        df: DataFrame with an ``image_id`` column of file names and, when
            ``test`` is False, a ``label`` column.
        img_dir: Directory containing the image files.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result is indexed with ``['image']`` (albumentations style).
        test: When True, ``__getitem__`` returns only the image.
    """

    def __init__(self, df, img_dir, transform=None, test=False):
        self.df = df
        self.img_dir = img_dir
        self.transform = transform
        self.test = test

    def __len__(self):
        """Number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at positional index ``idx``.

        Returns:
            ``img`` in test mode, otherwise ``(img, label)``.

        Raises:
            FileNotFoundError: If the image file is missing or cannot be decoded.
        """
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['image_id'])

        # cv2.imread signals failure by returning None rather than raising;
        # without this check the error would surface later as an opaque
        # cvtColor assertion inside a DataLoader worker.
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Image not found or unreadable: {img_path}")

        # OpenCV decodes to BGR; the rest of the pipeline works in RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transform:
            img = self.transform(image=img)['image']

        if self.test:
            return img
        return img, row['label']

In [None]:
# Model definition: timm backbone with a fresh dropout + linear classification head.
class SoilClassifier(nn.Module):
    """Backbone from timm whose original head is replaced by dropout and a linear layer.

    Args:
        model_name: Name passed to ``timm.create_model``.
        pretrained: Whether timm should load pretrained weights.
        num_classes: Number of output logits.
    """

    def __init__(self, model_name='efficientnet_b3', pretrained=True, num_classes=4):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)

        # EfficientNet variants expose their head as `classifier`; every other
        # model name is assumed to expose it as `fc` (ResNet style).
        head_attr = 'classifier' if 'efficientnet' in model_name else 'fc'
        n_features = getattr(self.model, head_attr).in_features

        # Swap the original head for a pass-through so the backbone emits features.
        setattr(self.model, head_attr, nn.Identity())

        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(n_features, num_classes)

    def forward(self, x):
        """Return class logits: backbone features -> dropout -> linear head."""
        return self.classifier(self.dropout(self.model(x)))

In [None]:
# training loop
def train_epoch(model, dataloader, criterion, optimizer, scheduler, scaler, device):
    """Run one training epoch with (optional) mixed precision.

    Args:
        model: Network to train; switched to train mode here.
        dataloader: Yields ``(images, targets)`` batches.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        scheduler: Learning-rate scheduler stepped once per batch, or None.
        scaler: Gradient scaler used for the backward pass and optimizer step.
        device: Device (``torch.device`` or string) the batches are moved to.

    Returns:
        ``(epoch_loss, min_f1, avg_f1, f1_scores)``: sample-weighted mean loss,
        the lowest per-class F1, the macro F1 and the per-class F1 array.
    """
    model.train()

    # Autocast is only enabled when training on CUDA. Passing `enabled=False`
    # otherwise keeps CPU runs free of the "CUDA is not available" warning that
    # the unconditional torch.cuda.amp.autocast() emitted.
    use_amp = torch.device(device).type == 'cuda'

    running_loss = 0.0
    all_targets = []
    all_predictions = []

    for images, targets in tqdm(dataloader, desc="Training"):
        images, targets = images.to(device), targets.to(device)

        optimizer.zero_grad()

        # Forward pass (mixed precision on CUDA, full precision elsewhere)
        with torch.autocast(device_type='cuda', enabled=use_amp):
            outputs = model(images)
            loss = criterion(outputs, targets)

        # Backpropagation with gradient scaling
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # The scheduler is stepped per batch, not per epoch.
        if scheduler is not None:
            scheduler.step()

        # Weight the batch loss by batch size so the epoch mean is per-sample.
        running_loss += loss.item() * images.size(0)

        # Store predictions and targets for metrics
        preds = outputs.argmax(dim=1)
        all_targets.extend(targets.cpu().numpy())
        all_predictions.extend(preds.cpu().numpy())

    # Calculate metrics
    epoch_loss = running_loss / len(dataloader.dataset)
    f1_scores = f1_score(all_targets, all_predictions, average=None)
    min_f1 = f1_scores.min()
    avg_f1 = f1_score(all_targets, all_predictions, average='macro')

    return epoch_loss, min_f1, avg_f1, f1_scores

In [None]:
# validation loop
def valid_epoch(model, dataloader, criterion, device):
    """Evaluate the model on a validation loader without updating weights.

    Args:
        model: Network to evaluate; switched to eval mode here.
        dataloader: Yields ``(images, targets)`` batches.
        criterion: Loss called as ``criterion(outputs, targets)``.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, min_f1, avg_f1, f1_scores, class_report, cm)``:
        sample-weighted mean loss, lowest per-class F1, macro F1, per-class F1
        array, ``classification_report`` as a dict, and the confusion matrix.
        Per-class outputs follow the class order of the module-level
        ``soil_types`` mapping.
    """
    model.eval()
    running_loss = 0.0
    all_targets = []
    all_predictions = []

    # Fix the label set explicitly. Without `labels=`, scikit-learn infers the
    # classes from the data, so a class missing from both targets and
    # predictions would shorten the F1 array (misaligning index -> class) and
    # make classification_report raise because target_names no longer matches.
    class_names = list(soil_types.keys())
    class_ids = list(soil_types.values())

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Validation"):
            images, targets = images.to(device), targets.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, targets)

            # Weight the batch loss by batch size so the epoch mean is per-sample.
            running_loss += loss.item() * images.size(0)

            # Store predictions and targets for metrics
            preds = outputs.argmax(dim=1)
            all_targets.extend(targets.cpu().numpy())
            all_predictions.extend(preds.cpu().numpy())

    # Calculate metrics
    epoch_loss = running_loss / len(dataloader.dataset)
    f1_scores = f1_score(all_targets, all_predictions, labels=class_ids, average=None)
    min_f1 = f1_scores.min()
    avg_f1 = f1_score(all_targets, all_predictions, labels=class_ids, average='macro')

    # Get per-class metrics
    class_report = classification_report(
        all_targets,
        all_predictions,
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
    )

    # Create confusion matrix (rows/columns ordered by class_ids)
    cm = confusion_matrix(all_targets, all_predictions, labels=class_ids)

    return epoch_loss, min_f1, avg_f1, f1_scores, class_report, cm

In [None]:
# Hyperparameters and run settings shared by training and inference.
CONFIG = dict(
    IMG_SIZE=384,
    BATCH_SIZE=16,
    NUM_WORKERS=4,
    EPOCHS=20,
    LEARNING_RATE=1e-4,
    WEIGHT_DECAY=1e-4,
    MODEL_NAME='efficientnet_b3',
    NUM_CLASSES=4,
    NUM_FOLDS=5,
    DEVICE=device,
    EARLY_STOPPING=5,  # epochs without improvement tolerated before stopping
)

In [None]:
# defining k-fold cross validation for training
def train_with_kfold(df, train_dir, config=CONFIG):
    """Train one classifier per stratified fold and keep each fold's best weights.

    For every fold a fresh pretrained ``SoilClassifier`` is trained with AdamW,
    class-weighted cross-entropy and a cosine learning-rate schedule that is
    stepped per batch. After each epoch the model is validated; the weights with
    the highest validation min-F1 (the lowest per-class F1) are snapshotted, and
    training stops early after ``config['EARLY_STOPPING']`` consecutive epochs
    without improvement.

    Args:
        df: DataFrame with an ``image_id`` column and an integer ``label`` column.
        train_dir: Directory containing the training images.
        config: Settings dict; reads IMG_SIZE, BATCH_SIZE, NUM_WORKERS, EPOCHS,
            LEARNING_RATE, WEIGHT_DECAY, MODEL_NAME, NUM_CLASSES, NUM_FOLDS,
            DEVICE and EARLY_STOPPING.

    Returns:
        ``(fold_models, best_fold)``: a list with one model per fold, each loaded
        with that fold's best weights, and the zero-based index of the fold whose
        best validation min-F1 was highest.
    """
    device = config['DEVICE']
    # Gradient scaling is only meaningful for CUDA mixed precision.
    use_amp = torch.device(device).type == 'cuda'

    # Initialize KFold
    kfold = StratifiedKFold(n_splits=config['NUM_FOLDS'], shuffle=True, random_state=42)

    # Lists to store metrics across folds
    fold_min_f1_scores = []
    fold_models = []
    best_min_f1 = 0
    best_fold = 0

    # Inverse-frequency class weights (n_classes / count) to counter class imbalance.
    class_counts = df['label'].value_counts().sort_index().values
    weights = torch.tensor(len(class_counts) / class_counts, dtype=torch.float32, device=device)

    # For each fold
    for fold, (train_idx, val_idx) in enumerate(kfold.split(df, df['label'])):
        print(f"\n{'='*20} Fold {fold+1}/{config['NUM_FOLDS']} {'='*20}")

        # Split the data
        train_data = df.iloc[train_idx].reset_index(drop=True)
        val_data = df.iloc[val_idx].reset_index(drop=True)

        print(f"Training on {len(train_data)} samples, validating on {len(val_data)} samples")

        # Create datasets
        train_dataset = SoilDataset(train_data, train_dir, transform=get_train_transforms(config['IMG_SIZE']))
        val_dataset = SoilDataset(val_data, train_dir, transform=get_valid_transforms(config['IMG_SIZE']))

        # Create dataloaders
        train_loader = DataLoader(
            train_dataset,
            batch_size=config['BATCH_SIZE'],
            shuffle=True,
            num_workers=config['NUM_WORKERS'],
            pin_memory=True
        )

        val_loader = DataLoader(
            val_dataset,
            batch_size=config['BATCH_SIZE'],
            shuffle=False,
            num_workers=config['NUM_WORKERS'],
            pin_memory=True
        )

        # Initialize model
        model = SoilClassifier(
            model_name=config['MODEL_NAME'],
            pretrained=True,
            num_classes=config['NUM_CLASSES']
        ).to(device)

        # Initialize optimizers and schedulers
        optimizer = optim.AdamW(model.parameters(), lr=config['LEARNING_RATE'], weight_decay=config['WEIGHT_DECAY'])

        # Use weighted cross-entropy loss for class imbalance
        criterion = nn.CrossEntropyLoss(weight=weights)

        # T_max is expressed in batches because train_epoch steps the scheduler per batch.
        scheduler = optim.lr_scheduler.CosineAnnealingLR(
            optimizer,
            T_max=config['EPOCHS'] * len(train_loader),
            eta_min=1e-6
        )

        # Gradient scaler for mixed precision training; a disabled scaler is a
        # transparent pass-through, which keeps CPU runs warning-free.
        scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

        # Training loop
        best_val_min_f1 = 0
        no_improvement_count = 0
        fold_best_state = None

        for epoch in range(config['EPOCHS']):
            print(f"\nEpoch {epoch+1}/{config['EPOCHS']}")

            # Train
            train_loss, train_min_f1, train_avg_f1, train_class_f1 = train_epoch(
                model, train_loader, criterion, optimizer, scheduler, scaler, device
            )

            # Validate
            val_loss, val_min_f1, val_avg_f1, val_class_f1, val_report, val_cm = valid_epoch(
                model, val_loader, criterion, device
            )

            # Print metrics
            print(f"Train Loss: {train_loss:.4f}, Train Min F1: {train_min_f1:.4f}, Train Avg F1: {train_avg_f1:.4f}")
            print(f"Val Loss: {val_loss:.4f}, Val Min F1: {val_min_f1:.4f}, Val Avg F1: {val_avg_f1:.4f}")
            print("Class-wise F1 scores:")
            for i, soil_type in enumerate(soil_types.keys()):
                print(f"{soil_type}: {val_class_f1[i]:.4f}")

            # Keep the first epoch unconditionally so a snapshot always exists,
            # even if the validation min-F1 never rises above 0.
            if fold_best_state is None or val_min_f1 > best_val_min_f1:
                best_val_min_f1 = val_min_f1
                # state_dict() returns references to the live parameter tensors,
                # and dict.copy() is shallow, so later optimizer steps would
                # overwrite the "best" weights. Copy every tensor (to CPU, to
                # avoid holding a second model's worth of GPU memory).
                fold_best_state = {
                    name: tensor.detach().to('cpu', copy=True)
                    for name, tensor in model.state_dict().items()
                }
                no_improvement_count = 0
                print(f"New best model with min F1 score: {best_val_min_f1:.4f}")
            else:
                no_improvement_count += 1
                print(f"No improvement for {no_improvement_count} epochs")

            # Early stopping
            if no_improvement_count >= config['EARLY_STOPPING']:
                print(f"Early stopping triggered after {epoch+1} epochs")
                break

        # Store the best min F1 score for this fold
        fold_min_f1_scores.append(best_val_min_f1)

        # Create a new model instance and load the best weights
        best_model = SoilClassifier(
            model_name=config['MODEL_NAME'],
            pretrained=False,
            num_classes=config['NUM_CLASSES']
        ).to(device)
        best_model.load_state_dict(fold_best_state)
        fold_models.append(best_model)

        # Check if this is the best fold overall
        if best_val_min_f1 > best_min_f1:
            best_min_f1 = best_val_min_f1
            best_fold = fold

        # Clear memory. The optimizer and scheduler hold references to the
        # model's parameters, so they must be dropped too for the GPU memory
        # to be reclaimable before the next fold.
        del model, optimizer, scheduler, scaler
        del train_dataset, val_dataset, train_loader, val_loader
        torch.cuda.empty_cache()
        gc.collect()

    # Print fold results
    print("\n" + "="*50)
    print("K-fold Cross-validation Results:")
    for fold, score in enumerate(fold_min_f1_scores):
        print(f"Fold {fold+1}: Min F1 = {score:.4f}")
    print(f"Average Min F1: {np.mean(fold_min_f1_scores):.4f}")
    print(f"Best fold: {best_fold+1} with Min F1 = {best_min_f1:.4f}")
    print("="*50)

    return fold_models, best_fold

In [None]:
# Run k-fold training. Returns one model per fold and the zero-based index of
# the fold with the highest validation min-F1.
# NOTE(review): `models` rebinds the name imported by
# `from torchvision import transforms, models` in the import cell; the
# torchvision module is no longer reachable under that name after this cell.
models, best_fold = train_with_kfold(train_df, TRAIN_DIR)

In [None]:
# Persist the weights of the model from the best-scoring fold.
BEST_MODEL_PATH = 'best_soil_model.pth'
best_model = models[best_fold]
torch.save(best_model.state_dict(), BEST_MODEL_PATH)
print(f"Best model (fold {best_fold+1}) saved to '{BEST_MODEL_PATH}'")

In [None]:
# Ensemble inference: majority vote over the fold models, returned as a submission table.
def predict(models, test_df, test_dir, config=CONFIG):
    """Predict a soil type for every test image by majority vote across models.

    Args:
        models: Iterable of trained models; each is put in eval mode and run
            over the whole test set.
        test_df: DataFrame with an ``image_id`` column.
        test_dir: Directory containing the test images.
        config: Settings dict; reads IMG_SIZE, BATCH_SIZE, NUM_WORKERS and DEVICE.

    Returns:
        A copy of ``test_df`` with an added ``soil_type`` column holding the
        predicted class name. When several classes tie for the most votes, the
        one with the smallest class index wins.
    """
    loader = DataLoader(
        SoilDataset(
            test_df,
            test_dir,
            transform=get_valid_transforms(config['IMG_SIZE']),
            test=True
        ),
        batch_size=config['BATCH_SIZE'],
        shuffle=False,
        num_workers=config['NUM_WORKERS'],
        pin_memory=True
    )

    # Class index -> soil type name (inverse of the module-level mapping).
    label_names = {index: name for name, index in soil_types.items()}

    def _predict_labels(net):
        """Return the predicted class index of every test image for one model."""
        net.eval()
        labels = []
        with torch.no_grad():
            for batch in tqdm(loader, desc="Predicting"):
                logits = net(batch.to(config['DEVICE']))
                labels.extend(torch.max(logits, 1)[1].cpu().numpy())
        return labels

    def _majority(sample_votes):
        """Return the most frequent value among one sample's votes."""
        candidates, tallies = np.unique(sample_votes, return_counts=True)
        return candidates[np.argmax(tallies)]

    # Rows are models, columns are test samples.
    votes = np.array([_predict_labels(net) for net in models])

    winners = [label_names[_majority(votes[:, i])] for i in range(len(test_df))]

    submission_df = test_df.copy()
    submission_df['soil_type'] = winners
    return submission_df

In [None]:
# Run ensemble inference on the test set and write the submission file.
submission_df = predict(models, test_df, TEST_DIR)

# Only the id and the predicted class go into the CSV.
submission_columns = ['image_id', 'soil_type']
submission_df[submission_columns].to_csv('submission.csv', index=False)
print("Submission file created.")

In [None]:
# visualizing our predictions
def visualize_predictions(model, test_df, test_dir, num_samples=10, config=CONFIG):
    """Plot randomly chosen test images titled with the predicted class and confidence.

    Args:
        model: Trained model; put in eval mode here.
        test_df: DataFrame with an ``image_id`` column.
        test_dir: Directory containing the test images.
        num_samples: Number of images to show; capped at ``len(test_df)``.
        config: Settings dict; reads IMG_SIZE and DEVICE.

    Raises:
        FileNotFoundError: If a sampled image is missing or cannot be decoded.
    """
    # Sampling without replacement cannot return more items than exist.
    num_samples = min(num_samples, len(test_df))
    if num_samples <= 0:
        print("No test images available to visualize.")
        return

    # Get a random sample of test images
    sample_indices = np.random.choice(len(test_df), num_samples, replace=False)

    # Reuse the validation pipeline so the preprocessing shown here cannot
    # drift from the one used for validation and inference.
    transform = get_valid_transforms(config['IMG_SIZE'])

    # Dictionary to map indices back to soil types
    idx_to_soil = {v: k for k, v in soil_types.items()}

    # Size the grid from num_samples (up to 5 columns); the default of 10
    # samples still yields a 2x5 grid on a 20x8 inch figure.
    n_cols = min(5, num_samples)
    n_rows = -(-num_samples // n_cols)  # ceiling division
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False)
    axes = axes.flatten()

    # Model to evaluation mode
    model.eval()

    for ax, idx in zip(axes, sample_indices):
        # Load and preprocess the image
        img_path = os.path.join(test_dir, test_df.iloc[idx]['image_id'])
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None instead of raising on failure.
            raise FileNotFoundError(f"Image not found or unreadable: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # For display
        display_img = cv2.resize(img, (config['IMG_SIZE'], config['IMG_SIZE']))

        # For prediction: add a batch dimension and move to the model's device
        tensor_img = transform(image=img)['image'].unsqueeze(0).to(config['DEVICE'])

        # Make prediction
        with torch.no_grad():
            output = model(tensor_img)
            probabilities = torch.nn.functional.softmax(output, dim=1)
            max_prob, prediction = torch.max(probabilities, 1)

        # Get predicted soil type
        pred_soil = idx_to_soil[prediction.item()]
        confidence = max_prob.item() * 100

        # Display image and prediction
        ax.imshow(display_img)
        ax.set_title(f"Pred: {pred_soil}\nConf: {confidence:.1f}%")
        ax.axis('off')

    # Hide grid cells left over when num_samples does not fill the last row.
    for ax in axes[num_samples:]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Show predictions of the saved best-fold model on a random sample of test images
# (uses the function's default num_samples and CONFIG).
visualize_predictions(best_model, test_df, TEST_DIR)