<a href="https://colab.research.google.com/github/rohith-66/ai-generated-image-detection/blob/Rohith/train_baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
import os

import pandas as pd
from google.colab import drive

# Mount Google Drive so the dataset index and images become reachable.
drive.mount("/content/drive")

# Publish the dataset location through the environment, then read it back;
# the "./data" fallback only applies when DATA_ROOT is not set at all.
os.environ["DATA_ROOT"] = "/content/drive/MyDrive/AI_Image_Detection_Data"
DATA_ROOT = os.environ.get("DATA_ROOT", "./data")

# Locations derived from the data root.
CSV_PATH = os.path.join(DATA_ROOT, "index", "dataset_index.csv")
CHECKPOINT_DIR = os.path.join(DATA_ROOT, "checkpoints")

# Master index: one row per image, with at least "split" and "label" columns.
df = pd.read_csv(CSV_PATH)

# One re-indexed frame per split used for training and evaluation.
train_df, val_df, test_df = (
    df[df["split"] == split_name].reset_index(drop=True)
    for split_name in ("train", "val", "test")
)

split_sizes = [len(frame) for frame in (train_df, val_df, test_df)]
print(*split_sizes)
print("Train label counts:\n", train_df["label"].value_counts())


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
16000 2000 2000
Train label counts:
 label
0    8000
1    8000
Name: count, dtype: int64


In [2]:
import time
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Per-channel statistics used to normalise the input tensors.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline: fixed-size resize, light augmentation (random horizontal
# flip and a rotation of up to 10 degrees), then tensor conversion and
# normalisation.
_train_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
]
train_transform = transforms.Compose(_train_steps)

# Evaluation pipeline: identical preprocessing without any augmentation.
_eval_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
]
eval_transform = transforms.Compose(_eval_steps)

class ImageIndexDataset(Dataset):
    """Dataset that loads the images listed in an index DataFrame.

    Every row of ``df`` must provide a ``filepath`` entry (a path that PIL can
    open) and a ``label`` entry (convertible to ``int``).  Failed reads are
    retried a bounded number of times before giving up.

    Args:
        df: Index DataFrame; its index is reset so positions 0..len-1 are valid.
        transform: Optional callable applied to the loaded RGB PIL image.
        retries: Maximum number of attempts per item.
        sleep: Seconds to wait between two consecutive attempts.
    """

    def __init__(self, df, transform=None, retries=6, sleep=0.3):
        self.df = df.reset_index(drop=True)
        self.transform = transform
        self.retries = retries
        self.sleep = sleep

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        The label is returned as a 0-dim ``torch.long`` tensor.

        Raises:
            RuntimeError: if every attempt failed.  The last underlying
                exception is attached as ``__cause__`` so the original
                traceback is not lost.
        """
        row = self.df.iloc[idx]
        path = row["filepath"]
        label = int(row["label"])

        last_err = None
        for attempt in range(self.retries):
            try:
                with Image.open(path) as im:
                    # Materialise the RGB pixels while the file is still open
                    # so the image stays usable after the handle is closed.
                    im = im.convert("RGB")
                    im.load()
                # Explicit None check: a transform object must never be
                # skipped just because it happens to evaluate as falsy.
                if self.transform is not None:
                    im = self.transform(im)
                return im, torch.tensor(label, dtype=torch.long)
            except Exception as e:
                last_err = e
                # Waiting only makes sense when another attempt follows.
                if attempt < self.retries - 1:
                    time.sleep(self.sleep)

        raise RuntimeError(f"Failed to read {path}\nLast error: {last_err}") from last_err

BATCH_SIZE = 32


def _loader_for(frame, transform, shuffle):
    """Wrap an index frame in an ImageIndexDataset and batch it with BATCH_SIZE."""
    dataset = ImageIndexDataset(frame, transform)
    return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=shuffle, num_workers=0)


# Only the training data is shuffled and augmented; validation and test keep
# the index order and use the evaluation preprocessing.
train_loader = _loader_for(train_df, train_transform, shuffle=True)
val_loader = _loader_for(val_df, eval_transform, shuffle=False)
test_loader = _loader_for(test_df, eval_transform, shuffle=False)

print("Loaders ready.")


Loaders ready.


In [3]:
import torch.nn as nn
import torch.nn.functional as F

class BaselineCNN(nn.Module):
    """Small four-stage convolutional classifier.

    Every stage is a 3x3 convolution (padding 1), batch norm, ReLU and a 2x2
    max-pool, with output widths 32, 64, 128 and 256.  The resulting feature
    map is globally average-pooled and passed through a two-layer classifier
    with dropout.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=2):
        super().__init__()

        channel_plan = (3, 32, 64, 128, 256)
        stages = []
        for c_in, c_out in zip(channel_plan[:-1], channel_plan[1:]):
            # Each max-pool halves the spatial size
            # (224 -> 112 -> 56 -> 28 -> 14 for 224x224 inputs).
            stages.extend([
                nn.Conv2d(c_in, c_out, 3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ])
        self.features = nn.Sequential(*stages)

        # Collapse whatever spatial extent remains to a single 256-vector.
        self.pool = nn.AdaptiveAvgPool2d((1, 1))

        self.classifier = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        pooled = self.pool(self.features(x))
        return self.classifier(pooled.flatten(1))

# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Build the baseline network and move its parameters to the chosen device.
model = BaselineCNN()
model = model.to(device)
print("Device:", device)


Device: cpu


In [4]:
import torch.optim as optim

# Adam over all model parameters, with a small weight-decay term.
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4,
)

# Cross-entropy over the raw logits produced by the model.
criterion = nn.CrossEntropyLoss()


In [5]:
import os
import numpy as np

# Make sure the checkpoint directory exists before the training loop below
# saves the best model into it; exist_ok keeps the cell safe to re-run.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

def run_epoch(model, loader, train=True):
    """Run one full pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Uses the notebook-level ``device``, ``criterion`` and ``optimizer``.

    Args:
        model: Network to train or evaluate; its train/eval mode is set here.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        train: When True the weights are updated after every batch.  When
            False the pass only measures loss/accuracy and runs with autograd
            disabled.

    Returns:
        Tuple ``(loss, acc)``: the sample-weighted mean loss and the fraction
        of correctly classified samples.
    """
    model.train(train)
    total_loss = 0.0
    correct = 0
    total = 0

    # Gradients are only needed when the optimizer steps.  Disabling autograd
    # for evaluation avoids building a graph for every validation batch.
    with torch.set_grad_enabled(bool(train)):
        for x, y in loader:
            x, y = x.to(device), y.to(device)

            if train:
                optimizer.zero_grad()

            logits = model(x)
            loss = criterion(logits, y)

            if train:
                loss.backward()
                optimizer.step()

            # criterion returns the batch mean, so weight it by the batch size
            # to obtain a correct per-sample average at the end.
            total_loss += loss.item() * x.size(0)
            preds = logits.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += x.size(0)

    return total_loss / total, correct / total


EPOCHS = 8
best_val_acc = 0.0
history = []

for epoch in range(1, EPOCHS + 1):
    # One optimisation pass followed by one evaluation pass.
    train_loss, train_acc = run_epoch(model, train_loader, train=True)
    val_loss, val_acc = run_epoch(model, val_loader, train=False)

    history.append(dict(
        epoch=epoch,
        train_loss=train_loss,
        train_acc=train_acc,
        val_loss=val_loss,
        val_acc=val_acc,
    ))

    print(f"Epoch {epoch:02d} | "
          f"train loss {train_loss:.4f} acc {train_acc:.4f} | "
          f"val loss {val_loss:.4f} acc {val_acc:.4f}")

    # Only a strictly better validation accuracy produces a new checkpoint.
    if not val_acc > best_val_acc:
        continue

    best_val_acc = val_acc
    ckpt_path = f"{CHECKPOINT_DIR}/baseline_cnn_best.pt"
    checkpoint = {
        "model_state": model.state_dict(),
        "epoch": epoch,
        "val_acc": val_acc,
        "val_loss": val_loss,
    }
    torch.save(checkpoint, ckpt_path)
    print("Saved best checkpoint:", ckpt_path)


KeyboardInterrupt: 

In [6]:
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd

# Reuse the index location derived from DATA_ROOT at the top of the notebook
# instead of repeating the absolute Drive path, so the two cannot drift apart.
csv_path = CSV_PATH
df = pd.read_csv(csv_path)

# Row count per split, displayed as the cell output.
df["split"].value_counts()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Unnamed: 0_level_0,count
split,Unnamed: 1_level_1
train,16000
extra_real,5000
val,2000
test,2000



**Local Dataset Caching (Why This Step Exists)**

**Purpose**: This step copies the dataset images from Google Drive into Colab’s local storage (/content/) before training.

**Why?** Training directly from Google Drive causes:



*   Slow image loading
*   I/O bottlenecks
*   Random timeouts
*   Interrupted training



Google Drive is not optimized for high-frequency file access during deep learning training.

**To fix this, we:**

1. Read the file paths from the master CSV.
2. Copy only the required images (train/val/test) into local storage.
3. Update the filepaths in the DataFrame to point to the cached location.
4. Train the model using the local paths (fast and stable).

In [7]:
import os, shutil
from tqdm import tqdm

# Directory under /content that receives the cached copies of the images;
# exist_ok keeps the cell safe to re-run.
CACHE_ROOT = "/content/dataset_cache"
os.makedirs(CACHE_ROOT, exist_ok=True)

def cache_split(df_split, split_name):
    """Copy every image of one split into the local cache.

    Files are stored as
    ``CACHE_ROOT/<split_name>/label_<label>/<source>/<basename>``.  A file
    that already exists at its destination is not copied again, so the
    function can be re-run cheaply.

    Args:
        df_split: DataFrame with ``filepath``, ``label`` and ``source`` columns.
        split_name: Name of the sub-directory created below ``CACHE_ROOT``.

    Returns:
        List of cached file paths, one per row and in the row order of
        ``df_split``.
    """
    out_root = os.path.join(CACHE_ROOT, split_name)
    os.makedirs(out_root, exist_ok=True)

    # Walk the three columns in lock-step so each row contributes its own
    # label and source.  Looking them up with a boolean mask per file would
    # rescan the whole frame for every image (quadratic in the split size).
    rows = list(zip(
        df_split["filepath"].tolist(),
        df_split["label"].tolist(),
        df_split["source"].tolist(),
    ))

    cached_paths = []
    for p, label, source in tqdm(rows, desc=f"Caching {split_name}"):
        # p is the path recorded in the CSV.
        # Keep a stable structure: label/source/basename
        subdir = os.path.join(out_root, f"label_{int(label)}", str(source))
        os.makedirs(subdir, exist_ok=True)

        # NOTE(review): only the basename is kept, so two different files with
        # the same name, label and source would share one cached file --
        # confirm that basenames are unique within each label/source group.
        dst = os.path.join(subdir, os.path.basename(p))
        if not os.path.exists(dst):
            shutil.copy2(p, dst)

        cached_paths.append(dst)

    return cached_paths

# Independent copies of each split, so rewriting their paths below leaves the
# master frame ``df`` untouched.
df_train, df_val, df_test = (
    df[df["split"] == split_name].copy()
    for split_name in ("train", "val", "test")
)

# Copy the images of every split into the local cache.
train_cached = cache_split(df_train, "train")
val_cached = cache_split(df_val, "val")
test_cached = cache_split(df_test, "test")

# Update dataframe to use cached filepaths
for split_frame, cached_files in (
    (df_train, train_cached),
    (df_val, val_cached),
    (df_test, test_cached),
):
    split_frame["filepath"] = cached_files


Caching train: 100%|██████████| 16000/16000 [1:06:24<00:00,  4.02it/s]
Caching val: 100%|██████████| 2000/2000 [08:04<00:00,  4.13it/s]
Caching test: 100%|██████████| 2000/2000 [08:08<00:00,  4.09it/s]


In [11]:
from torchvision import transforms

# IMAGENET_MEAN / IMAGENET_STD, train_transform and eval_transform are already
# defined next to ImageIndexDataset earlier in the notebook.  Declaring them a
# second time here created copies that could silently drift apart, so the
# validation pipeline is simply an alias of the existing evaluation pipeline
# (resize to 224x224, tensor conversion, ImageNet normalisation).
val_transform = eval_transform


In [12]:
# Datasets backed by the locally cached image files.
train_ds = ImageIndexDataset(df_train, transform=train_transform)
val_ds   = ImageIndexDataset(df_val, transform=val_transform)
test_ds  = ImageIndexDataset(df_test, transform=val_transform)

# Rebuild the loaders on top of the cached datasets.  The loaders created
# earlier still wrap the Drive-backed frames, so without this step training
# and evaluation would keep reading every image from Drive and the cache
# would never be used.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)


In [13]:
# Pull a single batch to check the tensor shape and which labels it contains.
first_batch = next(iter(train_loader))
images, labels = first_batch
print(images.shape)
print(labels.unique())


torch.Size([32, 3, 224, 224])
tensor([0, 1])


Our cached pipeline + loader are working correctly (right shapes, both classes present). Now run baseline training without touching Drive during epochs.

**Train loop (with best-checkpoint + early stopping)**

In [14]:
import torch, os, copy
from tqdm import tqdm

# Device used by train_model: the GPU when PyTorch can see one, else the CPU.
_backend = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_backend)


def train_model(model, train_loader, val_loader, criterion, optimizer,
                epochs=10, patience=3, save_path="/content/best_model.pt"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    After every epoch the validation loss is compared with the best value seen
    so far.  A strictly lower loss stores a copy of the weights (in memory and
    at ``save_path``); otherwise a counter is incremented and training stops
    once it reaches ``patience``.

    Args:
        model: Network to train; it is moved to the notebook-level ``device``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        criterion: Loss callable taking ``(outputs, targets)``.
        optimizer: Optimizer already bound to the model parameters.
        epochs: Maximum number of epochs.
        patience: Consecutive non-improving epochs tolerated before stopping.
        save_path: File that receives the best ``state_dict``.

    Returns:
        ``(model, history)``: the model with the best weights loaded, and a
        dict of per-epoch lists keyed ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc``.
    """
    history = {key: [] for key in ("train_loss", "train_acc", "val_loss", "val_acc")}

    best_wts = copy.deepcopy(model.state_dict())
    best_val_loss = float("inf")
    bad_epochs = 0

    model.to(device)

    for epoch in range(1, epochs + 1):
        # ---- train ----
        model.train()
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0

        train_bar = tqdm(train_loader, desc=f"Epoch {epoch}/{epochs} [train]", leave=False)
        for x, y in train_bar:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()

            # The loss is a batch mean; weight it by the batch size.
            loss_sum += loss.item() * x.size(0)
            n_correct += (out.argmax(dim=1) == y).sum().item()
            n_seen += y.size(0)

        train_loss = loss_sum / n_seen
        train_acc = n_correct / n_seen

        # ---- val ----
        model.eval()
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0

        with torch.no_grad():
            val_bar = tqdm(val_loader, desc=f"Epoch {epoch}/{epochs} [val]", leave=False)
            for x, y in val_bar:
                x = x.to(device, non_blocking=True)
                y = y.to(device, non_blocking=True)
                out = model(x)
                loss = criterion(out, y)

                loss_sum += loss.item() * x.size(0)
                n_correct += (out.argmax(dim=1) == y).sum().item()
                n_seen += y.size(0)

        val_loss = loss_sum / n_seen
        val_acc = n_correct / n_seen

        epoch_metrics = (
            ("train_loss", train_loss),
            ("train_acc", train_acc),
            ("val_loss", val_loss),
            ("val_acc", val_acc),
        )
        for key, value in epoch_metrics:
            history[key].append(value)

        print(f"Epoch {epoch}: "
              f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f} | "
              f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

        # ---- checkpoint + early stop ----
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_wts = copy.deepcopy(model.state_dict())
            torch.save(best_wts, save_path)
            bad_epochs = 0
            continue

        bad_epochs += 1
        if bad_epochs >= patience:
            print(f"Early stopping (no val_loss improvement for {patience} epochs).")
            break

    # Hand back the best weights seen, not the ones from the last epoch.
    model.load_state_dict(best_wts)
    return model, history


**Run it (plug in the model)**

In [15]:
import torch.nn as nn
import torch.optim as optim

# Start from freshly initialised weights rather than reusing whatever state an
# earlier (possibly interrupted) training cell left in ``model``; otherwise
# the reported curves would depend on which cells happened to run before.
model = BaselineCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Train with early stopping; the best weights are written to save_path and
# loaded back into the returned model.
model, history = train_model(
    model, train_loader, val_loader,
    criterion, optimizer,
    epochs=15, patience=4,
    save_path="/content/best_model.pt"
)




KeyboardInterrupt: 

**Evaluate on test set + confusion matrix**

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

def eval_model(model, loader):
    """Collect true and predicted class indices over every batch of ``loader``.

    The model is switched to eval mode and run without gradient tracking on
    the notebook-level ``device``.

    Args:
        model: Network producing one row of class scores per sample.
        loader: Iterable of ``(inputs, targets)`` batches; targets are
            converted with ``.numpy()`` and are never moved to ``device``.

    Returns:
        ``(y_true, y_pred)`` as two NumPy arrays in loader order.
    """
    model.eval()
    truths = []
    guesses = []

    with torch.no_grad():
        for inputs, targets in loader:
            scores = model(inputs.to(device, non_blocking=True))
            # The predicted class is the index of the highest score.
            batch_pred = scores.argmax(dim=1).cpu().numpy()
            guesses.extend(batch_pred)
            truths.extend(targets.numpy())

    return np.array(truths), np.array(guesses)

# Predictions of the trained model on the held-out test split.
y_true, y_pred = eval_model(model, test_loader)

# Per-class precision / recall / F1, followed by the raw confusion counts.
report_text = classification_report(y_true, y_pred, target_names=["Real(0)", "AI(1)"])
print(report_text)

test_confusion = confusion_matrix(y_true, y_pred)
print("Confusion matrix:\n", test_confusion)


**Copy final artifacts back to Drive (once)**

In [None]:
import shutil, os

# Derive the results folder from DATA_ROOT (defined at the top of the
# notebook) instead of repeating the absolute Drive path, so that changing the
# data root moves the results along with it.
drive_out = os.path.join(DATA_ROOT, "results")
os.makedirs(drive_out, exist_ok=True)

# Copy the best weights written by train_model from local storage to Drive.
shutil.copy2("/content/best_model.pt", os.path.join(drive_out, "best_model.pt"))

# save report text too
report_path = os.path.join(drive_out, "test_report.txt")
# An explicit encoding keeps the file identical regardless of the locale.
with open(report_path, "w", encoding="utf-8") as f:
    f.write(classification_report(y_true, y_pred, target_names=["Real(0)", "AI(1)"]))
