In [None]:
import os
import math
import torch
import random
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch.nn as nn
from pathlib import Path
import torch.optim as optim
import torch.nn.init as init
import torch.nn.functional as F
from torch.utils.data import Dataset
from torchvision.io import read_image
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor, ToPILImage

## Set paths for pipeline

I don't know where these results will be reproduced (on Kaggle or elsewhere), so every path used by the pipeline can be set manually in the cell below.

In [ ]:
# Models
# Directory the training cell saves the trained weights into.
models_dir = Path('models')
# File name of the trained weights that the inference cell loads.
model_file = 'net_epochs-25.pytorch'    # My best model

# Train data
# Directory holding the training images listed in the labels CSV.
train_data_path = Path('data/train_imgs')
# CSV read by ImageDataset: column 0 is the image file name, column 1 the label.
labels_path = Path('data/labels.csv')

# Test data
# Directory holding the test images, stored as `<id>.jpg`.
test_path = Path('/kaggle/input/hat-or-no-hat-spring24-cu-denver/test_set/test_set')
sample_submission = Path('results/sample_submission.csv') # Used to get file names for test imgs

# Directory the final `submission.csv` is written to.
save_dir = Path('results')

## Preprocess training dataset

In [ ]:
class ImageDataset(Dataset):
    """Labelled training images, letterboxed to 64x64 and randomly augmented.

    Each item is an ``(image, label)`` pair. ``image`` is a float NumPy array
    in channel-first layout with values scaled to ``[0, 1]``; ``label`` is the
    raw value found in the second column of the annotations CSV.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        """Read the annotations table and remember where the images live.

        Args:
            annotations_file: Path of a CSV whose column 0 holds the image
                file name (relative to ``img_dir``) and column 1 the label.
            img_dir: Directory containing the image files.
            transform: Optional callable applied to the padded image, passed
                as an ``np.ndarray``. Its result is handed to
                ``preprocess_image`` and therefore must offer a PIL-style
                ``resize`` method (e.g. ``ToPILImage()``).
        """
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform

    def random_flip_and_rotate(self, img):
        """Return a randomly flipped / 90-degree-rotated copy of ``img``.

        A vertical flip and a horizontal flip are each applied with
        probability 0.5, followed by a rotation of 0, 90, 180 or 270 degrees
        in the plane of the first two axes.
        """
        if random.random() < 0.5:
            img = np.flipud(img)

        if random.random() < 0.5:
            img = np.fliplr(img)

        quarter_turns = random.choice([0, 1, 2, 3])
        img = np.rot90(img, quarter_turns)

        # The flips/rotation return views with odd strides; copy so the
        # result owns contiguous memory.
        return img.copy()

    def preprocess_image(self, img, target_size):
        """Resize, scale to ``[0, 1]``, augment and move channels first.

        Args:
            img: Image object exposing ``resize(size)`` and convertible with
                ``np.array`` (a PIL image).
            target_size: ``(width, height)`` passed to ``img.resize``.

        Returns:
            ``np.ndarray`` with the channel axis first.
        """
        # Resize the image
        image = img.resize(target_size)
        image_array = np.array(image)
        # Normalize pixel values
        image_array = image_array / 255.0
        # Convert to RGB if greyscale
        if image_array.ndim == 2:
            image_array = np.stack((image_array,) * 3, axis=-1)
        # Randomly flip and rotate image
        image_array = self.random_flip_and_rotate(image_array)
        return image_array.transpose(2, 0, 1)

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is shrunk to fit inside 64x64 while keeping its aspect
        ratio, then centred on a black 64x64 RGB canvas before preprocessing.
        """
        target_size = (64, 64)
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        label = self.img_labels.iloc[idx, 1]

        canvas = Image.new("RGB", target_size)
        # The context manager closes the underlying file once the pixels
        # have been copied onto the canvas.
        with Image.open(img_path) as source:
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            source.thumbnail(target_size, Image.LANCZOS)
            offset = (
                (target_size[0] - source.size[0]) // 2,
                (target_size[1] - source.size[1]) // 2,
            )
            canvas.paste(source, offset)

        # Always continue from the padded canvas so that a missing transform
        # does not silently fall back to the un-padded thumbnail.
        image = canvas
        if self.transform:
            image = self.transform(np.array(canvas))
        image = self.preprocess_image(image, target_size)
        return image, label

## Define Network

In [ ]:
def init_weights(modules):
    """Placeholder weight initialiser: accepts ``modules`` and does nothing.

    Layers therefore keep whatever initialisation PyTorch gave them when
    they were constructed.
    """
    return None
   

class MeanShift(nn.Module):
    """Frozen 1x1 convolution that adds or subtracts a per-channel RGB mean.

    The convolution weight is the 3x3 identity, so each channel passes
    through unchanged apart from the bias, which is ``+mean`` or ``-mean``.
    """

    def __init__(self, mean_rgb, sub):
        """Build the shift layer.

        Args:
            mean_rgb: Indexable holding the red, green and blue means at
                positions 0, 1 and 2.
            sub: If truthy the means are subtracted, otherwise added.
        """
        super().__init__()

        direction = -1 if sub else 1
        offsets = [mean_rgb[channel] * direction for channel in range(3)]

        self.shifter = nn.Conv2d(3, 3, 1, 1, 0)
        self.shifter.weight.data = torch.eye(3).view(3, 3, 1, 1)
        self.shifter.bias.data = torch.Tensor(offsets)

        # The layer is a fixed shift, not something to learn: freeze it.
        self.shifter.requires_grad_(False)

    def forward(self, x):
        """Return ``x`` with the configured per-channel offset applied."""
        return self.shifter(x)


class BasicBlock(nn.Module):
    """A single convolution followed by an in-place ReLU."""

    def __init__(self, in_channels, out_channels, ksize=3, stride=1, pad=1):
        """Build the conv + ReLU pair.

        Args:
            in_channels: Channels of the input feature map.
            out_channels: Channels produced by the convolution.
            ksize: Convolution kernel size.
            stride: Convolution stride.
            pad: Zero padding added on each side.
        """
        super().__init__()

        conv = nn.Conv2d(in_channels, out_channels, ksize, stride, pad)
        self.body = nn.Sequential(conv, nn.ReLU(inplace=True))

        # NOTE(review): this passes the bound ``modules`` method itself rather
        # than the iterator it returns; harmless while init_weights is a no-op.
        init_weights(self.modules)

    def forward(self, x):
        """Apply the convolution and the ReLU to ``x``."""
        return self.body(x)


class ResidualBlock(nn.Module):
    """Two 3x3 convolutions with a skip connection and a final ReLU.

    The input is added to the convolution output, so the two must have
    compatible shapes (in practice ``in_channels == out_channels``).
    """

    def __init__(self, in_channels, out_channels):
        """Build the conv-ReLU-conv body.

        Args:
            in_channels: Channels of the input feature map.
            out_channels: Channels produced by both convolutions.
        """
        super().__init__()

        layers = [
            nn.Conv2d(in_channels, out_channels, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1),
        ]
        self.body = nn.Sequential(*layers)

        # NOTE(review): this passes the bound ``modules`` method itself rather
        # than the iterator it returns; harmless while init_weights is a no-op.
        init_weights(self.modules)

    def forward(self, x):
        """Return ``relu(body(x) + x)``."""
        return F.relu(self.body(x) + x)


class EResidualBlock(nn.Module):
    """Residual block built from two grouped 3x3 convs and a 1x1 conv.

    The input is added to the body output before the final ReLU, so the two
    must have compatible shapes (in practice ``in_channels == out_channels``).
    """

    def __init__(self, in_channels, out_channels, group=1):
        """Build the grouped-convolution body.

        Args:
            in_channels: Channels of the input feature map.
            out_channels: Channels produced by every convolution in the body.
            group: ``groups`` argument of the two 3x3 convolutions; the
                closing 1x1 convolution is not grouped.
        """
        super().__init__()

        layers = [
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, groups=group),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, groups=group),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 1, 1, 0),
        ]
        self.body = nn.Sequential(*layers)

        # NOTE(review): this passes the bound ``modules`` method itself rather
        # than the iterator it returns; harmless while init_weights is a no-op.
        init_weights(self.modules)

    def forward(self, x):
        """Return ``relu(body(x) + x)``."""
        return F.relu(self.body(x) + x)


class UpsampleBlock(nn.Module):
    """Upsampler that holds either one fixed-scale branch or x2/x3/x4 branches."""

    def __init__(self, n_channels, scale, multi_scale, group=1):
        """Create the upsampling branch(es).

        Args:
            n_channels: Channels of the feature map to upsample.
            scale: Scale of the single branch; only used when ``multi_scale``
                is falsy.
            multi_scale: If truthy, build separate x2, x3 and x4 branches and
                choose between them on every ``forward`` call.
            group: ``groups`` argument forwarded to the branch convolutions.
        """
        super().__init__()

        if multi_scale:
            self.up2 = _UpsampleBlock(n_channels, scale=2, group=group)
            self.up3 = _UpsampleBlock(n_channels, scale=3, group=group)
            self.up4 = _UpsampleBlock(n_channels, scale=4, group=group)
        else:
            self.up = _UpsampleBlock(n_channels, scale=scale, group=group)

        self.multi_scale = multi_scale

    def forward(self, x, scale=None):
        """Upsample ``x``.

        Args:
            x: Feature map to upsample.
            scale: Branch to use in multi-scale mode (2, 3 or 4). Ignored in
                single-scale mode, where the scale was fixed at construction.

        Raises:
            ValueError: In multi-scale mode when ``scale`` is not 2, 3 or 4.
                Previously this case fell through and returned ``None``.
        """
        if not self.multi_scale:
            return self.up(x)

        if scale == 2:
            return self.up2(x)
        if scale == 3:
            return self.up3(x)
        if scale == 4:
            return self.up4(x)

        raise ValueError(
            f"unsupported scale {scale!r}: multi-scale mode supports 2, 3 and 4"
        )


class _UpsampleBlock(nn.Module):
    """Fixed-scale upsampler made of conv + ReLU + PixelShuffle stages.

    Scales 2, 4 and 8 stack one x2 stage per factor of two; scale 3 uses a
    single x3 stage. Any other scale leaves the body empty.
    """

    def __init__(self, n_channels, scale, group=1):
        """Assemble the stages for ``scale``.

        Args:
            n_channels: Channels of the feature map (unchanged by each stage).
            scale: Upsampling factor.
            group: ``groups`` argument of every convolution.
        """
        super().__init__()

        layers = []
        if scale in (2, 4, 8):
            n_stages = int(math.log(scale, 2))
            for _ in range(n_stages):
                # Expand channels 4x so PixelShuffle(2) can fold them back
                # into a map that is twice as wide and twice as tall.
                layers.append(
                    nn.Conv2d(n_channels, 4*n_channels, 3, 1, 1, groups=group)
                )
                layers.append(nn.ReLU(inplace=True))
                layers.append(nn.PixelShuffle(2))
        elif scale == 3:
            layers.append(
                nn.Conv2d(n_channels, 9*n_channels, 3, 1, 1, groups=group)
            )
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.PixelShuffle(3))

        self.body = nn.Sequential(*layers)
        # NOTE(review): this passes the bound ``modules`` method itself rather
        # than the iterator it returns; harmless while init_weights is a no-op.
        init_weights(self.modules)

    def forward(self, x):
        """Run ``x`` through the upsampling stages."""
        return self.body(x)
    
    
class Block(nn.Module):
    """Three residual blocks whose outputs are progressively concatenated.

    After each residual block its output is concatenated with every feature
    map gathered so far, and a 1x1 ``BasicBlock`` fuses the stack back down
    to the working channel count.
    """

    def __init__(self, in_channels=64, out_channels=64):
        """Build the residual and fusion layers.

        The constructor used to take no arguments, which made calls of the
        form ``Block(64, 64)`` fail with ``TypeError``. Both arguments are
        optional, so ``Block()`` keeps building the same 64-channel block.

        Args:
            in_channels: Channels of the input feature map.
            out_channels: Channels of the output feature map.

        Raises:
            ValueError: If ``in_channels`` and ``out_channels`` differ; the
                skip connections and concatenations assume one shared width.
        """
        super().__init__()

        if in_channels != out_channels:
            raise ValueError(
                "Block requires in_channels == out_channels, "
                f"got {in_channels} and {out_channels}"
            )
        width = out_channels

        self.b1 = ResidualBlock(width, width)
        self.b2 = ResidualBlock(width, width)
        self.b3 = ResidualBlock(width, width)
        # 1x1 fusion convs; the input grows by `width` channels per stage.
        self.c1 = BasicBlock(width * 2, width, 1, 1, 0)
        self.c2 = BasicBlock(width * 3, width, 1, 1, 0)
        self.c3 = BasicBlock(width * 4, width, 1, 1, 0)

    def forward(self, x):
        """Return the fused feature map produced after the third stage."""
        c0 = o0 = x

        b1 = self.b1(o0)
        c1 = torch.cat([c0, b1], dim=1)
        o1 = self.c1(c1)

        b2 = self.b2(o1)
        c2 = torch.cat([c1, b2], dim=1)
        o2 = self.c2(c2)

        b3 = self.b3(o2)
        c3 = torch.cat([c2, b3], dim=1)
        o3 = self.c3(c3)

        return o3
        

class Classifier(nn.Module):
    """Fully connected head: flatten, 12288 -> 300 -> 100 -> 1, then sigmoid."""

    def __init__(self, in_channels):
        """Build the head.

        Args:
            in_channels: Stored on the instance as ``in_channels``; the layer
                sizes are fixed and do not depend on it.
        """
        super().__init__()

        self.in_channels = in_channels

        self.flat = nn.Flatten()
        self.l0 = nn.Linear(12288, 300)
        self.l1 = nn.Linear(300, 100)
        self.l2 = nn.Linear(100, 1)

        # The same layer objects are reachable as attributes and via `body`.
        stages = [
            self.flat,
            self.l0,
            nn.ReLU(inplace=True),
            self.l1,
            nn.ReLU(inplace=True),
            self.l2,
            nn.Sigmoid(),
        ]
        self.body = nn.Sequential(*stages)

    def forward(self, x):
        """Return one sigmoid output per sample in ``x``."""
        return self.body(x)


class Net(nn.Module):
    """Convolutional feature extractor followed by a sigmoid classifier head.

    The input is mean-shifted, passed through three cascaded ``Block``
    stages with concatenation and 1x1 fusion, projected back to 3 channels,
    un-shifted, and then classified.
    """

    def __init__(self):
        """Build all layers; the network works on 64 feature channels."""
        super().__init__()

        self.sub_mean = MeanShift((0.4488, 0.4371, 0.4040), sub=True)
        self.add_mean = MeanShift((0.4488, 0.4371, 0.4040), sub=False)

        self.entry = nn.Conv2d(3, 64, 3, 1, 1)

        # Block's constructor takes no positional arguments, so the previous
        # `Block(64, 64)` calls raised TypeError; it already builds a
        # 64-channel block on its own.
        self.b1 = Block()
        self.b2 = Block()
        self.b3 = Block()
        # 1x1 fusion convs; the input grows by 64 channels per stage.
        self.c1 = BasicBlock(64*2, 64, 1, 1, 0)
        self.c2 = BasicBlock(64*3, 64, 1, 1, 0)
        self.c3 = BasicBlock(64*4, 64, 1, 1, 0)

        self.exit = nn.Conv2d(64, 3, 3, 1, 1)
        self.classify = Classifier([64, 3, 3])

    def forward(self, x):
        """Return the classifier's sigmoid output for a batch of images.

        The head's first linear layer takes 12288 features, which matches a
        3-channel 64x64 input because every convolution here keeps the
        spatial size.
        """
        x = self.sub_mean(x)
        x = self.entry(x)
        c0 = o0 = x

        b1 = self.b1(o0)
        c1 = torch.cat([c0, b1], dim=1)
        o1 = self.c1(c1)

        b2 = self.b2(o1)
        c2 = torch.cat([c1, b2], dim=1)
        o2 = self.c2(c2)

        b3 = self.b3(o2)
        c3 = torch.cat([c2, b3], dim=1)
        o3 = self.c3(c3)

        out = self.exit(o3)
        out = self.add_mean(out)

        classification = self.classify(out)

        return classification

## Train the model

In [ ]:
# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
save_model = True
epochs = 25

annotations_file = labels_path
img_dir = train_data_path
train_data = ImageDataset(
    annotations_file,
    img_dir,
    transform=ToPILImage()
)

train_loader = DataLoader(
    train_data,
    batch_size=4,
    num_workers=1,
    shuffle=True,
)

# The network ends in a sigmoid, so plain binary cross-entropy applies.
loss_fn = nn.BCELoss()
net = Net().to(device)
optimizer = optim.Adam(net.parameters(), lr=0.0003)
for epoch in range(1, epochs+1):
    # Make sure every epoch runs in training mode.
    net.train()
    run_result = {'nsamples': 0, 'loss': 0}

    # Drop gradient tensors left over from the previous epoch to free memory.
    optimizer.zero_grad(set_to_none=True)
    torch.cuda.empty_cache()

    train_bar = tqdm(train_loader)
    for data, target in train_bar:
        batch_size = data.size(0)
        run_result['nsamples'] += batch_size

        z = data.to(device)
        # Shape the labels like the predictions: one float column per sample.
        label = target.to(device).unsqueeze(1).float()

        pred_prob = net(z.float())

        optimizer.zero_grad()
        loss = loss_fn(pred_prob, label)
        loss.backward()
        optimizer.step()

        # Weight by batch size so the running value is a per-sample mean.
        run_result['loss'] += loss.item() * batch_size

        train_bar.set_description(
            desc=f"[{epoch}/{epochs}] Loss: {run_result['loss'] / run_result['nsamples']:.4f}")

    train_loss = run_result['loss'] / run_result['nsamples']

# Leave the trained network in inference mode.
net.eval()

if save_model:
    # torch.save does not create missing directories.
    models_dir.mkdir(parents=True, exist_ok=True)
    torch.save(net.state_dict(), models_dir / f'net_epochs-{epochs}.pytorch')

## Define test data loader

In [ ]:
class TestDataset(Dataset):
    """Unlabelled test images, letterboxed to 64x64 without augmentation.

    Each item is an ``(image, id)`` pair. ``image`` is a float NumPy array in
    channel-first layout with values scaled to ``[0, 1]``; ``id`` is the value
    from the first column of the file-names CSV.
    """

    def __init__(self, img_dir, file_names, transform=None):
        """Read the list of image ids.

        Args:
            img_dir: Directory containing the images, stored as ``<id>.jpg``.
            file_names: Path of a CSV whose column 0 holds the image ids.
            transform: Optional callable applied to the padded image, passed
                as an ``np.ndarray``. Its result is handed to
                ``preprocess_image`` and therefore must offer a PIL-style
                ``resize`` method (e.g. ``ToPILImage()``).
        """
        self.img_dir = img_dir
        self.file_names = pd.read_csv(file_names)
        self.transform = transform

    def preprocess_image(self, img, target_size):
        """Resize, scale to ``[0, 1]`` and move channels first.

        Args:
            img: Image object exposing ``resize(size)`` and convertible with
                ``np.array`` (a PIL image).
            target_size: ``(width, height)`` passed to ``img.resize``.

        Returns:
            ``np.ndarray`` with the channel axis first.
        """
        # Resize the image
        image = img.resize(target_size)
        # Convert image to numpy array
        image_array = np.array(image)
        # Normalize pixel values
        image_array = image_array / 255.0
        # If the image is grayscale, convert it to RGB
        if image_array.ndim == 2:
            image_array = np.stack((image_array,) * 3, axis=-1)
        return image_array.transpose(2, 0, 1)

    def __len__(self):
        """Return the number of ids listed in the CSV.

        ``__getitem__`` indexes the CSV, so the length must come from the
        same table rather than from a count of files in ``img_dir``.
        """
        return len(self.file_names)

    def __getitem__(self, idx):
        """Load test image ``idx`` and return ``(image, id)``.

        The image is shrunk to fit inside 64x64 while keeping its aspect
        ratio, then centred on a black 64x64 RGB canvas before preprocessing.
        """
        target_size = (64, 64)
        _id = self.file_names.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, f'{_id}.jpg')

        canvas = Image.new("RGB", target_size)
        # The context manager closes the underlying file once the pixels
        # have been copied onto the canvas.
        with Image.open(img_path) as source:
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            source.thumbnail(target_size, Image.LANCZOS)
            offset = (
                (target_size[0] - source.size[0]) // 2,
                (target_size[1] - source.size[1]) // 2,
            )
            canvas.paste(source, offset)

        # Always continue from the padded canvas so that a missing transform
        # does not silently fall back to the un-padded thumbnail.
        image = canvas
        if self.transform:
            image = self.transform(np.array(canvas))
        image = self.preprocess_image(image, target_size)
        return image, _id

## Test the model

In [ ]:
test_data = TestDataset(
    test_path,
    file_names=sample_submission,
    transform=ToPILImage()
)
test_loader = DataLoader(
    test_data,
    batch_size=1,
    num_workers=1,
    shuffle=False,
)

# Define the device here as well so this cell also runs on its own.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Weights are saved under `models_dir`, so load them from there (an absolute
# `model_file` still takes precedence when joined). `map_location` lets
# GPU-trained weights load on a CPU-only machine.
weights = torch.load(models_dir / model_file, map_location=device)
net = Net()
net.to(device)
net.load_state_dict(weights)
net.eval()

img_ids = []
predictions = []
with torch.no_grad():
    for test_img, _id in tqdm(test_loader, desc='Predicting: '):
        out = net(test_img.to(device).float())

        # Numeric ids are collated into a tensor; unwrap them so the
        # submission holds plain values rather than tensor objects.
        img_id = _id[0]
        if torch.is_tensor(img_id):
            img_id = img_id.item()

        img_ids.append(img_id)
        predictions.append(out.to('cpu').numpy()[0][0])

## Threshold and save predictions

In [ ]:
# Only predictions strictly above this probability are labelled "Hat".
pos_thresh = 0.999

pred_array = np.array(predictions)
y_pred = np.where(pred_array > pos_thresh, "Hat", "No Hat")

submission_dict = {'id': img_ids, 'class': y_pred}
submission_df = pd.DataFrame.from_dict(submission_dict)

# to_csv does not create missing directories.
save_dir.mkdir(parents=True, exist_ok=True)
submission_df.to_csv(save_dir / "submission.csv", index=False)