In [1]:
import os
import h5py
import imageio.v3 as iio
import numpy as np

### Define data parsing class: PhotoData

In [2]:
from typing import Tuple


class PhotoData(object):

    SRC_FOLDER = 'images/'

    LBL_FOLDER = 'labels/'

    HDF5_FILE = 'all_images.h5'

    N = 600

    def indextofilename(index) -> str:
        return f"{index:04d}.jpg"
    
    def indextolabelname(index) -> Tuple[str]:
        ind = f"{index:04d}"
        return f"{ind}_person.png", f"{ind}_clothes.png"


    def loadimages(self):
        images = np.ndarray((PhotoData.N, 600, 400, 3))
        for i in range(PhotoData.N):
            img_url = os.path.join(PhotoData.SRC_FOLDER, PhotoData.indextofilename(i + 1))
            im = iio.imread(img_url)
            images[i] = im
        return images
    
    def loadlabels(self):
        p_labels = np.ndarray((PhotoData.N, 600, 400), dtype=bool)
        c_labels = np.ndarray((PhotoData.N, 600, 400), dtype=np.int8)

        for i in range(PhotoData.N):
            p_name, c_name = PhotoData.indextolabelname(i+1)
            p_url = os.path.join(PhotoData.LBL_FOLDER, p_name)
            c_url = os.path.join(PhotoData.LBL_FOLDER, c_name)
            p_im = iio.imread(p_url)
            c_im = iio.imread(c_url)
            p_labels[i] = p_im[:,:,0] > 0
            # https://stackoverflow.com/questions/15635025/how-to-map-false-color-image-to-specific-labels-assigned-for-each-color
            r_channel = c_im[:, :, 0] > 0
            g_channel = c_im[:, :, 1] > 0
            b_channel = c_im[:, :, 2] > 0
            labels = (b_channel << 2) + (g_channel << 1) + r_channel
            c_labels[i] = labels
        return p_labels, c_labels
    
    def parsedata(self):
        images = self.loadimages()
        p_labels, c_labels = self.loadlabels()
        return images, p_labels, c_labels

    def storeh5(self, images, p_labels, c_labels):
        file = h5py.File(PhotoData.HDF5_FILE, 'w')
        file.create_dataset('images', np.shape(images), h5py.h5t.STD_U8BE, data=images)
        file.create_dataset('person_labels', np.shape(p_labels), h5py.h5t.STD_U8BE, data=p_labels)
        file.create_dataset('clothing_labels', np.shape(c_labels), h5py.h5t.STD_U8BE, data=c_labels)
        file.close()

    def loadh5(self):
        images, p_labels, c_labels = [], [], []
        file = h5py.File(PhotoData.HDF5_FILE, 'r+')
        images = np.array(file['images']).astype('uint8')
        p_labels = np.array(file['person_labels']).astype('float32')
        c_labels = np.array(file['clothing_labels']).astype('float32')
        return images, p_labels, c_labels

## Process and Store data
parse and store the data in h5

In [3]:
import math

data = PhotoData()
i, p, c = data.parsedata()

In [4]:
# subdivide images for ViT
def subdivideimage(imgs: np.ndarray, suppixsize=16):
    """Pads the images and subdivides for number of pixels required"""
    N = imgs.shape[0]
    H = imgs.shape[1]
    W = imgs.shape[2]
    rows = math.ceil(H / suppixsize)
    # cols = math.ceil(W / suppixsize)

    has_color = len(imgs.shape) > 3

    dims = (N, 1, W, 3) if has_color else (N, 1, W,)
    if H % suppixsize > 0:
        for i in range(suppixsize - (H % suppixsize)):
            imgs = np.append(imgs, np.zeros(dims), axis=1)
    new_H = rows * suppixsize
    dims = (N, new_H, 1, 3) if has_color else (N, new_H, 1,)
    if W % suppixsize > 0:
        for i in range(suppixsize - (W % suppixsize)):
            imgs = np.append(imgs, np.zeros(dims), axis=2)
    # return imgs
    # https://stackoverflow.com/questions/16856788/slice-2d-array-into-smaller-2d-arrays
    swap_dim_1 = [N, rows, suppixsize, -1, suppixsize]
    swap_dim_2 = [N, -1, suppixsize, suppixsize]
    if has_color:
        swap_dim_1.append(3)
        swap_dim_2.append(3)
    return imgs.reshape(swap_dim_1).swapaxes(2,3).reshape(swap_dim_2)

images = subdivideimage(i)
p_labels = subdivideimage(p)
c_labels = subdivideimage(c)

In [5]:
# reshape images
def reshapeimages(images):
    N = images.shape[0]
    M = images.shape[1]
    return images.reshape((N, M, -1))

images = reshapeimages(images)

In [6]:
# produce [2] labels
def processpersonlabels(p_labels):
    N = p_labels.shape[0]
    M = p_labels.shape[1]
    total = p_labels.shape[2] ** 2
    person_arr = (np.sum(p_labels.reshape((N, M, -1)), axis=2, dtype=float) / total).reshape((N, M, 1))
    background_arr = np.ones((N, M, 1)) - person_arr
    return np.append(background_arr, person_arr, axis=2)
pp = processpersonlabels(p_labels=p_labels)
pp[0, 0]

array([1., 0.])

In [7]:
# produce [7] labels
def processclotheslabels(c_labels):
    N = c_labels.shape[0]
    M = c_labels.shape[1]
    total = c_labels.shape[2] ** 2
    flattened = c_labels.reshape((N, M, -1))
    cc = np.zeros((N, M, 0))
    for i in range(7):
        x = (np.sum(flattened == i, axis=2, dtype=float) / total).reshape((N, M, 1))
        cc = np.append(cc, x, axis=2)
    return cc

cc = processclotheslabels(c_labels)
cc[0, 0]

array([1., 0., 0., 0., 0., 0., 0.])

In [8]:
data.storeh5(images, pp, cc)

## Load data

In [3]:
data = PhotoData()
images, p_labels, c_labels = data.loadh5()
p_labels = p_labels.swapaxes(1,2)
c_labels = c_labels.swapaxes(1,2)

In [9]:
images.shape

(600, 988, 768)

# Model
Define the model, will be writing a vision transformer (ViT, https://arxiv.org/abs/2010.11929v2).

In [4]:
import torch
from torch import nn

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# default: d_model=768, nhead=12, num_layers=12
encoder_layer = nn.TransformerEncoderLayer(d_model=768, nhead=8)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=6).to(device)
# src = torch.rand(3, 968, 768, device=device)
# out = transformer_encoder(src) # torch.Size([3, 968, 768])



In [5]:
p_model = nn.Sequential(
    transformer_encoder,
    nn.Linear(768, 2),
    nn.Softmax(dim=2)
).to(device)

c_model = nn.Sequential(
    transformer_encoder,
    nn.Linear(768, 7),
    nn.Softmax(dim=2)
).to(device)


In [6]:
from torch import optim

p_optimizer = optim.AdamW(p_model.parameters())
c_optimizer = optim.AdamW(c_model.parameters())

In [9]:
import time
bce = nn.BCELoss().to(device)
cel = nn.CrossEntropyLoss().to(device)

def train(model, src, tgt, optimizer, loss_fn=cel, ckpt_name=None, batch_size=64, epochs=3):
    N = src.shape[0]
    losses = []
    batches = N // batch_size
    model.train()
    
    print(f"Starting training: {batches} batches per epoch for {epochs} epochs")
    for e in range(epochs):
        print(f"\tStarting epoch {e+1}")
        starttime = time.time()
        losses.append([])
        for i in range(batches):
            start = i * batch_size
            train_src = src[start:start + batch_size]
            train_tgt = tgt[start:start + batch_size]

            optimizer.zero_grad()

            output = model(train_src).swapaxes(1, 2)
            loss = loss_fn(output, train_tgt)

            loss.backward()
            optimizer.step()

            losses[e].append(loss.item())
        
        print(f"\tEpoch {e+1:02d} took {time.time() - starttime}s.\n\tTotal loss at end of epoch: {sum(losses[e]) / batches}")

        if ckpt_name is not None:
            torch.save({
                'epoch': e+1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'batch_losses': losses[e]
            }, f"checkpoints/{ckpt_name}-epoch{e+1}.pt")

    return losses



In [8]:
train_size = 300
src = torch.tensor(images[:train_size], dtype=torch.float32).to(device)
tgt_p = torch.tensor(p_labels[:train_size], dtype=torch.float32).to(device)
tgt_c = torch.tensor(c_labels[:train_size], dtype=torch.float32).to(device)

In [10]:
# Train person identifier
p_losses = train(p_model, src, tgt_p, p_optimizer, loss_fn=bce, ckpt_name='person_model', batch_size=16, epochs=15)

Starting training: 18 batches per epoch for 15 epochs
	Starting epoch 1
	Epoch 01 took 288.2376284599304s.
	Total loss at end of epoch: 1.5516875916057162


RuntimeError: Parent directory checkpoints does not exist.

In [25]:
# Train clothing identifier
c_losses = train(c_model, src, tgt_c, c_optimizer, ckpt_name='clothes_model', batch_size=16, epochs=6)

Starting training: 18 batches per epoch for 6 epochs
	Starting epoch 1
	Total loss at end of epoch 1: 1.646254645453559
	Starting epoch 2
	Total loss at end of epoch 2: 1.6462497446272109
	Starting epoch 3
	Total loss at end of epoch 3: 1.646322210629781
	Starting epoch 4
	Total loss at end of epoch 4: 1.646307733323839
	Starting epoch 5
	Total loss at end of epoch 5: 1.6462871829668682
	Starting epoch 6
	Total loss at end of epoch 6: 1.6462660895453558


In [29]:
def validate(model, valid_src, valid_tgt, loss_fn):
    model.eval()
    with torch.no_grad():
        output = model(valid_src).swapaxes(1, 2)
        loss = loss_fn(output, valid_tgt)
        return loss.item()


In [27]:
validation_size = 60
valid_src = torch.tensor(images[train_size:train_size + validation_size], dtype=torch.float32).to(device)
valid_tgt_p = torch.tensor(p_labels[train_size:train_size + validation_size], dtype=torch.float32).to(device)
valid_tgt_c = torch.tensor(c_labels[train_size:train_size + validation_size], dtype=torch.float32).to(device)

In [30]:
validate(p_model, valid_src, valid_tgt_p, bce)

0.817803144454956

# Load and infer with Models

In [None]:
checkpoint = torch.load('person_model-epoch6.pt', map_location=torch.device('cpu'))

p_model.load_state_dict(checkpoint['model_state_dict'])