In [None]:
# The model in this notebook was originally trained on Kaggle.
# Paths (see DATA_ROOT below) may need to be adjusted when running locally.

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, Subset

import torchvision.transforms as transforms
import torchvision.models as models

from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights

# print(os.listdir("/kaggle/input/hw4-dataset"))
# print(os.listdir("/kaggle/input/hw4-dataset/data"))

# Dataset root; the split folders "train", "test" and "unlabeled" live under it.
DATA_ROOT = "./data"

# Derive each split's directory from the shared root in one pass.
TRAIN_DIR, TEST_DIR, UNLABELED_DIR = (
    os.path.join(DATA_ROOT, split_name)
    for split_name in ("train", "test", "unlabeled")
)


['data']
['unlabeled', 'test', 'train']


In [5]:
# train_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.RandomHorizontalFlip(),
#     transforms.ToTensor(),
#     transforms.Normalize(
#         mean=[0.485, 0.456, 0.406],
#         std=[0.229, 0.224, 0.225]
#     ),
# ])

# test_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(
#         mean=[0.485, 0.456, 0.406],
#         std=[0.229, 0.224, 0.225]
#     ),
# ])

from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights

# Pretrained-weights handle; its preprocessing preset supplies the
# normalization mean/std used by the transforms below.
weights = EfficientNet_B4_Weights.DEFAULT

# Training pipeline: fixed 380x380 resize, then light augmentation
# (random resized crop, horizontal flip, colour jitter), then tensor
# conversion and normalization.
train_transform = transforms.Compose(
    [
        transforms.Resize(size=(380, 380)),
        transforms.RandomResizedCrop(size=380, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=weights.transforms().mean,
            std=weights.transforms().std,
        ),
    ]
)

# Evaluation pipeline: deterministic resize + normalization only.
test_transform = transforms.Compose(
    [
        transforms.Resize(size=(380, 380)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=weights.transforms().mean,
            std=weights.transforms().std,
        ),
    ]
)


In [6]:
class TrainDataset(Dataset):
    """Labeled images read from ``<root_dir>/real`` and ``<root_dir>/generated``.

    Files under ``real`` get label 0 and files under ``generated`` get label 1.

    Args:
        root_dir: Directory containing the ``real`` and ``generated`` folders.
        transform: Optional callable applied to each RGB PIL image.

    Raises:
        FileNotFoundError: If one of the class folders does not exist
            (propagated from ``os.listdir``).
    """

    def __init__(self, root_dir, transform=None):
        # List of (image_path, label) pairs, grouped by class.
        self.samples = []
        self.transform = transform

        for label, cls in enumerate(["real", "generated"]):
            cls_dir = os.path.join(root_dir, cls)
            # os.listdir() returns entries in arbitrary order; sort so the
            # sample order is reproducible across runs and machines, the
            # same way TestDataset / TTADataset list their files.
            self.samples.extend(
                (os.path.join(cls_dir, fname), label)
                for fname in sorted(os.listdir(cls_dir))
            )

    def __len__(self):
        """Return the number of labeled samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``."""
        img_path, label = self.samples[idx]
        # Image.open is lazy; convert() produces an independent RGB image, so
        # the underlying file can be closed as soon as the conversion is done.
        with Image.open(img_path) as raw:
            image = raw.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# train_dataset = TrainDataset(TRAIN_DIR, train_transform)

# # Get all sample indices
# all_indices = list(range(len(train_dataset)))

# # Split into train / val (original note said 80% / 20%, but test_size=0.5 below gives a 50/50 split)
# train_idx, val_idx = train_test_split(
#     all_indices,
#     test_size=0.5,
#     random_state=777,
#     stratify=[label for _, label in train_dataset.samples]
# )

# train_subset = Subset(train_dataset, train_idx)
# val_subset = Subset(train_dataset, val_idx)

# train_loader = DataLoader(
#     train_subset,
#     batch_size=32,
#     shuffle=True,
#     num_workers=2
# )

# val_loader = DataLoader(
#     val_subset,
#     batch_size=32,
#     shuffle=False,
#     num_workers=2
# )

# Use every labeled sample for training (no validation hold-out), with the
# augmenting train_transform.
final_train_dataset = TrainDataset(root_dir=TRAIN_DIR, transform=train_transform)

# Shuffled mini-batches of 16, loaded by two worker processes.
final_train_loader = DataLoader(
    dataset=final_train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=2,
)


In [7]:
class TestDataset(Dataset):
    """Images from a flat directory, served in sorted filename order.

    Each item is ``(image, filename)`` where ``filename`` is the image's base
    name with the directory stripped.

    Args:
        root_dir: Directory whose entries are treated as image files.
        transform: Optional callable applied to each RGB PIL image.
    """

    def __init__(self, root_dir, transform=None):
        self.transform = transform
        file_names = os.listdir(root_dir)
        file_names.sort()  # deterministic order, independent of the filesystem
        self.image_paths = [os.path.join(root_dir, entry) for entry in file_names]

    def __getitem__(self, idx):
        """Return ``(image, base_filename)`` for entry ``idx``."""
        path = self.image_paths[idx]
        rgb = Image.open(path).convert("RGB")
        image = self.transform(rgb) if self.transform else rgb
        return image, os.path.basename(path)

    def __len__(self):
        """Return the number of files found in the directory."""
        return len(self.image_paths)

# Test images with the deterministic evaluation transform.
test_dataset = TestDataset(root_dir=TEST_DIR, transform=test_transform)

# Unshuffled so predictions stay aligned with the sorted file order.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=2,
)


In [8]:
# Preprocessing used by the inference loader. It contains no random
# augmentation: the steps are the same resize / tensor / normalize sequence
# as test_transform.
tta_transform = transforms.Compose(
    [
        transforms.Resize(size=(380, 380)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=weights.transforms().mean,
            std=weights.transforms().std,
        ),
    ]
)


In [9]:
class TTADataset(Dataset):
    """Images from a flat directory in sorted order, always transformed.

    Unlike an optional-transform dataset, ``transform`` is required here and is
    applied unconditionally. Items are ``(transformed_image, base_filename)``.

    Args:
        root_dir: Directory whose entries are treated as image files.
        transform: Callable applied to each RGB PIL image.
    """

    def __init__(self, root_dir, transform):
        self.transform = transform
        self.image_paths = []
        for entry in sorted(os.listdir(root_dir)):
            self.image_paths.append(os.path.join(root_dir, entry))

    def __getitem__(self, idx):
        """Return ``(transformed_image, base_filename)`` for entry ``idx``."""
        path = self.image_paths[idx]
        transformed = self.transform(Image.open(path).convert("RGB"))
        return transformed, os.path.basename(path)

    def __len__(self):
        """Return the number of files found in the directory."""
        return len(self.image_paths)

# Test images again, this time paired with tta_transform.
tta_dataset = TTADataset(root_dir=TEST_DIR, transform=tta_transform)

# Unshuffled so predictions stay aligned with the sorted file order.
tta_loader = DataLoader(
    dataset=tta_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=2,
)


In [11]:
class UnlabeledDataset(Dataset):
    """Unlabeled images from a flat directory, served in sorted filename order.

    Each item is ``(image, image_path)``. The full path (not just the base
    name) is returned so a caller can later reopen the same file.

    Args:
        root_dir: Directory whose entries are treated as image files.
        transform: Optional callable applied to each RGB PIL image.

    Raises:
        FileNotFoundError: If ``root_dir`` does not exist (propagated from
            ``os.listdir``).
    """

    def __init__(self, root_dir, transform=None):
        self.transform = transform
        # os.listdir() returns entries in arbitrary order; sort so iteration
        # (and anything derived from it) is reproducible, the same way
        # TestDataset / TTADataset list their files.
        self.image_paths = [
            os.path.join(root_dir, fname)
            for fname in sorted(os.listdir(root_dir))
        ]

    def __len__(self):
        """Return the number of files found in the directory."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, image_path)`` for entry ``idx``."""
        img_path = self.image_paths[idx]
        # Image.open is lazy; convert() produces an independent RGB image, so
        # the underlying file can be closed as soon as the conversion is done.
        with Image.open(img_path) as raw:
            image = raw.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Unlabeled pool, preprocessed with the deterministic evaluation transform.
unlabeled_dataset = UnlabeledDataset(root_dir=UNLABELED_DIR, transform=test_transform)

# No shuffling: every file is visited exactly once, in dataset order.
unlabeled_loader = DataLoader(
    dataset=unlabeled_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=2,
)


In [12]:
# from torchvision.models import resnet18, ResNet18_Weights

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# model = resnet18(weights=ResNet18_Weights.DEFAULT)

# for param in model.parameters():
#     param.requires_grad = False

# for name, param in model.named_parameters():
#     if "layer4" in name:
#         param.requires_grad = True

# # classifier
# model.fc = nn.Linear(model.fc.in_features, 2)
# for param in model.fc.parameters():
#     param.requires_grad = True

# model = model.to(device)
from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights

# Prefer the GPU when one is visible; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pretrained EfficientNet-B4 backbone.
weights = EfficientNet_B4_Weights.DEFAULT
model = efficientnet_b4(weights=weights)

# Start from a fully frozen network ...
model.requires_grad_(False)

# ... then re-enable gradients only for parameters whose name contains
# "features.7".
# NOTE(review): torchvision's EfficientNet appears to have a further
# "features.8" stage after this one, which stays frozen here; confirm that
# is intended.
for name, p in model.named_parameters():
    if "features.7" not in name:
        continue
    p.requires_grad_(True)

# Swap the classifier's last layer for a fresh 2-way linear head and make
# sure every classifier parameter is trainable.
model.classifier[1] = nn.Linear(model.classifier[1].in_features, 2)
model.classifier.requires_grad_(True)

model = model.to(device)




Downloading: "https://download.pytorch.org/models/efficientnet_b4_rwightman-23ab8bcd.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b4_rwightman-23ab8bcd.pth
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 74.5M/74.5M [00:00<00:00, 162MB/s] 


In [13]:
# Supervised phase: default reduction="mean" gives one scalar per batch.
criterion_train = nn.CrossEntropyLoss()
# Mixed phase: reduction="none" keeps one loss value per sample so that each
# sample can be re-weighted before averaging.
criterion_mixed = nn.CrossEntropyLoss(reduction="none")

# Two parameter groups: a small learning rate for the pretrained feature
# extractor and a larger one for the newly created classifier head.
optimizer = optim.AdamW(
    params=[
        {"params": model.features.parameters(), "lr": 1e-5},
        {"params": model.classifier.parameters(), "lr": 3e-4},
    ],
    weight_decay=1e-4,
)


In [14]:
def train_one_epoch(model, loader):
    """Run one supervised training pass and return the mean per-batch loss.

    Uses the notebook-level ``device``, ``optimizer`` and ``criterion_train``.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Sized iterable yielding ``(images, labels)`` batches.

    Returns:
        Sum of the per-batch loss values divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []

    for batch_imgs, batch_labels in loader:
        batch_imgs, batch_labels = batch_imgs.to(device), batch_labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion_train(model(batch_imgs), batch_labels)
        batch_loss.backward()
        optimizer.step()

        # .item() detaches the scalar so no graph is kept alive across batches.
        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(loader)


In [15]:
from sklearn.metrics import f1_score

def evaluate(model, loader):
    """Compute the F1 score of ``model``'s argmax predictions over ``loader``.

    Uses the notebook-level ``device`` and ``f1_score``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        The value of ``f1_score(true_labels, predicted_labels)``.
    """
    model.eval()
    predicted, expected = [], []

    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            batch_imgs = batch_imgs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_imgs)
            # Predicted class = index of the largest logit in each row.
            batch_preds = logits.argmax(dim=1)

            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    return f1_score(expected, predicted)


In [16]:
# Number of supervised passes over the labeled training set.
EPOCHS = 5

# Supervised phase: train on the labeled data only and report the mean
# batch loss after every epoch.
for epoch in range(EPOCHS):
    loss = train_one_epoch(model=model, loader=final_train_loader)
    print(f"Epoch [{epoch+1}/{EPOCHS}] Loss: {loss:.4f}")


Epoch [1/5] Loss: 0.3732
Epoch [2/5] Loss: 0.2306
Epoch [3/5] Loss: 0.1971
Epoch [4/5] Loss: 0.1784
Epoch [5/5] Loss: 0.1643


In [17]:
import torch.nn.functional as F

# Minimum softmax probability required before an unlabeled image is accepted
# as a pseudo-labeled sample. Class 1 is "generated" and class 0 is "real"
# (the label order used by TrainDataset).
PSEUDO_GENERATED_THRESHOLD = 0.99
PSEUDO_REAL_THRESHOLD = 0.985

model.eval()
# (image_path, pseudo_label) pairs for confidently predicted images.
pseudo_samples = []

with torch.no_grad():
    for imgs, paths in unlabeled_loader:
        imgs = imgs.to(device)
        outputs = model(imgs)
        probs = F.softmax(outputs, dim=1)

        for prob, path in zip(probs.cpu().numpy(), paths):
            # Keep only confident predictions; everything else is skipped.
            if prob[1] >= PSEUDO_GENERATED_THRESHOLD:
                pseudo_samples.append((path, 1))
            elif prob[0] >= PSEUDO_REAL_THRESHOLD:
                pseudo_samples.append((path, 0))

print("Pseudo-labeled samples:", len(pseudo_samples))


Pseudo-labeled samples: 10661


In [18]:
# class MixedDataset(Dataset):
#     def __init__(self, original_dataset, pseudo_samples, transform):
#         self.samples = list(original_dataset.samples)
#         self.samples.extend(pseudo_samples)
#         self.transform = transform

#     def __len__(self):
#         return len(self.samples)

#     def __getitem__(self, idx):
#         img_path, label = self.samples[idx]
#         image = Image.open(img_path).convert("RGB")
#         if self.transform:
#             image = self.transform(image)
#         return image, label

class MixedDataset(Dataset):
    """Labeled samples followed by pseudo-labeled samples, tagged by origin.

    Internally every entry is ``(path, label, is_pseudo)``, with ``is_pseudo``
    equal to 0 for entries copied from ``real_dataset.samples`` and 1 for
    entries from ``pseudo_samples``. Items are ``(image, label, is_pseudo)``.

    Args:
        real_dataset: Object exposing ``samples`` as ``(path, label)`` pairs.
        pseudo_samples: Iterable of ``(path, label)`` pairs.
        transform: Callable applied to each RGB PIL image (skipped if falsy).
    """

    def __init__(self, real_dataset, pseudo_samples, transform):
        self.transform = transform
        labeled = [(path, label, 0) for path, label in real_dataset.samples]
        pseudo = [(path, label, 1) for path, label in pseudo_samples]
        self.samples = labeled + pseudo

    def __len__(self):
        """Return the combined number of labeled and pseudo-labeled samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label, is_pseudo)`` for entry ``idx``."""
        path, label, is_pseudo = self.samples[idx]
        rgb = Image.open(path).convert("RGB")
        if not self.transform:
            return rgb, label, is_pseudo
        return self.transform(rgb), label, is_pseudo
        
# Labeled training samples plus the pseudo-labeled ones, all passed through
# the augmenting train_transform.
mixed_dataset = MixedDataset(
    real_dataset=final_train_dataset,
    pseudo_samples=pseudo_samples,
    transform=train_transform,
)

# Shuffled mini-batches of 32, loaded by two worker processes.
mixed_loader = DataLoader(
    dataset=mixed_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
)

In [19]:
def train_one_epoch_mixed(
    model,
    loader,
    class_loss_weights=(1.1, 0.9),
    pseudo_loss_weight=0.5,
):
    """Run one training pass over mixed real / pseudo-labeled batches.

    Each sample's loss is scaled by a class weight and, for pseudo-labeled
    samples, by an additional factor; the weighted losses are then averaged
    over the batch. Uses the notebook-level ``device``, ``optimizer`` and
    ``criterion_mixed`` (which must return one loss value per sample).

    Args:
        model: Network to train; it is switched to training mode.
        loader: Sized iterable yielding ``(images, labels, is_pseudo)``
            batches, where ``is_pseudo`` is 1 for pseudo-labeled samples.
        class_loss_weights: Sequence whose element ``i`` is the loss weight
            for samples labeled ``i``. Labels without an entry keep weight 1.
            The default reproduces the original hard-coded 1.1 / 0.9.
        pseudo_loss_weight: Extra factor applied to pseudo-labeled samples.
            The default reproduces the original hard-coded 0.5.

    Returns:
        Sum of the per-batch weighted losses divided by ``len(loader)``.
    """
    model.train()
    total_loss = 0

    for imgs, labels, is_pseudo in loader:
        imgs = imgs.to(device)
        labels = labels.to(device)
        is_pseudo = is_pseudo.to(device)

        optimizer.zero_grad()

        outputs = model(imgs)

        # Per-sample losses, shape (B,), so each sample can be re-weighted.
        loss_all = criterion_mixed(outputs, labels)

        # Class-level weight: start at 1 and overwrite per label value.
        class_weights = torch.ones_like(loss_all)
        for class_idx, class_weight in enumerate(class_loss_weights):
            class_weights[labels == class_idx] = class_weight

        # Pseudo-label weight: down-weight samples the model labeled itself.
        pseudo_weights = torch.ones_like(loss_all)
        pseudo_weights[is_pseudo == 1] = pseudo_loss_weight

        # Named sample_weights (not weights) to avoid shadowing the
        # notebook-level `weights` object.
        sample_weights = class_weights * pseudo_weights

        # Plain batch mean of the weighted losses (not normalized by the
        # sum of the weights).
        loss = (loss_all * sample_weights).mean()

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(loader)


In [20]:
# Number of additional passes over the combined real + pseudo-labeled data.
FINAL_EPOCHS = 3

# Second phase: keep training the same model on the mixed loader and report
# the mean weighted batch loss after every epoch.
for epoch in range(FINAL_EPOCHS):
    loss = train_one_epoch_mixed(model=model, loader=mixed_loader)
    print(f"[Final] Epoch [{epoch+1}/{FINAL_EPOCHS}] Loss: {loss:.4f}")


[Final] Epoch [1/3] Loss: 0.1125
[Final] Epoch [2/3] Loss: 0.1072
[Final] Epoch [3/3] Loss: 0.1024


In [21]:
# model.eval()

# test_ids = []
# test_preds = []

# with torch.no_grad():
#     for imgs, img_names in test_loader:
#         imgs = imgs.to(device)

#         outputs = model(imgs)
#         preds = torch.argmax(outputs, dim=1)

#         test_preds.extend(preds.cpu().numpy())

#         test_ids.extend([
#             os.path.splitext(name)[0] for name in img_names
#         ])

import torch.nn.functional as F

# Predict a class for every test image.
# Note: although the loader is named "tta", each image gets a single
# forward pass through a deterministic transform; no augmented views are
# averaged here.
model.eval()

test_ids, test_preds = [], []

with torch.no_grad():
    for imgs, img_names in tta_loader:
        imgs = imgs.to(device)

        outputs = model(imgs)
        preds = outputs.argmax(dim=1)

        test_preds.extend(preds.cpu().numpy())
        # The id of an image is its file name without the extension.
        test_ids.extend(os.path.splitext(name)[0] for name in img_names)



In [22]:
# One row per test image: file stem and predicted class id.
submission = pd.DataFrame(data={"filename": test_ids, "label": test_preds})

# index=False keeps the row index out of the CSV file.
submission.to_csv(path_or_buf="submission.csv", index=False)
print("submission.csv saved!")

# Preview the first rows as the cell's output.
submission.head()

submission.csv saved!


Unnamed: 0,filename,label
0,000295da5dca4af09d5593174e15bb09,0
1,00040d088f054d379b1aae48e9f425d2,1
2,0004501ec7a74f0ab1bed517e5fe4ee3,0
3,00135256c4e24a458efa66a398d45325,1
4,00141c6d45b749b081b1881feba863c1,1


In [23]:
# Persist only the learned parameters (the state dict), not the module object.
torch.save(
    obj=model.state_dict(),
    f="112705007_weight.pth",
)
