In [None]:
import os
import gc
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, classification_report, confusion_matrix
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Install an 'ignore' filter for every warning category.
# NOTE(review): this blanket filter also hides deprecation and numerical warnings
# raised by the libraries used below — consider narrowing it to specific categories.
warnings.filterwarnings('ignore')
# Use matplotlib's 'ggplot' style sheet for figures.
plt.style.use('ggplot')

# Reaching this line means the import cell executed without raising.
print("✓ All libraries imported successfully")

In [None]:
class Config:
    """Single namespace for paths, hyperparameters and label metadata.

    Everything is a class attribute, so the rest of the notebook reads
    values as `Config.NAME` without instantiating the class.
    """

    # --- Dataset locations -------------------------------------------------
    BASE_PATH = '/kaggle/input/grain-classification'
    TRAIN_DIR = BASE_PATH + '/train-2/train-2/images'
    TEST_DIR = BASE_PATH + '/test-2/test-2/images'
    TRAIN_CSV = BASE_PATH + '/train-2/train-2/train.csv'
    TEST_CSV = BASE_PATH + '/test-2/test-2/test.csv'

    # --- Backbones to train (timm model names) ------------------------------
    MODELS = [
        'tf_efficientnetv2_m.in21k_ft_in1k',
        'convnext_base.fb_in22k_ft_in1k',
    ]

    # --- Training hyperparameters -------------------------------------------
    IMG_SIZE = 384
    BATCH_SIZE = 32
    N_FOLDS = 5
    EPOCHS = 20
    LEARNING_RATE = 1e-4
    WEIGHT_DECAY = 1e-5
    ACCUMULATION_STEPS = 1
    LABEL_SMOOTHING = 0.1

    # --- Runtime settings ---------------------------------------------------
    NUM_WORKERS = 2
    RANDOM_SEED = 42

    # --- Labels: list position in CLASS_NAMES is the integer class index ----
    CLASS_NAMES = ['barley', 'flax', 'oats', 'wheat']
    NUM_CLASSES = len(CLASS_NAMES)
    CLASS_TO_IDX = dict(zip(CLASS_NAMES, range(NUM_CLASSES)))
    IDX_TO_CLASS = dict(enumerate(CLASS_NAMES))

# Echo the settings most relevant to a training run.
print("\n".join([
    "Configuration Summary:",
    f"  Models: {len(Config.MODELS)}",
    f"  Classes: {Config.NUM_CLASSES}",
    f"  Image Size: {Config.IMG_SIZE}×{Config.IMG_SIZE}",
    f"  Batch Size: {Config.BATCH_SIZE}",
    f"  Cross-Validation: {Config.N_FOLDS}-fold",
    f"  Epochs: {Config.EPOCHS}",
]))

In [None]:
def set_seed(seed=42):
    """Seed every random number generator the notebook relies on.

    Seeds Python's built-in `random`, NumPy's legacy global RNG and PyTorch
    (CPU and all CUDA devices), then switches cuDNN to deterministic mode
    with autotuning disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Local import: the notebook's import cell does not bring `random` into scope.
    import random

    # Previously left unseeded, so any library drawing from the stdlib RNG
    # (presumably including some augmentation code — confirm for the installed
    # albumentations version) produced different results on every run.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def clear_memory():
    """Run Python garbage collection, then empty PyTorch's CUDA cache."""
    # Order matters: collect unreachable objects first so the memory they
    # held can be returned by the cache flush that follows.
    for release in (gc.collect, torch.cuda.empty_cache):
        release()

def get_device():
    """Pick the compute device and print a short description of it.

    Returns:
        torch.device('cuda') when CUDA is available, otherwise torch.device('cpu').
    """
    # Guard clause: no GPU, nothing further to report.
    if not torch.cuda.is_available():
        print("Device: CPU (GPU not available)")
        return torch.device('cpu')

    gpu = torch.device('cuda')
    print(f"Device: {gpu}")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    # total_memory is divided by 1e9 to show it in GB
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    return gpu

# Initialize: seed the RNGs first, then select the compute device.
# `device` is a notebook-level global; train_model and make_ensemble_predictions
# read it directly instead of taking it as an argument.
set_seed(Config.RANDOM_SEED)
device = get_device()

---
## 3. Data Loading & Preprocessing <a id='data'></a>

### 3.1 Load Data

In [None]:
def load_dataset():
    """Load the train/test CSVs and attach image paths and integer labels.

    Reads `Config.TRAIN_CSV` and `Config.TEST_CSV`, adds a `path` column built
    from each frame's `img` column, and adds a `label` column to the training
    frame by mapping its `class` column through `Config.CLASS_TO_IDX`.

    Returns:
        (train_df, test_df) as pandas DataFrames.

    Raises:
        ValueError: if the training CSV contains a class name that is not a
            key of `Config.CLASS_TO_IDX`.
    """
    train_df = pd.read_csv(Config.TRAIN_CSV)
    test_df = pd.read_csv(Config.TEST_CSV)

    # Add full paths
    train_df['path'] = train_df['img'].apply(lambda x: os.path.join(Config.TRAIN_DIR, x))
    test_df['path'] = test_df['img'].apply(lambda x: os.path.join(Config.TEST_DIR, x))

    # Encode labels for training data
    train_df['label'] = train_df['class'].map(Config.CLASS_TO_IDX)

    # Series.map yields NaN for names missing from the mapping; left alone that
    # would turn the label column into floats and break stratification and
    # tensor conversion far from the real cause, so fail here instead.
    unmapped = train_df['label'].isna()
    if unmapped.any():
        unknown = sorted(train_df.loc[unmapped, 'class'].astype(str).unique())
        raise ValueError(
            f"{int(unmapped.sum())} training rows have a class not in "
            f"Config.CLASS_NAMES: {unknown}"
        )

    # Display statistics
    print("Dataset Statistics:")
    print(f"  Training samples: {len(train_df)}")
    print(f"  Test samples: {len(test_df)}")
    print(f"\nClass distribution (training):")
    print(train_df['class'].value_counts())

    return train_df, test_df

# Load both splits once; `train_df` and `test_df` are notebook-level names
# reused by the later fold-creation, training and inference cells.
train_df, test_df = load_dataset()

In [None]:
def get_transforms(img_size=384, augment=True):
    """Build the albumentations pipeline for one data split.

    Args:
        img_size: Side length passed to the square resize.
        augment: When True, random augmentations are inserted between the
            resize and the normalisation; when False only resize, normalise
            and tensor conversion are applied (validation / test).

    Returns:
        An `A.Compose` pipeline.
    """
    # Every pipeline starts with the same resize.
    steps = [A.Resize(img_size, img_size)]

    if augment:
        steps += [
            # Geometric transformations
            A.RandomRotate90(p=0.5),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.15, rotate_limit=45, p=0.5),
            # Blur: one of the three is picked when the group fires
            A.OneOf(
                [
                    A.MotionBlur(p=0.2),
                    A.MedianBlur(blur_limit=3, p=0.1),
                    A.GaussianBlur(p=0.2),
                ],
                p=0.3,
            ),
            # Distortion
            A.OneOf(
                [
                    A.OpticalDistortion(p=0.3),
                    A.GridDistortion(p=0.3),
                ],
                p=0.3,
            ),
            # Colour
            A.OneOf(
                [
                    A.HueSaturationValue(
                        hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.3
                    ),
                    A.RandomBrightnessContrast(
                        brightness_limit=0.2, contrast_limit=0.2, p=0.3
                    ),
                ],
                p=0.5,
            ),
            # Cutout
            A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.3),
        ]

    # Shared tail: normalise (constants are presumably the ImageNet channel
    # statistics — confirm against the backbones' pretrained config), then
    # convert to a tensor.
    steps += [
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
    return A.Compose(steps)

print("✓ Augmentation transforms defined")

### 3.3 Dataset Class

In [None]:
class GrainDataset(Dataset):
    """Map-style dataset that reads images listed in a DataFrame.

    The frame must have a `path` column. If it also has a `label` column,
    items are `(image, label)` pairs; otherwise items are the image alone.
    """

    def __init__(self, dataframe, transform=None):
        """
        Args:
            dataframe: DataFrame with a `path` column and optionally `label`.
            transform: Optional callable invoked as `transform(image=...)`
                whose result is indexed with `'image'`.
        """
        # Reset the index so positional access in __getitem__ lines up with
        # the 0..len-1 indices requested of the dataset.
        self.df = dataframe.reset_index(drop=True)
        self.transform = transform
        self.has_labels = 'label' in dataframe.columns

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, colour-convert and transform the image at position `idx`.

        Raises:
            FileNotFoundError: if the image file cannot be read or decoded.
        """
        row = self.df.iloc[idx]
        path = row['path']

        # cv2.imread signals failure by returning None instead of raising;
        # without this check the error surfaces inside cvtColor with no hint
        # of which file was at fault.
        image = cv2.imread(path)
        if image is None:
            raise FileNotFoundError(
                f"Could not read image (missing or undecodable file): {path}"
            )
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Explicit None check: a transform object that defines __len__ could
        # otherwise be skipped merely for being "empty".
        if self.transform is not None:
            image = self.transform(image=image)['image']

        if self.has_labels:
            label = torch.tensor(row['label'], dtype=torch.long)
            return image, label
        else:
            return image

print("✓ Dataset class defined")

### 3.4 Create Cross-Validation Folds

In [None]:
def create_folds(df, n_folds=5):
    """Return a copy of `df` with a stratified `fold` column added.

    Rows are split with a shuffled `StratifiedKFold` on the `label` column,
    seeded with `Config.RANDOM_SEED`; each row's `fold` is the index of the
    split in which it is a validation row.

    Args:
        df: DataFrame with `label` and `class` columns (`class` is only used
            for the printed summary).
        n_folds: Number of folds.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    df = df.copy()
    df['fold'] = -1
    fold_col = df.columns.get_loc('fold')

    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=Config.RANDOM_SEED)

    for fold, (_, valid_idx) in enumerate(skf.split(df, df['label'])):
        # skf.split yields row POSITIONS, so write through iloc. The label-based
        # df.loc only coincides with positions when the index is a default
        # RangeIndex and would mis-assign folds on a filtered or re-indexed frame.
        df.iloc[valid_idx, fold_col] = fold

    print(f"Created {n_folds}-fold stratified cross-validation")
    print(f"\nFold distribution:")
    print(df.groupby(['fold', 'class']).size().unstack(fill_value=0))

    return df

# Rebind train_df to the copy returned by create_folds, which carries the
# added 'fold' column that train_model filters on.
train_df = create_folds(train_df, Config.N_FOLDS)

---
## 4. Model Architecture <a id='model'></a>

In [None]:
class GrainClassifier(nn.Module):
    """timm backbone followed by a small fully connected classification head."""

    def __init__(self, model_name, num_classes=4, pretrained=True):
        """
        Args:
            model_name: Name handed to `timm.create_model`.
            num_classes: Width of the final linear layer.
            pretrained: Forwarded to `timm.create_model`.
        """
        super().__init__()

        # num_classes=0 asks timm for the backbone without its original classifier.
        self.backbone = timm.create_model(model_name, pretrained=pretrained, num_classes=0)

        # Head: Linear -> BatchNorm -> ReLU -> Dropout -> Linear
        hidden_dim = 512
        head_layers = [
            nn.Linear(self.backbone.num_features, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the head's output for the backbone features of `x`."""
        return self.classifier(self.backbone(x))

print("✓ Model architecture defined")

In [None]:
def train_one_epoch(model, loader, criterion, optimizer, scaler, device):
    """Run one training pass over `loader` with autocast and gradient scaling.

    Args:
        model: Module to train; switched to train mode here.
        loader: DataLoader yielding `(images, labels)` batches.
        criterion: Loss called as `criterion(outputs, labels)`.
        optimizer: Optimizer stepped once per batch through `scaler`.
        scaler: Gradient scaler used for backward and the optimizer step.
        device: Device the batches are moved to.

    Returns:
        (epoch_loss, epoch_f1): the batch losses weighted by batch size and
        divided by `len(loader.dataset)`, and the macro F1 of the argmax
        predictions over the whole pass.
    """
    model.train()
    loss_sum = 0.0
    y_pred, y_true = [], []

    progress = tqdm(loader, desc='Training')
    for images, labels in progress:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Forward pass and loss run under autocast; backward and the
        # optimizer step go through the scaler.
        with torch.cuda.amp.autocast():
            logits = model(images)
            loss = criterion(logits, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        batch_loss = loss.item()
        loss_sum += batch_loss * images.size(0)
        y_pred.extend(logits.argmax(dim=1).cpu().numpy())
        y_true.extend(labels.cpu().numpy())

        progress.set_postfix({'loss': batch_loss})

    mean_loss = loss_sum / len(loader.dataset)
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    return mean_loss, macro_f1


def validate(model, loader, criterion, device):
    """Evaluate `model` on `loader` without tracking gradients.

    Args:
        model: Module to evaluate; switched to eval mode here.
        loader: DataLoader yielding `(images, labels)` batches.
        criterion: Loss called as `criterion(outputs, labels)`.
        device: Device the batches are moved to.

    Returns:
        (epoch_loss, epoch_f1): the batch losses weighted by batch size and
        divided by `len(loader.dataset)`, and the macro F1 of the argmax
        predictions.
    """
    model.eval()
    loss_sum = 0.0
    y_pred, y_true = [], []

    with torch.no_grad():
        for images, labels in tqdm(loader, desc='Validation'):
            images = images.to(device)
            labels = labels.to(device)

            logits = model(images)
            batch_loss = criterion(logits, labels).item()

            # Weight by batch size so a short final batch is not over-counted.
            loss_sum += batch_loss * images.size(0)
            y_pred.extend(logits.argmax(dim=1).cpu().numpy())
            y_true.extend(labels.cpu().numpy())

    mean_loss = loss_sum / len(loader.dataset)
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    return mean_loss, macro_f1

print("✓ Training functions defined")

### 5.2 Main Training Loop

In [None]:
def train_model(model_name, train_df, fold, epochs):
    """Train one backbone on one cross-validation split and checkpoint the best epoch.

    Rows whose `fold` differs from `fold` are used for training and rows whose
    `fold` equals it for validation. After each epoch the model is validated,
    and its state dict is saved whenever validation macro F1 improves.

    Args:
        model_name: timm model name passed to `GrainClassifier`.
        train_df: DataFrame with `path`, `label` and `fold` columns.
        fold: Index of the fold held out for validation.
        epochs: Number of epochs to run.

    Returns:
        (best_model_path, best_f1): checkpoint file name and the best
        validation macro F1 observed.
    """
    print(f"\n{'='*60}")
    print(f"Training {model_name} - Fold {fold+1}/{Config.N_FOLDS}")
    print(f"{'='*60}")

    clear_memory()

    train_fold = train_df[train_df['fold'] != fold]
    valid_fold = train_df[train_df['fold'] == fold]

    train_dataset = GrainDataset(train_fold, get_transforms(Config.IMG_SIZE, augment=True))
    valid_dataset = GrainDataset(valid_fold, get_transforms(Config.IMG_SIZE, augment=False))

    # The classifier head contains BatchNorm1d, which raises in train mode on a
    # batch holding a single sample. Drop the trailing batch only in that exact
    # case, so at most one (randomly chosen, since shuffle=True) sample is
    # skipped per epoch; the reported training loss still divides by the full
    # dataset size, i.e. it is off by at most one sample.
    lone_sample_tail = len(train_dataset) % Config.BATCH_SIZE == 1

    train_loader = DataLoader(
        train_dataset,
        batch_size=Config.BATCH_SIZE,
        shuffle=True,
        num_workers=Config.NUM_WORKERS,
        pin_memory=True,
        drop_last=lone_sample_tail
    )
    valid_loader = DataLoader(
        valid_dataset,
        batch_size=Config.BATCH_SIZE,
        shuffle=False,
        num_workers=Config.NUM_WORKERS,
        pin_memory=True
    )

    model = GrainClassifier(model_name, Config.NUM_CLASSES).to(device)

    criterion = nn.CrossEntropyLoss(label_smoothing=Config.LABEL_SMOOTHING)
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=Config.LEARNING_RATE,
        weight_decay=Config.WEIGHT_DECAY
    )

    # Learning rate scheduler: cosine annealing that restarts every 10 epochs
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer,
        T_0=10,
        T_mult=1
    )

    scaler = torch.cuda.amp.GradScaler()

    # Training loop
    best_f1 = 0
    # Tracks whether THIS call has written the checkpoint. Without it, a run
    # whose validation F1 never rises above 0 returned a path that either does
    # not exist or is a stale file left by an earlier run.
    checkpoint_saved = False
    # '.' and '/' are replaced so the model name is usable as a file name
    model_short = model_name.replace('.', '_').replace('/', '_')
    best_model_path = f'{model_short}_fold{fold}_best.pth'

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")

        train_loss, train_f1 = train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device
        )

        valid_loss, valid_f1 = validate(model, valid_loader, criterion, device)

        # Update learning rate (stepped once per epoch)
        scheduler.step()

        print(f"Train Loss: {train_loss:.4f}, Train F1: {train_f1:.4f}")
        print(f"Valid Loss: {valid_loss:.4f}, Valid F1: {valid_f1:.4f}")

        if valid_f1 > best_f1 or not checkpoint_saved:
            best_f1 = valid_f1
            torch.save(model.state_dict(), best_model_path)
            checkpoint_saved = True
            print(f"✓ Best model saved (F1: {best_f1:.4f})")

    print(f"\n{'='*60}")
    print(f"Fold {fold+1} completed. Best F1: {best_f1:.4f}")
    print(f"{'='*60}")

    return best_model_path, best_f1

print("✓ Main training function defined")

In [None]:
def train_all_models(train_df, models, n_folds, epochs):
    """Train every model on every fold and collect checkpoints and scores.

    Args:
        train_df: Training frame forwarded to `train_model`.
        models: Iterable of model names.
        n_folds: Number of folds; folds 0..n_folds-1 are trained in order.
        epochs: Epoch count forwarded to `train_model`.

    Returns:
        Dict mapping model name -> {'paths': [...], 'scores': [...]}, with
        one entry per fold in each list.
    """
    results = {}

    for model_name in models:
        print(f"\n{'#'*60}")
        print(f"MODEL: {model_name}")
        print(f"{'#'*60}")

        paths, scores = [], []
        results[model_name] = {'paths': paths, 'scores': scores}

        for fold in range(n_folds):
            checkpoint, fold_f1 = train_model(model_name, train_df, fold, epochs)
            paths.append(checkpoint)
            scores.append(fold_f1)

        # Per-model summary over the folds just trained
        formatted_scores = [f'{s:.4f}' for s in scores]
        print(f"\n{'='*60}")
        print(f"Model: {model_name}")
        print(f"Average F1: {np.mean(scores):.4f} ± {np.std(scores):.4f}")
        print(f"Fold scores: {formatted_scores}")
        print(f"{'='*60}")

    return results

# Train every configured architecture on every fold. The returned dict maps
# model name -> {'paths': [...], 'scores': [...]} and is consumed by the
# ensemble inference cell.
training_results = train_all_models(
    train_df,
    Config.MODELS,
    Config.N_FOLDS,
    Config.EPOCHS
)

In [None]:
def predict_with_tta(model, images, device):
    """Average softmax outputs over four flipped views of a batch.

    The views are the batch itself, a flip along dim 3 (horizontal), a flip
    along dim 2 (vertical), and a flip along both.

    Args:
        model: Module to run; switched to eval mode here.
        images: Batch tensor; moved to `device` before inference.
        device: Device used for inference.

    Returns:
        NumPy array: element-wise mean of the four softmax outputs.
    """
    model.eval()

    # None = untouched batch; otherwise the dims handed to torch.flip
    flip_dims = (None, [3], [2], [2, 3])
    view_probs = []

    with torch.no_grad():
        images = images.to(device)
        for dims in flip_dims:
            view = images if dims is None else torch.flip(images, dims=dims)
            probs = F.softmax(model(view), dim=1)
            view_probs.append(probs.cpu().numpy())

    return np.mean(view_probs, axis=0)

print("✓ TTA function defined")

In [None]:
def make_ensemble_predictions(test_df, model_results):
    """Predict test classes by averaging TTA probabilities over folds and models.

    For each model in `model_results`, every fold checkpoint is loaded, run
    over the test set with `predict_with_tta`, and the fold outputs are
    averaged; the per-model averages are then averaged across models.

    Args:
        test_df: DataFrame with a `path` column.
        model_results: Dict mapping model name -> {'paths': [checkpoint, ...]},
            as returned by `train_all_models`.

    Returns:
        (final_classes, ensemble_predictions): argmax class indices and the
        averaged probability array.

    Raises:
        ValueError: if `model_results` is empty or a model has no checkpoints.
    """
    if not model_results:
        raise ValueError("model_results is empty; there are no checkpoints to ensemble")

    test_dataset = GrainDataset(test_df, get_transforms(Config.IMG_SIZE, augment=False))
    test_loader = DataLoader(
        test_dataset,
        batch_size=Config.BATCH_SIZE,
        shuffle=False,
        num_workers=Config.NUM_WORKERS,
        pin_memory=True
    )

    all_model_predictions = []

    for model_name, results in model_results.items():
        print(f"\nGenerating predictions: {model_name}")
        if not results['paths']:
            raise ValueError(f"No checkpoints recorded for model {model_name!r}")
        model_fold_predictions = []

        for fold, path in enumerate(results['paths']):
            clear_memory()

            # pretrained=False: every weight is overwritten by the checkpoint
            # right below, so fetching the pretrained weights again only costs
            # time and requires network access.
            model = GrainClassifier(model_name, Config.NUM_CLASSES, pretrained=False).to(device)
            # map_location lets a checkpoint saved on one device load on another.
            model.load_state_dict(torch.load(path, map_location=device))
            model.eval()

            fold_predictions = []
            for batch in tqdm(test_loader, desc=f"  Fold {fold+1}"):
                # GrainDataset yields (image, label) pairs when the frame has a
                # 'label' column; keep only the images in that case.
                images = batch[0] if isinstance(batch, (list, tuple)) else batch
                preds = predict_with_tta(model, images, device)
                fold_predictions.append(preds)

            fold_predictions = np.vstack(fold_predictions)
            model_fold_predictions.append(fold_predictions)

            # Free the model before the next checkpoint is built.
            del model
            clear_memory()

        # Mean over this model's folds
        model_avg = np.mean(model_fold_predictions, axis=0)
        all_model_predictions.append(model_avg)

    # Mean over models, then the most probable class per row
    ensemble_predictions = np.mean(all_model_predictions, axis=0)
    final_classes = ensemble_predictions.argmax(axis=1)

    return final_classes, ensemble_predictions

print("✓ Ensemble prediction function defined")

In [None]:
final_predictions, ensemble_probs = make_ensemble_predictions(test_df, training_results)

# Build the submission from the test frame's 'img' column plus the predicted
# class names (indices translated back through Config.IDX_TO_CLASS, row order kept).
submission = test_df[['img']].assign(
    **{'class': list(map(Config.IDX_TO_CLASS.__getitem__, final_predictions))}
)

submission.to_csv('submission.csv', index=False)