# Semi-Supervised Learning with ResNet18 utilizing Pseudolabeling and Entropy-Based Regularization

### Importing Libraries

In [1]:
import numpy as np
import os
import cv2
import torchvision
from torchvision.datasets import ImageFolder
from torchvision import transforms
import torch
import torch.nn as nn
from PIL import Image
import matplotlib.pyplot as plt
from torchvision.transforms import v2
import os
import pandas as pd
import torch.optim.lr_scheduler as lr_scheduler
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from torch import optim
import matplotlib.pyplot as plt
import gc

In [2]:
# Run on the first CUDA GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [3]:
# Report the CUDA toolkit PyTorch was built against and, when a GPU is present,
# the name of the active device. torch.cuda.current_device() raises on a
# CPU-only machine, so it is only queried when CUDA is actually available.
print(f"CUDA version: {torch.version.cuda}")
if torch.cuda.is_available():
    cuda_id = torch.cuda.current_device()
    print(f"Name of current CUDA device:{torch.cuda.get_device_name(cuda_id)}")
else:
    cuda_id = None
    print("No CUDA device available; running on CPU")

CUDA version: 12.1
Name of current CUDA device:Tesla T4


### Defining Hyperparameters and Global Variables

In [45]:
# Label table for the labeled training images; the datasets below look rows up
# through its 'image' (file name) and 'id' (class id) columns.
labels = pd.read_csv("./files/train_labeled.csv")
categories = pd.read_csv("./files/categories.csv")

# --- Hyperparameters -------------------------------------------------------
batch_size = 32
seed = torch.manual_seed(40)  # seeds torch's global RNG; returns the Generator
epochs = 80
n_classes = 135
lr = 0.01
threshold = 0.9      # minimum softmax confidence for accepting a pseudolabel
e_lambda = 0.6       # weight of the entropy-regularization loss
decay_gamma = 0.3
pseudo_lambda = 0.5  # weight of the loss on pseudolabeled batches
# The original cell misspelled this name, which made pseudo() fail with a
# NameError; the misspelling is kept as an alias for any code that used it.
pseudo_labmda = pseudo_lambda

# Channel statistics used for input normalization (the ImageNet values that
# torchvision documents for its ResNet models).
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, tensorize, normalize, then random flip/rotation.
# The augmentations run on the already-normalized tensor, as in the original.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(45),
])

# Evaluation pipeline: same preprocessing as training, without augmentation.
# Normalization was commented out here, so validation, unlabeled and test
# images reached the network on a different scale than the training images.
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Per-epoch metric histories, appended to by the training/validation functions.
train_acc = []
train_loss = []
val_acc = []
val_loss = []

sm = nn.Softmax(dim=1)

### Custom Dataloaders for Labeled and Unlabeled Data

In [5]:
class LabeledDataset(Dataset):
    """Images paired with class ids taken from a label table.

    Each item is ``(image, class_id, file_name)``. Images are read from
    ``./files/train/<pseudo>/<file_name>``.

    Args:
        image_paths: Indexable collection of image file names.
        labels: DataFrame with an ``image`` column (file name) and an ``id``
            column (class id).
        pseudo: Sub-directory of ``./files/train`` that holds the images.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, image_paths, labels, pseudo = "labeled", transform = None):
        self.image_paths = image_paths
        self.transform = transform
        self.labels = labels
        self.pseudo = pseudo
        # Build the name -> id mapping once instead of scanning the DataFrame
        # for every sample. setdefault keeps the first row for a duplicated
        # file name, matching the previous ``.values[0]`` behaviour.
        self._label_by_image = {}
        for image_name, class_id in zip(labels['image'], labels['id']):
            self._label_by_image.setdefault(image_name, class_id)

    def get_class_label(self, image_name):
        """Return the class id recorded for ``image_name``.

        Raises:
            KeyError: If the label table has no row for ``image_name``.
        """
        try:
            return self._label_by_image[image_name]
        except KeyError:
            raise KeyError(f"no label found for image {image_name!r}") from None

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        # Close the file handle promptly and force three channels: grayscale
        # or RGBA files would otherwise break the 3-channel transform/network.
        with Image.open(f"./files/train/{self.pseudo}/{image_path}") as img:
            x = img.convert("RGB")
        if self.transform is not None:
            x = self.transform(x)
        y = self.get_class_label(image_path)
        return x, y, image_path

    def __len__(self):
        return len(self.image_paths)
        
        
class UnlabeledDataset(Dataset):
    """Images without labels, read from a single directory.

    Each item is ``(image, index, file_name)``.

    Args:
        image_paths: Indexable collection of image file names.
        path: Directory containing the images (with or without a trailing
            separator).
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, image_paths, path, transform = None):
        self.image_paths = image_paths
        self.transform = transform
        self.path = path

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        # os.path.join yields the same path as the old concatenation when
        # ``path`` ends with a separator and also works when it does not.
        # The image is forced to RGB so every sample has three channels, and
        # the file handle is closed as soon as the pixels are loaded.
        with Image.open(os.path.join(self.path, image_path)) as img:
            x = img.convert("RGB")
        if self.transform is not None:
            x = self.transform(x)
        return x, index, image_path

    def __len__(self):
        return len(self.image_paths)

### Loading Images into Corresponding Datasets and Dataloaders
Labeled Dataset Size: 9854
<br>
Unlabeled Dataset Size: 22995
<br>
Because the labeled dataset is small relative to the 135 classes to be predicted (15 types of leaves and 120 types of dogs), I used semi-supervised learning methods to take advantage of the unlabeled dataset.

In [6]:

# Directory layout of the competition data.
train_labeled_dataset = "./files/train/labeled"
train_unlabeled_dataset = "./files/train/unlabeled"
test_dataset = "./files/test"

# os.listdir returns entries in arbitrary order, so the listings are sorted to
# make the train/validation split below reproducible across runs and machines.
train_labeled = np.array(sorted(os.listdir(train_labeled_dataset)))
train_unlabeled = np.array(sorted(os.listdir(train_unlabeled_dataset)))
# NOTE(review): this one file is excluded by name; the reason is not recorded
# here (presumably the image cannot be loaded) - confirm before removing.
train_unlabeled = train_unlabeled[train_unlabeled != '26804.jpg']
test = np.array(sorted(os.listdir(test_dataset)))

# Hold out 20% of the labeled images for validation. The split reuses the seed
# given to torch.manual_seed so the validation set is identical on every run.
train_labeled, split_test = train_test_split(
    train_labeled, test_size=0.2, random_state=seed.initial_seed()
)

train_l = LabeledDataset(train_labeled, labels, "labeled", train_transform)
train_l_loader = torch.utils.data.DataLoader(train_l, batch_size=batch_size, shuffle=True)

# Validation metrics do not depend on sample order, so no shuffling is needed.
split_t = LabeledDataset(split_test, labels, "labeled", test_transform)
split_t_loader = torch.utils.data.DataLoader(split_t, batch_size=batch_size, shuffle=False)

# Unlabeled images are served one at a time, in random order.
train_ul = UnlabeledDataset(train_unlabeled, "./files/train/unlabeled/", test_transform)
train_ul_loader = torch.utils.data.DataLoader(train_ul, batch_size=1, shuffle=True)

# `test_dataset` is rebound from the directory string to the dataset object.
test_dataset = UnlabeledDataset(test, "./files/test/", test_transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)


### Defining the Classifier (Resnet18)

In [7]:
# ResNet18 backbone built without a `weights` argument, with its final
# fully-connected layer replaced by an `n_classes`-way linear head.
classifier = torchvision.models.resnet18()
classifier.fc = nn.Sequential(
    # Read the input width from the layer being replaced and the output width
    # from the shared hyperparameter instead of repeating 512 / 135 here.
    nn.Linear(in_features=classifier.fc.in_features, out_features=n_classes)
)
classifier = classifier.to(device)

In [8]:
# Make every parameter of the classifier trainable.
for name, param in classifier.named_parameters():
    param.requires_grad_(True)

### Cross Entropy Loss, Adam Optimizer with L2 Regularization, and Step Learning Rate Scheduler

In [9]:
# Cross-entropy loss for the labeled (and pseudolabeled) batches.
criterion = nn.CrossEntropyLoss()
# Adam over all classifier parameters, with weight decay of 1e-4.
optimizer = optim.Adam(
    params=classifier.parameters(),
    lr=lr,
    weight_decay=1e-4,
)
# Halve the learning rate every 10 scheduler steps.
scheduler = lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.5)


### Training on Labeled Data

In [15]:
def labeled_train():
    """Train the classifier for one pass over the labeled training loader.

    Uses the module-level ``classifier``, ``criterion``, ``optimizer``,
    ``device`` and ``train_l_loader``. Appends the mean batch loss to
    ``train_loss`` and the overall accuracy (a plain float) to ``train_acc``.
    """
    classifier.train()
    correct = 0
    seen = 0
    loss_sum = 0.0
    batches = 0
    for imgs, tlabels, _paths in train_l_loader:
        imgs = imgs.to(device)
        tlabels = tlabels.to(device)
        outputs = classifier(imgs)
        loss = criterion(outputs, tlabels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        predictions = torch.argmax(outputs, dim=1)
        # Accumulate across batches. The previous code overwrote the counter,
        # so the reported accuracy was (correct in last batch) / (all samples).
        # .item() stores a Python number rather than a device tensor.
        correct += (predictions == tlabels).sum().item()
        seen += len(predictions)
        loss_sum += loss.item()
        batches += 1
    train_loss.append(loss_sum / batches)
    train_acc.append(correct / seen)

### Entropy-Based Regularization
<br>
<img src="entropyreg.gif">
<br>
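I employed entropy-based regularization, which adds the loss term shown above. As implemented in `entropy_reg`, the term is

$$\mathcal{L}_{\text{entropy}} = -\lambda \sum_{n=1}^{N} \sum_{l=1}^{L} C(x_n)_l \, \log C(x_n)_l$$

where N is the total number of unlabeled images, L is the number of classes (135), and C(x_n)_l is the probability my classifier C assigns to class l for image x_n. Lambda is a weighting coefficient that controls how much this unlabeled, entropy-based term contributes during backpropagation.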

In [13]:
def entropy_reg(max_batches=10_000):
    """Apply one entropy-minimization update using unlabeled images.

    For each batch from ``train_ul_loader`` the prediction entropy
    ``-sum(p * log p)``, scaled by ``e_lambda``, is back-propagated. The
    gradients of all processed batches are accumulated and applied in a single
    ``optimizer.step()`` at the end.

    The previous version never called ``optimizer.step()``. Because
    ``labeled_train`` calls ``optimizer.zero_grad()`` before its first
    backward pass, the accumulated entropy gradients were discarded and the
    regularizer never changed the weights.

    Args:
        max_batches: Stop after this many batches (the original hard-coded
            limit was 10,000).

    Returns:
        The summed, ``e_lambda``-weighted entropy over the processed batches,
        as a Python float.
    """
    classifier.train()
    # Start clean so only entropy gradients contribute to this update.
    optimizer.zero_grad()
    total_entropy = 0.0
    count = 0
    for img, _index, _path in train_ul_loader:
        img = img.to(device)
        # log_softmax avoids the log(0) * 0 = NaN that log(softmax(x)) yields
        # when a probability underflows to zero.
        log_probs = torch.log_softmax(classifier(img), dim=1)
        entropy = -(log_probs.exp() * log_probs).sum() * e_lambda
        entropy.backward()
        # Keep only the number; summing the tensors retained graph references.
        total_entropy += entropy.item()
        count += 1
        if count % 1000 == 0:
            print(count / 1000)
        if count >= max_batches:
            break
    if count:
        optimizer.step()
    # Leave no stale gradients behind for whichever training step runs next.
    optimizer.zero_grad()
    return total_entropy

### Validation Testing

In [14]:
def validation():
    """Evaluate the classifier on the held-out labeled split.

    Uses the module-level ``classifier``, ``criterion``, ``device`` and
    ``split_t_loader``. Runs in eval mode without gradient tracking, then
    appends the mean batch loss to ``val_loss`` and the overall accuracy
    (a plain float) to ``val_acc``.
    """
    correct = 0
    seen = 0
    loss_sum = 0.0
    batches = 0
    classifier.eval()
    with torch.no_grad():
        for img, tlabel, _path in split_t_loader:
            img = img.to(device)
            tlabel = tlabel.to(device)
            output = classifier(img)
            loss = criterion(output, tlabel)
            predictions = torch.argmax(output, dim=1)
            # Accumulate across batches. The previous code overwrote the
            # counter, so only the last batch's hits were counted.
            correct += (predictions == tlabel).sum().item()
            seen += len(predictions)
            loss_sum += loss.item()
            batches += 1
    val_loss.append(loss_sum / batches)
    val_acc.append(correct / seen)

In [None]:

def train(num_epochs):
    """Run ``num_epochs`` epochs of the training schedule.

    The CUDA cache is emptied once up front. Each epoch then calls, in order,
    ``labeled_train``, ``entropy_reg``, ``validation`` and
    ``scheduler.step``.
    """
    torch.cuda.empty_cache()
    for _epoch in range(num_epochs):
        # Names are resolved every epoch so a re-bound global is picked up.
        for phase in (labeled_train, entropy_reg, validation, scheduler.step):
            phase()
        

In [None]:
# Run the training schedule for the number of epochs set in `epochs`.
train(epochs)

In [None]:
# Rebind the module-level optimizer and scheduler that the training functions
# read. The learning rate and decay factor come from the named hyperparameters
# (`lr` = 0.01, `decay_gamma` = 0.3) instead of repeating the literals here.
# NOTE(review): unlike the first optimizer, this one has no weight decay -
# confirm that is intended.
optimizer = torch.optim.Adam(classifier.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=decay_gamma)

In [None]:
# Plot the recorded loss and accuracy histories against the epoch number.
# The x axis is derived from the history length; the previous hard-coded
# range(1, 75) made plt.plot fail whenever a different number of epochs ran.
x = list(range(1, len(train_loss) + 1))
plt.subplot(1,2,1)
plt.plot(x, train_loss, label="Train")
plt.plot(x, val_loss, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()

# NOTE(review): slot 3 of a 1x3 grid leaves a gap next to the loss plot;
# presumably intentional to keep the axis labels apart - confirm.
plt.subplot(1,3,3)
# float() accepts both Python numbers and 0-dim (possibly CUDA) tensors, which
# matplotlib cannot plot directly.
plt.plot(x, [float(a) for a in train_acc])
plt.plot(x, [float(a) for a in val_acc])
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.show()

## Results
<br>
<img src="train-results.png">

In [None]:
# Rebind the module-level optimizer/scheduler for the pseudolabeling stage:
# Adam at a learning rate of 0.001 with weight decay 1e-4, halved every
# 5 scheduler steps.
optimizer = optim.Adam(
    params=classifier.parameters(),
    lr=0.001,
    weight_decay=1e-4,
)
scheduler = lr_scheduler.StepLR(optimizer=optimizer, step_size=5, gamma=0.5)

### Pseudolabeling
Pseudolabelling allows us to capitalize further on our unlabelled dataset. By creating hard labels based off of my classifier's predictions, I was able to expand our available labeled dataset domain in order to train the model. 
<br>
    <img src="pseudolabel.gif">
<br>
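To ensure that my model only created hard pseudolabels for images it was confident about, I established a confidence threshold (`threshold = 0.9`): an unlabeled image receives a pseudolabel only when its highest softmax probability exceeds that threshold.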

In [None]:
def _collect_pseudolabels():
    """Return (file_names, class_ids) of unlabeled images predicted with
    softmax confidence above ``threshold``."""
    names = []
    class_ids = []
    classifier.eval()
    with torch.no_grad():
        for img, _index, imagepath in train_ul_loader:
            probs = torch.softmax(classifier(img.to(device)), dim=1)
            confidence, predicted = probs.max(dim=1)
            # Handle every sample of the batch, not just element 0, so the
            # loader's batch size is free to change.
            keep = (confidence > threshold).cpu().tolist()
            predicted = predicted.cpu().tolist()
            for name, class_id, confident in zip(imagepath, predicted, keep):
                if confident:
                    names.append(name)
                    class_ids.append(class_id)
    return names, class_ids


def _train_on_pseudolabels(imagepaths, labelappend):
    """Run one training pass over the pseudolabeled images, with the loss
    scaled by ``pseudo_lambda``."""
    pslabels = pd.DataFrame.from_dict({"image": imagepaths, "id": labelappend})
    train_ps = LabeledDataset(imagepaths, pslabels, "unlabeled", train_transform)
    train_ps_loader = torch.utils.data.DataLoader(train_ps, batch_size=batch_size, shuffle=True)
    classifier.train()
    for imgs, tlabels, _paths in train_ps_loader:
        imgs = imgs.to(device)
        tlabels = tlabels.to(device)
        outputs = classifier(imgs)
        loss = criterion(outputs, tlabels) * pseudo_lambda
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


def pseudo(num_epochs):
    """Alternate pseudolabel training with regular training.

    Each epoch (1) predicts every unlabeled image in eval mode and keeps those
    whose top softmax probability exceeds ``threshold``, (2) trains on those
    images with their predicted class as a hard label, and (3) calls
    ``train(1)``.

    Reads the module-level ``pseudo_lambda`` loss weight (a NameError is
    raised if the hyperparameter cell does not define exactly that name).

    Compared with the previous version: confident image tensors are no longer
    collected in an unused list (that kept each selected image alive on the
    device), an epoch with no confident predictions skips the pseudolabel pass
    instead of building a shuffled DataLoader over an empty dataset, and
    unused accuracy counters were removed.

    Args:
        num_epochs: Number of pseudolabel/train rounds to run.
    """
    for _epoch in range(num_epochs):
        imagepaths, labelappend = _collect_pseudolabels()
        print(f"Added: {len(imagepaths)} / {len(train_unlabeled)}")
        if imagepaths:
            _train_on_pseudolabels(imagepaths, labelappend)
        train(1)

In [None]:
# Start fresh metric histories (four independent lists) for the next stage.
train_acc, train_loss, val_acc, val_loss = [], [], [], []

In [None]:
# Plot the loss and accuracy histories recorded during the pseudolabeling
# stage. The x axis follows the history length; the previous hard-coded
# range(1, 11) made plt.plot fail unless exactly 10 epochs had been recorded.
x = list(range(1, len(train_loss) + 1))
plt.subplot(1,2,1)
plt.plot(x, train_loss, label="Train")
plt.plot(x, val_loss, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Pseudolabeled Loss")
plt.legend()

# NOTE(review): slot 3 of a 1x3 grid leaves a gap next to the loss plot;
# presumably intentional to keep the axis labels apart - confirm.
plt.subplot(1,3,3)
# float() accepts both Python numbers and 0-dim (possibly CUDA) tensors, which
# matplotlib cannot plot directly.
plt.plot(x, [float(a) for a in train_acc])
plt.plot(x, [float(a) for a in val_acc])
plt.xlabel("Epoch")
plt.ylabel("Pseudolabeled Accuracy")
plt.show()

## Results:
<br>
<img src="pseudo-imgs.png">

In [None]:
# Freeze every parameter of the classifier (no gradients are tracked for them).
for name, param in classifier.named_parameters():
    param.requires_grad_(False)

## Final Results
The test-set predictions are written to a CSV file for submission to a Kaggle competition, where they are scored against the hidden ground-truth labels.

In [None]:
# Predict a class for every test image. `fin_dict` maps the collated batch of
# file names to the tensor of predicted class indices (moved to the CPU); the
# CSV export below unpacks both with [0].
classifier.eval()
fin_dict = {}
# Inference needs no autograd graph. Disabling it here keeps this cell cheap
# even when the parameter-freezing cell above has not been run.
with torch.no_grad():
    for img, pathindex, path in test_loader:
        outputs = classifier(img.to(device))
        fin_dict[path] = torch.argmax(outputs, dim=1).cpu()

In [None]:
# Flatten the predictions into two parallel columns and write the submission
# file. Each dict key holds the file name at position 0 and each value holds
# the predicted class index at position 0.
keys = []
preds = []
for name_batch, prediction in fin_dict.items():
    image_name = name_batch[0]
    print(image_name)
    keys.append(image_name)
    preds.append(int(prediction[0]))
input_dict = {"image": keys, "id": preds}
df = pd.DataFrame.from_dict(input_dict)
df.to_csv("submission.csv", index=False)

## Results:
<img src="submission.png">