#TRAIN.py

# In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import ConcatDataset, DataLoader, Subset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision.datasets import DatasetFolder
from tqdm.auto import tqdm
import matplotlib.pyplot as plt


class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + norm stages added to a shortcut branch.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution; the projection shortcut
            (when present) uses the same stride.
        norm_layer: Callable that builds a normalisation layer from a channel
            count. ``nn.BatchNorm2d`` is used when this is ``None``.
    """

    def __init__(self, in_channels, out_channels, stride=1, norm_layer=None):
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer

        # Main branch, stage 1 (the only convolution that applies `stride`).
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn1 = make_norm(out_channels)
        self.relu = nn.ReLU(inplace=True)

        # Main branch, stage 2 (keeps the spatial size and channel count).
        self.conv2 = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=3,
            padding=1,
            bias=False,
        )
        self.bn2 = make_norm(out_channels)

        # Shortcut branch: a 1x1 projection is required whenever the main
        # branch changes the tensor shape; otherwise the input passes through.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                make_norm(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))

        y += skip  # merge the two branches in place
        return self.relu(y)


class Classifier(nn.Module):
    """Image classifier: two plain conv stages, one residual stage, GAP, linear head.

    Each of the three stages halves the spatial resolution (two 2x2 max-pools
    and one stride-2 residual block); global average pooling then reduces the
    256-channel map to a vector that the linear layer maps to class logits.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=14):
        super().__init__()

        # L2 coefficient stored on the model; nothing inside this class reads it.
        self.weight_decay = 0.0001

        def conv_bn_relu(c_in, c_out):
            # One 3x3 "same" convolution followed by batch norm and ReLU.
            return [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]

        # Stage 1: 3 -> 64 channels
        self.block1 = nn.Sequential(
            *conv_bn_relu(3, 64),
            *conv_bn_relu(64, 64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.2),
        )

        # Stage 2: 64 -> 128 channels
        self.block2 = nn.Sequential(
            *conv_bn_relu(64, 128),
            *conv_bn_relu(128, 128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.3),
        )

        # Stage 3: 128 -> 256 channels with residual connections
        self.block3 = nn.Sequential(
            ResidualBlock(128, 256, stride=2),
            ResidualBlock(256, 256),
        )

        # Collapse every channel to a single value
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` logit tensor."""
        features = self.block3(self.block2(self.block1(x)))
        pooled = self.gap(features)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

class EarlyStopping:
    """Stop training once the validation loss has stopped improving.

    Every call compares the new validation loss with the best one seen so
    far. An improvement saves the model's ``state_dict`` and resets the
    patience counter; anything else increments the counter, and
    ``early_stop`` becomes ``True`` once it reaches ``patience``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: When true, print a message each time a checkpoint is saved.
        delta: Amount by which the loss must drop below the best loss to
            count as an improvement.
        path: File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record ``val_loss`` for this epoch and checkpoint ``model`` if it improved."""
        import math  # local import so the class does not depend on file-level imports

        score = -val_loss  # higher is better

        # A NaN loss compares False against everything, so it must be rejected
        # explicitly; otherwise it would be treated as an improvement,
        # overwrite the best checkpoint and make every later comparison fail.
        is_finite_result = not math.isnan(score)
        improved = is_finite_result and (
            self.best_score is None or score >= self.best_score + self.delta
        )

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
            return

        self.counter += 1
        print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Save ``model.state_dict()`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

def get_pseudo_labels(dataset, model, threshold=0.7, batch_size=128):
    """Label ``dataset`` with the model's own confident predictions.

    The model is run in eval mode over ``dataset``; every sample whose
    highest softmax probability is at least ``threshold`` is kept together
    with the predicted class index. The model is switched back to training
    mode before returning.

    Args:
        dataset: Dataset yielding ``(image_tensor, label)`` pairs; the label
            is ignored.
        model: Network mapping an image batch to class logits.
        threshold: Minimum top-1 probability for a sample to be kept.
        batch_size: Batch size used for inference. This used to be read from
            a module-level global that only exists when the file is run as a
            script.

    Returns:
        A list of ``(image_tensor_on_cpu, predicted_class)`` tuples, usable
        as a map-style dataset.
    """
    # Run on the device the model actually lives on, so inputs and weights
    # can never end up on different devices.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    pseudo_labeled_data = []
    with torch.no_grad():
        for img, _ in tqdm(dataloader, desc="Generating Pseudo Labels"):
            img = img.to(device)

            probs = torch.softmax(model(img), dim=-1)
            max_probs, pseudo_labels = probs.max(dim=-1)

            # Bring the per-sample decisions to the host in two transfers
            # instead of synchronising once per element.
            keep_flags = (max_probs >= threshold).tolist()
            labels = pseudo_labels.tolist()
            pseudo_labeled_data.extend(
                (img[i].cpu(), label)
                for i, (keep, label) in enumerate(zip(keep_flags, labels))
                if keep
            )

    model.train()
    return pseudo_labeled_data

def convert_to_rgb(image):
    """Return ``image`` converted to ``'RGB'`` mode via its ``convert`` method.

    Used as the callable wrapped by ``transforms.Lambda`` in the
    preprocessing pipelines.
    """
    target_mode = 'RGB'
    return image.convert(target_mode)

def image_loader(path):
    """Open the image file at ``path`` and return it fully decoded.

    ``Image.open`` is lazy: it keeps the file open until the pixel data is
    read. Decoding inside a context manager releases the file handle
    deterministically instead of relying on later implicit cleanup.

    Args:
        path: Location of the image file.

    Returns:
        The loaded ``PIL.Image.Image`` in its original mode.
    """
    with Image.open(path) as img:
        # Read the pixel data now so the image stays usable after the file
        # handle is closed on exit from the ``with`` block.
        img.load()
    return img

# Disabled snippet (a bare string literal, never executed): one-off computation
# of the per-channel normalisation statistics over `train_loader`. The std it
# reports is the average of the per-image stds, not the std over all pixels.
'''
# calculate mean and std
#Mean: tensor([0.4197, 0.3729, 0.2556])
#Std: tensor([0.2478, 0.2095, 0.1908])
mean = torch.zeros(3)
std = torch.zeros(3)
total_images = 0

for images, _ in tqdm(train_loader, desc="Calculating Mean and Std"):
    batch_samples = images.size(0)
    images = images.view(batch_samples, 3, -1)

    mean += images.mean(dim=2).sum(dim=0)
    std += images.std(dim=2).sum(dim=0)
    total_images += batch_samples

mean /= total_images
std /= total_images

print(f"Mean: {mean}")
print(f"Std: {std}")
'''
# Disabled snippet (a bare string literal, never executed): an alternative
# `Classifier` built from three conv/pool stages and a three-layer fully
# connected head. It is not the model defined above.
'''
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        # input image size: [3, 128, 128]
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(4, 4, 0),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(256 * 8 * 8, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, NUM_CLASSES)
        )

    def forward(self, x):
        x = self.cnn_layers(x)
        x = x.flatten(1)
        x = self.fc_layers(x)
        return x
'''
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``: train if ``optimizer`` is given, else evaluate.

    Args:
        model: Network mapping an image batch to class logits.
        loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the batches are moved to.
        optimizer: Optimizer to step after each batch; ``None`` selects
            evaluation (eval mode, gradients disabled).

    Returns:
        ``(mean_batch_loss, accuracy)`` for the pass.
    """
    training = optimizer is not None
    model.train(training)
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for imgs, labels in tqdm(loader):
            imgs, labels = imgs.to(device), labels.to(device)
            logits = model(imgs)
            loss = criterion(logits, labels)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            total += labels.size(0)
            correct += logits.argmax(dim=1).eq(labels).sum().item()

    return running_loss / len(loader), correct / total


if __name__ == '__main__':
    folder = 'dataset'
    NUM_CLASSES = 14
    # Per-channel statistics used to normalise the image data
    mean = [0.4197, 0.3729, 0.2556]
    std = [0.2478, 0.2095, 0.1908]

    train_tfm = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.Lambda(convert_to_rgb),
        # Data augmentation
        transforms.RandomRotation(15),
        transforms.RandomHorizontalFlip(),
        transforms.RandomAffine(degrees=0, translate=(0.12, 0.12)),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        transforms.RandomResizedCrop(128, scale=(0.9, 1.0)),
        transforms.RandomAffine(degrees=0, shear=(-10, 10)),

        transforms.ToTensor(),
        # Normalize
        transforms.Normalize(mean=mean, std=std),
    ])

    test_tfm = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.Lambda(convert_to_rgb),
        transforms.ToTensor(),
        # Normalize
        transforms.Normalize(mean=mean, std=std),
    ])

    train_set = DatasetFolder(folder + "/train/labeled", loader=image_loader, extensions="jpg", transform=train_tfm)
    valid_set = DatasetFolder(folder + "/val", loader=image_loader, extensions="jpg", transform=test_tfm)
    unlabeled_set = DatasetFolder(folder + "/train/unlabeled", loader=image_loader, extensions="jpg", transform=train_tfm)
    test_set = DatasetFolder(folder + "/test", loader=image_loader, extensions="jpg", transform=test_tfm)

    batch_size = 128
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    # Evaluation does not depend on sample order, so the validation set is not shuffled.
    valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

    # Model, loss and optimisation set-up (pseudo-labelling is driven from the loop below)
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    model = Classifier(num_classes=NUM_CLASSES).to(device)
    model.device = device
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=0.0001)

    # min_lr must be below the initial learning rate, otherwise the scheduler
    # can never lower it (it was previously equal to the starting value).
    # Reductions are reported explicitly in the loop instead of through the
    # scheduler's deprecated `verbose` flag.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, min_lr=1e-5)
    early_stopping = EarlyStopping(patience=40, verbose=True)
    n_epochs = 70

    # Whether to do semi-supervised learning.
    do_semi = True
    train_losses, train_accs, valid_losses, valid_accs = [], [], [], []

    for epoch in range(n_epochs):
        if do_semi:
            # Rebuild the training set each epoch from the labelled data plus
            # the unlabelled samples the current model is confident about.
            pseudo_set = get_pseudo_labels(unlabeled_set, model)
            concat_dataset = ConcatDataset([train_set, pseudo_set])
            train_loader = DataLoader(concat_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)

        # ---------- train ----------
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        # ---------- Validation ----------
        valid_loss, valid_acc = _run_epoch(model, valid_loader, criterion, device)
        valid_losses.append(valid_loss)
        valid_accs.append(valid_acc)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(valid_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f"Learning rate reduced from {lr_before:.6g} to {lr_after:.6g}.")
        early_stopping(valid_loss, model)

        print(f"[Epoch {epoch + 1}/{n_epochs}] Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Valid Loss: {valid_loss:.4f}, Valid Acc: {valid_acc:.4f}")

        if early_stopping.early_stop:
            print("Early stopping")
            break

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Valid Loss')
    plt.title('Loss Curve')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Accuracy')
    plt.plot(valid_accs, label='Valid Accuracy')
    plt.title('Accuracy Curve')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.savefig('learning.jpg', dpi=300, bbox_inches='tight')
    print("Learning curves saved as 'learning.jpg'.")

    plt.show()

    # Export the weights with the lowest validation loss rather than whatever
    # the last epoch produced: early stopping only triggers after `patience`
    # epochs without improvement, so the final weights are not the best ones.
    if early_stopping.best_score is not None:
        model.load_state_dict(torch.load('checkpoint.pt', map_location=device))

    model_save_path = 'trained_model.pth'
    torch.save(model.state_dict(), model_save_path)
    print(f"Trained model saved to '{model_save_path}'.")

#TEST.py

# In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import ConcatDataset, DataLoader, Subset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision.datasets import DatasetFolder
from tqdm.auto import tqdm
import matplotlib.pyplot as plt


class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + norm stages added to a shortcut branch.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution; the projection shortcut
            (when present) uses the same stride.
        norm_layer: Callable that builds a normalisation layer from a channel
            count. ``nn.BatchNorm2d`` is used when this is ``None``.
    """

    def __init__(self, in_channels, out_channels, stride=1, norm_layer=None):
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer

        # Main branch, stage 1 (the only convolution that applies `stride`).
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn1 = make_norm(out_channels)
        self.relu = nn.ReLU(inplace=True)

        # Main branch, stage 2 (keeps the spatial size and channel count).
        self.conv2 = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=3,
            padding=1,
            bias=False,
        )
        self.bn2 = make_norm(out_channels)

        # Shortcut branch: a 1x1 projection is required whenever the main
        # branch changes the tensor shape; otherwise the input passes through.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                make_norm(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))

        y += skip  # merge the two branches in place
        return self.relu(y)


class Classifier(nn.Module):
    """Image classifier: two plain conv stages, one residual stage, GAP, linear head.

    Each of the three stages halves the spatial resolution (two 2x2 max-pools
    and one stride-2 residual block); global average pooling then reduces the
    256-channel map to a vector that the linear layer maps to class logits.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=14):
        super().__init__()

        # L2 coefficient stored on the model; nothing inside this class reads it.
        self.weight_decay = 0.0001

        def conv_bn_relu(c_in, c_out):
            # One 3x3 "same" convolution followed by batch norm and ReLU.
            return [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]

        # Stage 1: 3 -> 64 channels
        self.block1 = nn.Sequential(
            *conv_bn_relu(3, 64),
            *conv_bn_relu(64, 64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.2),
        )

        # Stage 2: 64 -> 128 channels
        self.block2 = nn.Sequential(
            *conv_bn_relu(64, 128),
            *conv_bn_relu(128, 128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.3),
        )

        # Stage 3: 128 -> 256 channels with residual connections
        self.block3 = nn.Sequential(
            ResidualBlock(128, 256, stride=2),
            ResidualBlock(256, 256),
        )

        # Collapse every channel to a single value
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` logit tensor."""
        features = self.block3(self.block2(self.block1(x)))
        pooled = self.gap(features)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

class EarlyStopping:
    """Stop training once the validation loss has stopped improving.

    Every call compares the new validation loss with the best one seen so
    far. An improvement saves the model's ``state_dict`` and resets the
    patience counter; anything else increments the counter, and
    ``early_stop`` becomes ``True`` once it reaches ``patience``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: When true, print a message each time a checkpoint is saved.
        delta: Amount by which the loss must drop below the best loss to
            count as an improvement.
        path: File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record ``val_loss`` for this epoch and checkpoint ``model`` if it improved."""
        import math  # local import so the class does not depend on file-level imports

        score = -val_loss  # higher is better

        # A NaN loss compares False against everything, so it must be rejected
        # explicitly; otherwise it would be treated as an improvement,
        # overwrite the best checkpoint and make every later comparison fail.
        is_finite_result = not math.isnan(score)
        improved = is_finite_result and (
            self.best_score is None or score >= self.best_score + self.delta
        )

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
            return

        self.counter += 1
        print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Save ``model.state_dict()`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

def get_pseudo_labels(dataset, model, threshold=0.7, batch_size=128):
    """Label ``dataset`` with the model's own confident predictions.

    The model is run in eval mode over ``dataset``; every sample whose
    highest softmax probability is at least ``threshold`` is kept together
    with the predicted class index. The model is switched back to training
    mode before returning.

    Args:
        dataset: Dataset yielding ``(image_tensor, label)`` pairs; the label
            is ignored.
        model: Network mapping an image batch to class logits.
        threshold: Minimum top-1 probability for a sample to be kept.
        batch_size: Batch size used for inference. This used to be read from
            a module-level global that only exists when the file is run as a
            script.

    Returns:
        A list of ``(image_tensor_on_cpu, predicted_class)`` tuples, usable
        as a map-style dataset.
    """
    # Run on the device the model actually lives on, so inputs and weights
    # can never end up on different devices.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    pseudo_labeled_data = []
    with torch.no_grad():
        for img, _ in tqdm(dataloader, desc="Generating Pseudo Labels"):
            img = img.to(device)

            probs = torch.softmax(model(img), dim=-1)
            max_probs, pseudo_labels = probs.max(dim=-1)

            # Bring the per-sample decisions to the host in two transfers
            # instead of synchronising once per element.
            keep_flags = (max_probs >= threshold).tolist()
            labels = pseudo_labels.tolist()
            pseudo_labeled_data.extend(
                (img[i].cpu(), label)
                for i, (keep, label) in enumerate(zip(keep_flags, labels))
                if keep
            )

    model.train()
    return pseudo_labeled_data

def convert_to_rgb(image):
    """Return ``image`` converted to ``'RGB'`` mode via its ``convert`` method.

    Used as the callable wrapped by ``transforms.Lambda`` in the
    preprocessing pipeline.
    """
    target_mode = 'RGB'
    return image.convert(target_mode)

def image_loader(path):
    """Open the image file at ``path`` and return it fully decoded.

    ``Image.open`` is lazy: it keeps the file open until the pixel data is
    read. Decoding inside a context manager releases the file handle
    deterministically instead of relying on later implicit cleanup.

    Args:
        path: Location of the image file.

    Returns:
        The loaded ``PIL.Image.Image`` in its original mode.
    """
    with Image.open(path) as img:
        # Read the pixel data now so the image stays usable after the file
        # handle is closed on exit from the ``with`` block.
        img.load()
    return img

if __name__ == '__main__':
    # Inference script: load the trained weights, predict every test image
    # and write the predictions to a CSV file.
    folder = 'dataset'
    NUM_CLASSES = 14
    # Per-channel statistics used to normalise the image data
    mean = [0.4197, 0.3729, 0.2556]
    std = [0.2478, 0.2095, 0.1908]

    # Deterministic preprocessing only (no augmentation)
    preprocessing_steps = [
        transforms.Resize((128, 128)),
        transforms.Lambda(convert_to_rgb),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
    test_tfm = transforms.Compose(preprocessing_steps)
    test_set = DatasetFolder(f"{folder}/test", loader=image_loader, extensions="jpg", transform=test_tfm)
    batch_size = 128
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

    num_images = len(test_set)
    print(f"Number of images loaded: {num_images}")

    # Rebuild the network and restore its trained parameters
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model_path = "trained_model.pth"
    model = Classifier(num_classes=NUM_CLASSES)
    trained_weights = torch.load(model_path, map_location=device)
    model.load_state_dict(trained_weights)
    model.to(device)
    model.eval()

    # Predicted class index for every test image, in loader order
    predictions = []
    with torch.no_grad():
        for imgs, _ in tqdm(test_loader):
            logits = model(imgs.to(device))
            predictions.extend(logits.argmax(dim=-1).cpu().tolist())

    # Save predictions into the file.
    with open("predict.csv", "w") as f:
        # Header row, then one "<row index>,<predicted class>" line per image.
        f.write("Id,Category\n")
        f.writelines(f"{idx},{label}\n" for idx, label in enumerate(predictions))
