In [None]:
import os
import random
import copy
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image 
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast

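# Direct timm constructors for the base-size ConvNeXt / Swin backbones.
# NOTE(review): neither name is referenced in the visible cells - FeatureFusionModel
# builds its backbones through timm.create_model("convnext_large", ...) and
# create_model("swin_base_patch4_window7_224", ...); confirm these imports are still needed.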
from timm.models.convnext import convnext_base
from timm.models.swin_transformer import swin_base_patch4_window7_224

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds NumPy, Python's ``random`` module, and torch (CPU generator plus all
    CUDA devices), then switches cuDNN to deterministic mode with the
    benchmark auto-tuner turned off.

    Args:
        seed: Integer seed handed to each generator.
    """
    seeders = (
        np.random.seed,
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(42)

In [None]:
# Root folder of the Kaggle dataset and the two CSV manifests inside it.
base_dir = '/kaggle/input/ai-vs-human-generated-dataset'
train_csv_path = os.path.join(base_dir, 'train.csv')
test_csv_path = os.path.join(base_dir, 'test.csv')

df_train = pd.read_csv(train_csv_path)
df_test = pd.read_csv(test_csv_path)
# Example row: file_name="train_data/041be3153810...", label=0 or 1


def _with_base_dir(relative_path):
    """Prefix a path taken from the CSV with the dataset root folder."""
    return os.path.join(base_dir, relative_path)


# The CSVs store paths relative to base_dir; rewrite them so they can be opened directly.
df_train['file_name'] = df_train['file_name'].map(_with_base_dir)
df_test['id'] = df_test['id'].map(_with_base_dir)

# Hold out 10% of the labelled rows for validation (fixed random_state for a repeatable split).
all_image_paths = df_train['file_name'].values
all_labels = df_train['label'].values
X_train, X_val, y_train, y_val = train_test_split(
    all_image_paths,
    all_labels,
    test_size=0.1,
    random_state=42,
)

train_split_df = pd.DataFrame({'file_name': X_train, 'label': y_train})
val_split_df = pd.DataFrame({'file_name': X_val, 'label': y_val})

In [None]:
# ---------------------- Custom Dataset ----------------------
class CustomDataset(Dataset):
    """Image dataset backed by a table of file paths (and, for training, labels).

    Args:
        df: Anything ``pd.DataFrame`` accepts. Image paths are read from column
            ``"id"`` when ``is_test`` is true, otherwise from ``"file_name"``;
            labelled data must also provide a ``"label"`` column.
        transform: Optional callable applied to the RGB ``PIL.Image`` before it
            is returned.
        is_test: When true, ``__getitem__`` returns only the image; otherwise it
            returns an ``(image, label)`` pair.
    """

    def __init__(self, df, transform=None, is_test=False):
        self.df = pd.DataFrame(df)
        self.transform = transform
        self.is_test = is_test

    def __len__(self):
        """Number of rows (images) in the underlying table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at positional index ``idx`` (plus its label unless ``is_test``)."""
        path_column = "id" if self.is_test else "file_name"
        # Column-first lookup avoids materialising a whole row Series per sample.
        img_path = self.df[path_column].iloc[idx]

        # Image.open is lazy and keeps the file open; the context manager closes it
        # as soon as convert() has produced an independent in-memory copy, so
        # long-running DataLoader workers do not accumulate open file descriptors.
        with Image.open(img_path) as raw_image:
            img = raw_image.convert("RGB")

        if self.transform:
            img = self.transform(img)
        if self.is_test:
            return img
        return img, self.df["label"].iloc[idx]


In [None]:
def _tensor_and_normalize():
    """Return the two closing steps both pipelines share: ToTensor then Normalize.

    NOTE(review): the mean/std values look like the usual ImageNet channel
    statistics - confirm they match the pretrained backbones' preprocessing.
    """
    return [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]


# Training pipeline (with augmentation): resize, random 224 crop, flip,
# rotation of up to 10 degrees and colour jitter, then tensor + normalisation.
train_transforms = transforms.Compose(
    [
        transforms.Resize(232),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    ]
    + _tensor_and_normalize()
)

# Evaluation pipeline (no augmentation): resize, centre 224 crop, tensor + normalisation.
test_transforms = transforms.Compose(
    [
        transforms.Resize(232),
        transforms.CenterCrop(224),
    ]
    + _tensor_and_normalize()
)
batch_size = 4

# One dataset per split: only the training split is augmented; the test split has no labels.
train_dataset = CustomDataset(df=train_split_df, transform=train_transforms, is_test=False)
val_dataset = CustomDataset(df=val_split_df, transform=test_transforms, is_test=False)
test_dataset = CustomDataset(df=df_test, transform=test_transforms, is_test=True)

# Settings common to all three loaders; only the training loader shuffles.
_loader_options = dict(
    batch_size=batch_size,
    num_workers=4,
    pin_memory=True,
    persistent_workers=True,
)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

In [None]:
# Pick the GPU when CUDA is usable, otherwise fall back to the CPU, and report the choice.
_cuda_ready = torch.cuda.is_available()
device = torch.device("cuda" if _cuda_ready else "cpu")
print("CUDA is available, using GPU." if _cuda_ready else "CUDA is not available, using CPU.")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from timm import create_model  # Ensure timm is installed
from torch.amp import autocast, GradScaler
from torch.optim.lr_scheduler import CosineAnnealingLR
from sklearn.metrics import f1_score, roc_auc_score
from tqdm import tqdm
import copy
import numpy as np

# ---------------------- Utility: Compute Metrics ----------------------
def compute_metrics(preds, targets):
    """Return the weighted F1 score of ``preds`` measured against ``targets``.

    Either argument may be a torch tensor, in which case it is copied to host
    memory and converted to a NumPy array first; anything else is handed to
    sklearn's ``f1_score`` unchanged.
    """
    def _as_numpy(values):
        # Only tensors need converting; other inputs pass straight through.
        if torch.is_tensor(values):
            return values.cpu().numpy()
        return values

    y_pred = _as_numpy(preds)
    y_true = _as_numpy(targets)
    return f1_score(y_true, y_pred, average='weighted')

# ---------------------- Feature Fusion Model (Simplified) ----------------------
class FeatureFusionModel(nn.Module):
    """Two-backbone classifier whose pooled features are fused by an MLP.

    A ``convnext_large`` and a ``swin_base_patch4_window7_224`` timm backbone are
    created with pretrained weights and ``num_classes=0``. Their feature vectors
    are concatenated, passed through the ``feature_fusion`` MLP
    (BatchNorm1d -> 1024 -> 512 -> 256, each Linear followed by ReLU and
    Dropout(0.4)) and finally through the ``decoder`` head (256 -> 128 -> num_classes).

    Fine-tuning scope: every backbone parameter is frozen except the last ten
    parameter tensors of *each* backbone (twenty in total); the fusion MLP and
    the decoder are fully trainable.

    Args:
        num_classes: Width of the final Linear layer (number of output logits).
    """

    def __init__(self, num_classes=1):
        super().__init__()

        # Headless pretrained backbones; construction order is kept stable so that
        # seeded initialisation stays reproducible.
        self.convnext = create_model("convnext_large", pretrained=True, num_classes=0)
        self.swin = create_model("swin_base_patch4_window7_224", pretrained=True, num_classes=0)
        fused_width = self.convnext.num_features + self.swin.num_features

        for backbone in (self.convnext, self.swin):
            self._freeze_except_tail(backbone, tail=10)

        # Fusion MLP: BatchNorm on the concatenated features, then three
        # Linear -> ReLU -> Dropout stages of decreasing width.
        fusion_layers = [nn.BatchNorm1d(fused_width)]
        width_in = fused_width
        for width_out in (1024, 512, 256):
            fusion_layers += [nn.Linear(width_in, width_out), nn.ReLU(), nn.Dropout(0.4)]
            width_in = width_out
        self.feature_fusion = nn.Sequential(*fusion_layers)

        # Decoder head producing the final logits.
        self.decoder = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    @staticmethod
    def _freeze_except_tail(backbone, tail):
        """Leave only the last ``tail`` parameter tensors of ``backbone`` trainable."""
        params = list(backbone.parameters())
        first_trainable = len(params) - tail
        for position, param in enumerate(params):
            param.requires_grad = position >= first_trainable

    @staticmethod
    def _pool_if_spatial(features):
        """Average-pool a 4D feature map down to 2D; pass any other rank through untouched.

        NOTE(review): the pooling treats the last two axes as spatial, i.e. it
        assumes a (batch, channels, H, W) layout - confirm for any backbone that
        really returns 4D features here.
        """
        if features.dim() != 4:
            return features
        pooled = F.adaptive_avg_pool2d(features, (1, 1))
        return pooled.view(features.size(0), -1)

    def forward(self, x):
        """Return the decoder output for image batch ``x`` (one row of logits per sample)."""
        per_backbone = [
            self._pool_if_spatial(backbone(x))
            for backbone in (self.convnext, self.swin)
        ]
        fused = self.feature_fusion(torch.cat(per_backbone, dim=1))
        return self.decoder(fused)

# ---------------------- Training Loop with Early Stopping ----------------------
def _train_one_epoch(model, loader, criterion, optimizer, scaler, device, use_amp, desc):
    """Run one optimisation pass over ``loader``; return per-sample (mean loss, accuracy)."""
    model.train()
    loss_sum, correct, seen = 0.0, 0, 0

    for data, label in tqdm(loader, desc=desc):
        data, label = data.to(device), label.to(device).float()
        optimizer.zero_grad()

        # torch.autocast is addressed directly so the call does not depend on which
        # `autocast` import (torch.amp vs torch.cuda.amp) is currently bound.
        with torch.autocast(device_type=device.type, enabled=use_amp):
            output = model(data).squeeze(1)
            loss = criterion(output, label)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Weight by batch size so a short final batch does not skew the epoch means.
        batch_n = label.size(0)
        loss_sum += loss.item() * batch_n
        preds = (torch.sigmoid(output) > 0.5).float()
        correct += (preds == label).sum().item()
        seen += batch_n

    seen = max(seen, 1)  # an empty loader yields 0.0 instead of a ZeroDivisionError
    return loss_sum / seen, correct / seen


def _evaluate(model, loader, criterion, device, use_amp, desc):
    """Score ``model`` on ``loader``; return (mean loss, accuracy, F1, ROC AUC)."""
    model.eval()
    loss_sum, seen = 0.0, 0
    prob_chunks, label_chunks = [], []

    with torch.no_grad():
        for data, label in tqdm(loader, desc=desc):
            data, label = data.to(device), label.to(device).float()
            with torch.autocast(device_type=device.type, enabled=use_amp):
                output = model(data).squeeze(1)
                loss = criterion(output, label)

            batch_n = label.size(0)
            loss_sum += loss.item() * batch_n
            seen += batch_n
            # Keep probabilities (not thresholded classes): ROC AUC needs a ranking score.
            prob_chunks.append(torch.sigmoid(output.float()).cpu().numpy())
            label_chunks.append(label.cpu().numpy())

    if seen == 0:
        nan = float("nan")
        return nan, nan, nan, nan

    probs = np.concatenate(prob_chunks)
    labels = np.concatenate(label_chunks).astype(int)
    preds = (probs > 0.5).astype(int)

    accuracy = float((preds == labels).mean())
    f1 = f1_score(labels, preds)
    # roc_auc_score raises when only one class is present in the labels.
    roc_auc = roc_auc_score(labels, probs) if np.unique(labels).size == 2 else float("nan")
    return loss_sum / seen, accuracy, f1, roc_auc


def train_model(num_epochs=3, lr=1e-4, weight_decay=1e-2, patience=1):
    """Train a ``FeatureFusionModel`` with early stopping on validation loss.

    Reads the module-level ``train_loader`` and ``val_loader``. Only parameters
    with ``requires_grad=True`` are optimised (AdamW with a per-epoch cosine
    learning-rate schedule, BCE-with-logits loss). Mixed precision is enabled
    only when running on CUDA.

    Whenever the validation loss improves, the state dict is written to
    ``best_model_weights.pth`` and the pickled module to ``best_model.pth``
    (both in the working directory).

    Args:
        num_epochs: Maximum number of epochs; also the scheduler's ``T_max``.
        lr: Initial learning rate for AdamW.
        weight_decay: AdamW weight decay.
        patience: Number of consecutive epochs without a validation-loss
            improvement after which training stops.

    Returns:
        The model with the best weights saved during this call restored, or the
        model as trained if no checkpoint was written (e.g. ``num_epochs=0`` or
        a NaN validation loss in every epoch).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    use_amp = device.type == 'cuda'

    model = FeatureFusionModel(num_classes=1).to(device)

    # Frozen backbone parameters are excluded from the optimiser altogether.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(trainable_params, lr=lr, weight_decay=weight_decay)
    scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)
    criterion = nn.BCEWithLogitsLoss().to(device)
    # With enabled=False the scaler is a transparent pass-through (CPU runs).
    scaler = GradScaler(enabled=use_amp)

    best_val_loss = float("inf")
    checkpoint_written = False
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        epoch_loss, epoch_accuracy = _train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device, use_amp,
            desc=f"Training Epoch {epoch+1}",
        )
        val_loss, val_acc, val_f1, val_roc_auc = _evaluate(
            model, val_loader, criterion, device, use_amp,
            desc=f"Validation Epoch {epoch+1}",
        )

        print(f"Epoch [{epoch+1}/{num_epochs}] "
              f"Train Loss: {epoch_loss:.4f} | Train Acc: {epoch_accuracy:.4f} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | "
              f"Val F1: {val_f1:.4f} | Val ROC AUC: {val_roc_auc:.4f}")

        scheduler.step()

        # Early stopping based on validation loss (a NaN loss counts as "no improvement").
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model_weights.pth')
            # Whole-module pickle: loading it later requires this class definition to be importable.
            torch.save(model, 'best_model.pth')
            checkpoint_written = True
            print("Model saved!")
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f"⚠️ Early stopping at epoch {epoch+1}")
                break

    print(f"Best Val Loss: {best_val_loss:.4f}")
    if checkpoint_written:
        # Restore only a checkpoint produced by this run, mapped onto the active device.
        model.load_state_dict(torch.load('best_model_weights.pth', map_location=device))
    else:
        print("No checkpoint was written during this run; returning the model as trained.")
    return model

# ---------------------- Main Execution ----------------------
# Trains for a single epoch (num_epochs=1) and keeps the returned model in
# `trained_model`, which the inference cell below passes to run_inference.
if __name__ == "__main__":
    # train_model reads the module-level train_loader and val_loader, so the
    # data-loading cells above must have been executed first.
    trained_model = train_model(num_epochs=1, lr=1e-4, weight_decay=1e-2, patience=1)


In [None]:
def run_inference(model):
    """Predict a 0/1 label for every test image and write ``submission.csv``.

    Reads the module-level ``test_loader`` and ``df_test``. Each logit is passed
    through a sigmoid and thresholded at 0.5. The submission ``id`` column is
    rebuilt as ``"test_data_v2/" + <file name>`` from the paths in ``df_test``.

    Args:
        model: Trained network returning one logit per sample. It is moved to
            the inference device (CUDA when available, otherwise CPU) and
            switched to eval mode.

    Raises:
        ValueError: If the number of predictions differs from ``len(df_test)``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Batches are sent to `device`, so the weights must live there too (a model
    # loaded on the CPU would otherwise fail with a device mismatch).
    model = model.to(device)
    model.eval()

    batch_predictions = []
    with torch.no_grad():
        for data in tqdm(test_loader, desc="Inference Progress", total=len(test_loader)):
            logits = model(data.to(device)).squeeze(1)
            batch_predictions.append((torch.sigmoid(logits) > 0.5).int().cpu())

    if batch_predictions:
        test_predictions = torch.cat(batch_predictions).numpy().astype(int)
    else:
        test_predictions = np.empty(0, dtype=int)

    if len(test_predictions) != len(df_test):
        raise ValueError(
            f"Got {len(test_predictions)} predictions for {len(df_test)} test rows"
        )

    submission_df = df_test.copy()
    # Reduce each path to its file name, then restore the folder prefix used in the submission ids.
    submission_df["id"] = "test_data_v2/" + submission_df["id"].map(os.path.basename)
    submission_df["label"] = test_predictions
    print(submission_df["label"].value_counts())
    submission_df.to_csv("submission.csv", index=False)
    print("Submission saved as submission.csv")

# Generate the submission file with the model produced by the training cell;
# `trained_model` must already exist in the kernel (run the training cell first).
if __name__ == "__main__":
    run_inference(trained_model)
