In [None]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import timm
from torchvision import transforms as T
from tqdm import tqdm
import gc

class CFG:
    """Static configuration namespace for this inference notebook.

    Attributes are read directly off the class (``CFG.<name>``); the code
    visible in this notebook never instantiates it.
    """
    # General
    # Worker-process count handed to the DataLoaders.
    num_workers = 4
    # Use the GPU when CUDA is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Data paths
    # All data paths below are built relative to this directory.
    data_dir = './'
    labels_csv_path = os.path.join(data_dir, 'labels.csv')
    sample_submission_path = os.path.join(data_dir, 'sample_submission.csv')
    test_img_dir = os.path.join(data_dir, 'test')

    # Model
    # Architecture name passed to timm.create_model.
    model_name = 'tf_efficientnet_b4_ns'
    # Size passed to both the Resize and the CenterCrop test transforms.
    img_size = 384
    # Checkpoints that are ensembled at inference time
    # (presumably one per cross-validation fold, judging by the file names).
    model_paths = [
        'models/tf_efficientnet_b4_ns_fold0_best.pth',
        'models/tf_efficientnet_b4_ns_fold1_best.pth',
        'models/tf_efficientnet_b4_ns_fold2_best.pth'
    ]

    # Inference
    batch_size = 32
    # Averaged probabilities strictly above this value become predicted labels.
    # Author's note: this threshold was found by optimizing sample-wise F2 on
    # the OOF (out-of-fold) predictions.
    threshold = 0.9300

# Read the label table and attach the class lookup tables to CFG.
labels_df = pd.read_csv(CFG.labels_csv_path)
CFG.attr_ids = labels_df['attribute_id'].values
# Forward map: row position in labels.csv -> attribute_id.
CFG.idx_to_attr_id = dict(enumerate(CFG.attr_ids))
# Reverse map, derived from the forward one so both always agree.
CFG.attr_id_to_idx = {attr_id: idx for idx, attr_id in CFG.idx_to_attr_id.items()}
CFG.num_classes = len(labels_df)

# Release cached CUDA memory and trigger a garbage-collection pass.
torch.cuda.empty_cache()
gc.collect()

In [None]:
def get_test_transforms(flipped=False):
    """Build the preprocessing pipeline applied to test images.

    The pipeline is: ``T.Resize(CFG.img_size)`` -> ``T.CenterCrop(CFG.img_size)``
    -> optional horizontal flip -> ``T.ToTensor()`` -> ``T.Normalize`` with
    ImageNet mean/std. A banner line is printed every time it is called.

    Args:
        flipped: When True, a horizontal flip applied with probability 1.0 is
            inserted right after the crop (used for test-time augmentation).

    Returns:
        The ``T.Compose`` object wrapping the transform list.
    """
    # Per the original author's note, this matches the validation transforms of
    # 01_training_pipeline.ipynb that were used to generate the OOF predictions.
    print("--- Applying CORRECTED validation transforms (Resize+CenterCrop, ImageNet Norm) ---")

    # Geometric stage. Resize receives a single int (not a (h, w) pair), which
    # the original comment describes as aspect-ratio preserving.
    geometric_stage = [T.Resize(CFG.img_size), T.CenterCrop(CFG.img_size)]
    if flipped:
        # p=1.0 makes the "random" flip fire on every image.
        geometric_stage = geometric_stage + [T.RandomHorizontalFlip(p=1.0)]

    # Tensor stage: convert to a tensor, then normalize with ImageNet stats.
    imagenet_mean = [0.485, 0.456, 0.406]
    imagenet_std = [0.229, 0.224, 0.225]
    tensor_stage = [
        T.ToTensor(),
        T.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]

    return T.Compose(geometric_stage + tensor_stage)

class iMetTestDataset(Dataset):
    """Label-free image dataset used for inference.

    Args:
        df: Table with a ``'filepath'`` column holding one image path per row.
        transforms: Optional callable applied to the RGB PIL image. When it is
            falsy (e.g. ``None``) the PIL image itself is returned.
    """

    def __init__(self, df, transforms=None):
        self.df = df
        # Cache the path column once so __getitem__ indexes a plain array.
        self.filepaths = df['filepath'].values
        self.transforms = transforms

    def __len__(self):
        """Return the number of rows in the backing table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` as RGB and apply the transforms.

        Returns:
            The transformed image, or the RGB PIL image when no transforms
            were supplied.
        """
        filepath = self.filepaths[idx]
        # Image.open is lazy and holds the file handle open. convert() yields a
        # separate in-memory image, so the source file can be closed right away
        # instead of relying on implicit cleanup inside DataLoader workers.
        with Image.open(filepath) as source_image:
            image = source_image.convert('RGB')

        if self.transforms:
            image = self.transforms(image)

        return image

class iMetModel(nn.Module):
    """Thin ``nn.Module`` wrapper around a network created by timm.

    The network is stored as ``self.model`` and its number of outputs is taken
    from ``CFG.num_classes``.

    Args:
        model_name: Architecture name forwarded to ``timm.create_model``.
        pretrained: Forwarded to ``timm.create_model`` unchanged.
    """

    def __init__(self, model_name, pretrained=False):
        super().__init__()
        # The attribute name must stay `model`: it determines the key prefix
        # of this module's state dict.
        factory_kwargs = dict(pretrained=pretrained, num_classes=CFG.num_classes)
        self.model = timm.create_model(model_name, **factory_kwargs)

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its raw output."""
        outputs = self.model(x)
        return outputs

In [None]:
## Prepare Test Data
# Resolve every submission id to '<id>.png' under the test image directory.
sub_df = pd.read_csv(CFG.sample_submission_path)
sub_df['filepath'] = sub_df['id'].map(
    lambda image_id: os.path.join(CFG.test_img_dir, image_id + '.png')
)
display(sub_df.head())

# Two views of the same table for TTA: original images and mirrored images.
test_dataset_normal = iMetTestDataset(sub_df, transforms=get_test_transforms(flipped=False))
test_dataset_flipped = iMetTestDataset(sub_df, transforms=get_test_transforms(flipped=True))

# shuffle=False keeps loader order identical to sub_df row order, which the
# positional accumulation of predictions relies on.
loader_kwargs = dict(batch_size=CFG.batch_size, shuffle=False, num_workers=CFG.num_workers)
test_loader_normal = DataLoader(test_dataset_normal, **loader_kwargs)
test_loader_flipped = DataLoader(test_dataset_flipped, **loader_kwargs)

In [None]:
## Inference with TTA (Ensemble)

# Running sum of sigmoid probabilities over every (model, TTA view) pass.
# Accumulating in place avoids holding one full prediction array per pass.
n_samples = len(sub_df)
total_preds = np.zeros((n_samples, CFG.num_classes), dtype=np.float32)
tta_loaders = {
    "normal": test_loader_normal,
    "flipped": test_loader_flipped
}

for i, model_path in enumerate(CFG.model_paths):
    print(f"--- Inferencing with model {i+1}/{len(CFG.model_paths)}: {model_path} ---")

    # Load model
    model = iMetModel(CFG.model_name, pretrained=False).to(CFG.device)
    # weights_only=True restricts unpickling to tensors and plain containers.
    # map_location remaps the checkpoint tensors onto CFG.device, so a
    # checkpoint saved on a GPU still loads when CFG.device fell back to CPU.
    # The loaded dict is deliberately not bound to a name, so its tensors are
    # released as soon as they have been copied into the model.
    model.load_state_dict(
        torch.load(model_path, map_location=CFG.device, weights_only=True)
    )
    model.eval()

    # TTA loop
    for tta_type, test_loader in tta_loaders.items():
        print(f"  -- TTA: {tta_type} --")
        pbar = tqdm(test_loader, desc=f"Predicting (Model {i+1}, {tta_type})")
        # Write cursor into total_preds; valid because the loaders are not
        # shuffled, so batches arrive in sub_df row order.
        current_pos = 0
        with torch.no_grad():
            for images in pbar:
                images = images.to(CFG.device)
                logits = model(images)
                preds = logits.sigmoid().cpu().numpy()

                # The last batch may be smaller than CFG.batch_size.
                batch_size = images.size(0)
                total_preds[current_pos : current_pos + batch_size] += preds
                current_pos += batch_size

        # Every pass must cover each test row exactly once; otherwise the
        # average below would silently be wrong for some rows.
        assert current_pos == n_samples, (
            f"TTA pass '{tta_type}' produced {current_pos} predictions, expected {n_samples}"
        )

    # Clean up memory after each model
    del model
    torch.cuda.empty_cache()
    gc.collect()

# Average the summed probabilities over all passes (models x TTA views).
all_preds = total_preds / (len(CFG.model_paths) * len(tta_loaders))
print("\nEnsemble TTA predictions calculated.")

In [None]:
## Create Submission
predictions = []
for pred_row in tqdm(all_preds, desc="Formatting submission"):
    # Class indices whose averaged probability is strictly above the threshold.
    selected = np.flatnonzero(pred_row > CFG.threshold)

    # Never emit an empty prediction: fall back to the most probable class.
    if selected.size == 0:
        selected = np.array([pred_row.argmax()])

    # Translate class indices (ascending) back to attribute_ids.
    attr_ids = [CFG.idx_to_attr_id[int(class_idx)] for class_idx in selected]
    predictions.append(' '.join(str(attr_id) for attr_id in attr_ids))

sub_df['attribute_ids'] = predictions
# Only the id and the space-separated label string go into the CSV.
submission_columns = ['id', 'attribute_ids']
sub_df[submission_columns].to_csv('submission.csv', index=False)
print("Submission file created successfully!")
display(sub_df.head())