In [None]:

import os
import random
import shutil
import time
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import datasets, transforms


def set_seed(seed=42):
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def remove_corrupted_images(dataset_root):
    print("Checking for corrupted images...")
    removed = 0

    if not os.path.exists(dataset_root):
        raise FileNotFoundError(f"Dataset root not found: {dataset_root}")

    for root, _, files in os.walk(dataset_root):
        for f in files:
            if not f.lower().endswith((".jpg", ".jpeg", ".png")):
                continue
            fp = os.path.join(root, f)
            try:
                Image.open(fp).verify()
            except Exception:
                try:
                    os.remove(fp)
                except:
                    pass
                removed += 1

    print(f"Removed corrupted images: {removed}")


def split_dataset(source_dir, dest_dir, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    if os.path.exists(dest_dir):
        # امسح القديم عشان مايتكرر نسخ
        shutil.rmtree(dest_dir)

    os.makedirs(dest_dir, exist_ok=True)

    classes = [c for c in os.listdir(source_dir) if os.path.isdir(os.path.join(source_dir, c))]
    if not classes:
        raise RuntimeError("No class folders found. لازم يكون عندك class0 و class1 جوه فولدر الداتا")

    for split_name in ["train", "val", "test"]:
        for cls in classes:
            os.makedirs(os.path.join(dest_dir, split_name, cls), exist_ok=True)

    for cls in classes:
        class_path = os.path.join(source_dir, cls)
        images = [img for img in os.listdir(class_path) if img.lower().endswith((".jpg", ".jpeg", ".png"))]
        random.shuffle(images)

        total = len(images)
        if total == 0:
            print(f"Warning: class {cls} has 0 images")
            continue

        train_end = int(train_ratio * total)
        val_end = int((train_ratio + val_ratio) * total)

        splits = {
            "train": images[:train_end],
            "val": images[train_end:val_end],
            "test": images[val_end:]
        }

        for split_name, split_images in splits.items():
            for img in split_images:
                src = os.path.join(class_path, img)
                dst = os.path.join(dest_dir, split_name, cls, img)
                shutil.copy2(src, dst)

    print("Dataset split completed")


IMG_SIZE = 50

train_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(30),
    transforms.RandomGrayscale(p=0.25),
    transforms.ColorJitter(brightness=0.35, contrast=0.35, saturation=0.25, hue=0.02),
    transforms.RandomAutocontrast(p=0.25),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

eval_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])


def make_loaders(split_root, batch_size=128, num_workers=4, pin_memory=True):
    train_dir = os.path.join(split_root, "train")
    val_dir = os.path.join(split_root, "val")
    test_dir = os.path.join(split_root, "test")

    train_ds = datasets.ImageFolder(train_dir, transform=train_transform)
    val_ds = datasets.ImageFolder(val_dir, transform=eval_transform)
    test_ds = datasets.ImageFolder(test_dir, transform=eval_transform)

    print("class_to_idx:", train_ds.class_to_idx)

    # Balance sampler عشان class0 عندك أكبر بكتير
    targets = [y for _, y in train_ds.samples]
    class_counts = torch.bincount(torch.tensor(targets))
    class_weights = 1.0 / (class_counts.float() + 1e-6)
    sample_weights = class_weights[torch.tensor(targets)]
    sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)

    train_loader = DataLoader(
        train_ds, batch_size=batch_size, sampler=sampler,
        num_workers=num_workers, pin_memory=pin_memory, persistent_workers=(num_workers > 0)
    )
    val_loader = DataLoader(
        val_ds, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory, persistent_workers=(num_workers > 0)
    )
    test_loader = DataLoader(
        test_ds, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory, persistent_workers=(num_workers > 0)
    )

    return train_loader, val_loader, test_loader, train_ds


class CNN_Manual_50(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # 50 -> 25

            nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # 25 -> 12

            nn.Conv2d(256, 256, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # 12 -> 6

            nn.Dropout(0.25),
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 6 * 6, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


@torch.no_grad()
def evaluate(model, loader, criterion, device):
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    for x, y in loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        out = model(x)
        loss = criterion(out, y)

        loss_sum += loss.item()
        pred = out.argmax(dim=1)
        total += y.size(0)
        correct += (pred == y).sum().item()

    acc = 100.0 * correct / max(1, total)
    avg_loss = loss_sum / max(1, len(loader))
    return acc, avg_loss


def train(model, train_loader, val_loader, device, epochs=10, lr=1e-3, save_path="cnn_manual_50.pth"):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)

    use_amp = (device.type == "cuda")
    scaler = torch.amp.GradScaler("cuda", enabled=use_amp)

    best_val_acc = -1.0

    for ep in range(1, epochs + 1):
        t0 = time.time()
        model.train()

        loss_sum = 0.0
        correct = 0
        total = 0

        for x, y in train_loader:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)

            with torch.amp.autocast("cuda", enabled=use_amp):
                out = model(x)
                loss = criterion(out, y)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            loss_sum += loss.item()
            pred = out.argmax(dim=1)
            total += y.size(0)
            correct += (pred == y).sum().item()

        train_acc = 100.0 * correct / max(1, total)
        train_loss = loss_sum / max(1, len(train_loader))

        val_acc, val_loss = evaluate(model, val_loader, criterion, device)
        dt = time.time() - t0

        print(f"Epoch {ep}/{epochs} | Train Loss {train_loss:.4f} Acc {train_acc:.2f}% | Val Loss {val_loss:.4f} Acc {val_acc:.2f}% | Time {dt:.1f}s")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                "model_state": model.state_dict(),
                "class_to_idx": getattr(train_loader.dataset, "class_to_idx", None),
                "img_size": IMG_SIZE,
                "arch": "CNN_Manual_50"
            }, save_path)
            print(f"Model saved to {save_path}")


def main():
    set_seed(42)
    torch.backends.cudnn.benchmark = True

    RAW_PATH = "/kaggle/input/imagefolder2/Dataset"
    SPLIT_PATH = "/kaggle/working/cleaned_data"
    MODEL_PATH = r"/kaggle/working/cnn_manual_50.pth"

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    print("Torch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())

    remove_corrupted_images(RAW_PATH)
    split_dataset(RAW_PATH, SPLIT_PATH)

    train_loader, val_loader, test_loader, train_ds = make_loaders(
        SPLIT_PATH, batch_size=128, num_workers=4, pin_memory=True
    )

    num_classes = len(train_ds.class_to_idx)
    model = CNN_Manual_50(num_classes=num_classes).to(device)

    train(model, train_loader, val_loader, device, epochs=10, lr=1e-3, save_path=MODEL_PATH)

    test_acc, test_loss = evaluate(model, test_loader, nn.CrossEntropyLoss(), device)
    print(f"Test Accuracy: {test_acc:.2f}% | Test Loss: {test_loss:.4f}")
    print("Final model path:", MODEL_PATH)


if __name__ == "__main__":
    main()


Using device: cuda
Torch version: 2.6.0+cu124
CUDA available: True
Checking for corrupted images...
Removed corrupted images: 0
Dataset split completed
class_to_idx: {'class0': 0, 'class1': 1}
Epoch 1/10 | Train Loss 0.5097 Acc 78.02% | Val Loss 0.4349 Acc 80.40% | Time 123.1s
Model saved to /kaggle/working/cnn_manual_50.pth
Epoch 2/10 | Train Loss 0.4572 Acc 79.86% | Val Loss 0.4347 Acc 80.20% | Time 119.8s
Epoch 3/10 | Train Loss 0.4439 Acc 80.74% | Val Loss 0.3450 Acc 85.60% | Time 119.9s
Model saved to /kaggle/working/cnn_manual_50.pth
Epoch 4/10 | Train Loss 0.4245 Acc 81.67% | Val Loss 0.4336 Acc 81.68% | Time 119.8s
Epoch 5/10 | Train Loss 0.4118 Acc 82.31% | Val Loss 0.3483 Acc 85.47% | Time 118.5s
Epoch 6/10 | Train Loss 0.3984 Acc 82.99% | Val Loss 0.4525 Acc 78.75% | Time 118.0s
Epoch 7/10 | Train Loss 0.3938 Acc 83.33% | Val Loss 0.3511 Acc 83.92% | Time 117.5s
Epoch 8/10 | Train Loss 0.3832 Acc 83.83% | Val Loss 0.3324 Acc 85.52% | Time 117.9s
Epoch 9/10 | Train Loss 0.374

In [2]:
import shutil
import os

cleaned_dir = "/kaggle/working/cleaned_data"

print("Exists:", os.path.exists(cleaned_dir))
print("Contents:", os.listdir(cleaned_dir))

shutil.make_archive(
    "/kaggle/working/cleaned_data",  # output zip name
    "zip",
    cleaned_dir
)


Exists: True
Contents: ['train', 'val', 'test']


'/kaggle/working/cleaned_data.zip'