### All Shapes and Colors - Kaggle Challenge ###

In [None]:
# Imports
import os, ast, random
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as T
import matplotlib.pyplot as plt

def seed_everything(seed=17):
    """Seed the Python, NumPy and PyTorch global RNGs with the same value."""
    # same order as before: stdlib random, then NumPy, then torch
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
seed_everything(17)

# Device (prototyped on my MacBook Air M1): prefer Apple's MPS backend,
# then CUDA (for colab compatibility), and fall back to the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

After looking at the problem, I can see we have the following constraints:
- Each image contains k objects where 1 <= k <= 9, since there are no duplicate objects with same (shape, color) in any given image.
- An image can contain several objects of the same shape and several objects of the same color, as long as no (shape, color) pair repeats.

I'll want to use a multi-label target over all 9 possible pairs. For each image, the target is a 9-dimensional multi-hot vector with a one for each pair present in the image.

In [None]:
# The shapes and colors an object can have.
SHAPES = ['circle', 'square', 'triangle']
COLORS = ['red', 'green', 'blue']

# Give every "shape_color" pair its own index. Shapes vary slowest, so
# circle_red -> 0, circle_green -> 1, ..., triangle_blue -> 8.
PAIR_TO_IDX = {
    name: idx
    for idx, name in enumerate(f"{s}_{c}" for s in SHAPES for c in COLORS)
}

# Reverse lookup (index -> "shape_color"), needed to decode predictions.
IDX_TO_PAIR = {idx: name for name, idx in PAIR_TO_IDX.items()}

print(PAIR_TO_IDX)
print(IDX_TO_PAIR)

Step one is to set up the label representation.

In [None]:
# Encoding the pairs to multi-hot vector
def encode_pairs(pairs):
    """
    Build the multi-hot target vector for one image.
    pairs: iterable of (shape, color) string tuples; case is ignored
    returns: float32 torch tensor with one slot per entry of PAIR_TO_IDX,
             1.0 where the pair is listed and 0.0 elsewhere
    raises: KeyError if a pair has no entry in PAIR_TO_IDX
    """
    target = np.zeros(len(PAIR_TO_IDX), dtype=np.float32)
    for shape, color in pairs:
        # keys are lowercase "shape_color" strings
        target[PAIR_TO_IDX[f"{shape.lower()}_{color.lower()}"]] = 1.0
    return torch.from_numpy(target)

# pairs = [("circle","red"), ("triangle","blue")]
# y = encode_pairs(pairs)
# print(y)        
# print(y.sum())  

def decode_vec(y, threshold=0.5):
    """
    Decode a vector of raw model outputs (logits) to a list of (shape, color) pairs.
    y: 1-D torch tensor of logits, one per entry of IDX_TO_PAIR (shape (9,))
    threshold: minimum sigmoid probability for a pair to be reported as present
    returns: list of tuples (shape, color), ordered by pair index

    Note: y must hold logits, not an already-binarized 0/1 target. A sigmoid is
    applied here, and sigmoid(0) == 0.5 passes the default threshold, so a 0/1
    vector would decode as "every pair present".
    """
    probs = torch.sigmoid(y)  # logits -> probabilities in (0, 1)
    # nonzero(as_tuple=True)[0] gives the positions along the first (only) axis
    idxs = (probs >= threshold).nonzero(as_tuple=True)[0].tolist()
    # keys look like "shape_color"; split them back into a tuple
    return [tuple(IDX_TO_PAIR[i].split("_")) for i in idxs]


# logits = torch.tensor([3.0, -1.0, 0.2, 0.0, 0.0, 2.5, -2.0, 0.0, 0.0])
# decoded = decode_vec(logits)
# print(decoded)



`encode_pairs` makes the labels usable for training: it turns a list of (shape, color) pairs into the 9-dim multi-hot tensor mentioned earlier.

I also have a function to decode the tensor outputted at inference time. It applies a sigmoid to get probabilities, thresholds, then turns the tensor back into (shape, color) tuples. 

Next thing to do is load the data.

In [None]:
def parse_label_string(string): # need this since the CSV has labels as strings
    """
    Parse a label string into a list of (shape, color) pairs.
    string: str, e.g. "[('circle', 'red'), ('triangle', 'blue')]" or "[]".
            None, NaN (what pandas yields for an empty CSV cell) and
            empty / whitespace-only strings are treated as "no objects".
    returns: list of tuples (shape, color), lowercased and stripped
    raises: ValueError / SyntaxError from ast.literal_eval if the text is not
            a valid Python literal
    """
    # missing values: None, or a float NaN (NaN is the only float != itself)
    if string is None or (isinstance(string, float) and string != string):
        return []
    s = string.strip().lower()
    if not s:  # "" or whitespace only; literal_eval would raise SyntaxError
        return []
    # literal_eval only accepts Python literals, so it is safe on file content
    data = ast.literal_eval(s)
    # text is already lowercase; trim stray spaces inside the quoted names
    return [(shape.strip(), color.strip()) for shape, color in data]

# test parser
# samples = [
#     "[('square', 'blue'), ('circle', 'green'), ('square', 'red')]",
#     "[('circle','blue'),('square','green'),('circle','red'),('square','red')]",
#     "[]"
# ]
# for s in samples:
#     pairs = parse_label_string(s)
#     assert isinstance(pairs, list), "Parser should return a list"
#     for (shape, color) in pairs:
#         assert shape in SHAPES and color in COLORS, f"Bad pair parsed: {(shape,color)}"

Creating dataset classes here. I use pillow to load the images and apply transformations. In this case I'm just doing normalization since the provided images already look like they have distortion and noise applied. No resizing either. I may add image augmentation if validation suggests any overfitting.

In [None]:
# Datasets
class ShapesColorsDatasetTrain(Dataset):
    """
    Labeled dataset used for training and validation.
    Each item is (image, target): the RGB image after `transform` (if any)
    and the multi-hot target built from the row's label string.
    CSV columns: image_path, label
    """
    def __init__(self, csv_file, img_dir, transform=None):
        self.df = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]

        # absolute paths are used as-is; relative ones are resolved under img_dir
        path = str(record["image_path"])
        if not os.path.isabs(path):
            path = os.path.join(self.img_dir, path)

        # load with pillow, forcing three RGB channels
        image = Image.open(path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # label string -> list of pairs -> multi-hot tensor
        label_pairs = parse_label_string(record["label"])
        return image, encode_pairs(label_pairs)
    
class ShapesColorsDatasetTest(Dataset):
    """
    Unlabeled dataset used at test time.
    Each item is (image, image_path): the RGB image after `transform` (if any)
    and the path string exactly as it appears in the CSV.
    CSV column: image_path
    """
    def __init__(self, csv_file, img_dir, transform=None):
        self.df = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # keep the CSV value untouched: it is returned alongside the image
        rel = str(self.df.iloc[idx]["image_path"])

        if os.path.isabs(rel):
            full_path = rel
        else:
            full_path = os.path.join(self.img_dir, rel)

        image = Image.open(full_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, rel
    
# Create datasets using existing classes with index subsets
class IndexedDataset(Dataset):
    """
    View of `base_dataset` restricted to `indices`, in the order given.
    Item i of this dataset is item indices[i] of the base dataset.
    """
    def __init__(self, base_dataset, indices):
        self.indices = indices
        self.base_dataset = base_dataset

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        # translate the subset position into a position in the base dataset
        source_idx = self.indices[idx]
        return self.base_dataset[source_idx]

Transforms and loaders. I'm using a batch size of 32 and shuffling for training. `num_workers` is set to 0 while I'm working on my Mac; I'll bring it back up to 2 if I move to a Colab env. Seems like this dataset is small enough that my M1 should be okay.

In [None]:
BATCH_SIZE = 32

# File paths (images are referenced relative to the dataset root)
train_csv = "all-shapes-and-colors-v-2/dataset_v3/train.csv"
train_img_dir = "all-shapes-and-colors-v-2/dataset_v3"  
test_csv = "all-shapes-and-colors-v-2/dataset_v3/test.csv"
test_img_dir = "all-shapes-and-colors-v-2/dataset_v3"   

# Load the training CSV and shuffle its row positions with a fixed seed
train_df = pd.read_csv(train_csv)
np.random.seed(17)
indices = np.random.permutation(len(train_df))

# First 80% of the shuffled positions go to training, the rest to validation
split_idx = int(0.8 * len(indices))
train_indices, val_indices = np.split(indices, [split_idx])

print(f"Total training samples: {len(train_df)}")
print(f"Train split: {len(train_indices)}, Val split: {len(val_indices)}")

# Transforms: convert to a tensor, then normalize each channel with
# mean 0.5 / std 0.5. Train and validation use the same pipeline for now.
train_tfms = T.Compose([T.ToTensor(), T.Normalize(mean=[0.5] * 3, std=[0.5] * 3)])
val_tfms = T.Compose([T.ToTensor(), T.Normalize(mean=[0.5] * 3, std=[0.5] * 3)])

# Create base training dataset and split it. Train and val each wrap their
# own base dataset so they can carry different transforms.
full_train_ds = ShapesColorsDatasetTrain(train_csv, train_img_dir, transform=train_tfms)
train_ds = IndexedDataset(full_train_ds, train_indices)

full_val_ds = ShapesColorsDatasetTrain(train_csv, train_img_dir, transform=val_tfms) 
val_ds = IndexedDataset(full_val_ds, val_indices)

test_ds = ShapesColorsDatasetTest(test_csv, test_img_dir, transform=val_tfms)

# Create DataLoaders.
# Pinned (page-locked) host memory only helps host-to-GPU copies on CUDA; on
# MPS/CPU it is unused and DataLoader emits a warning, so enable it only for CUDA.
loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=0,
    pin_memory=(device.type == "cuda"),
)
train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)  # reshuffled every epoch
val_loader = DataLoader(val_ds, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **loader_kwargs)

print(f"Datasets created - Train: {len(train_ds)}, Val: {len(val_ds)}, Test: {len(test_ds)}")

#test the dataset
# x0, y0 = train_ds[0]
# print("image:", x0.shape)         
# print("target:", y0)              
# print("num objects:", int(y0.sum().item()))

# sanity check dataloader
def unnorm(x):
    """Undo the mean=0.5 / std=0.5 normalization and clip to [0, 1] for display."""
    restored = x * 0.5 + 0.5
    return restored.clamp(0, 1)

# Pull one batch and show up to nine images in a 3x3 grid, each titled with
# the pairs marked present in its target vector.
images, targets = next(iter(train_loader))

plt.figure(figsize=(8, 8))
for i, (image, target) in enumerate(zip(images[:9], targets[:9])):
    plt.subplot(3, 3, i + 1)
    # channels-first tensor -> channels-last array, as imshow expects
    plt.imshow(unnorm(image).permute(1, 2, 0).numpy())
    present = (target > 0.5).nonzero(as_tuple=True)[0].tolist()
    plt.title(", ".join(IDX_TO_PAIR[j] for j in present), fontsize=9)
    plt.axis("off")
plt.tight_layout()
plt.show()

Onto the model. I used a small, modern CNN architecture with global average pooling. So my blocks have convolutions, batch normalization, ReLU, and then pooling. I have four of these blocks and then a global average pool before the final linear layer with the 9 logits. 

For loss, I've got a multi-label BCEWithLogitsLoss, which compares the 9 logits against the 9-dimensional multi-hot target.

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class MyCNN(nn.Module):
    """
    Compact CNN producing `num_out` logits for multi-label classification.
    Four conv blocks, each halving height and width (256 - 128 - 64 - 32 - 16
    for a 256x256 input), then global average pooling and one linear layer.
    The global pool makes the head independent of the input resolution.
    """
    # RGB input channels followed by the output width of each conv block
    _WIDTHS = (3, 32, 64, 128, 256)

    @staticmethod
    def _conv_block(c_in, c_out):
        """3x3 conv (no bias; BatchNorm follows) - BatchNorm - ReLU - 2x2 max-pool."""
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(c_out),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )

    def __init__(self, num_out=9):
        super().__init__()
        stages = [
            self._conv_block(c_in, c_out)
            for c_in, c_out in zip(self._WIDTHS, self._WIDTHS[1:])
        ]
        self.features = nn.Sequential(*stages)
        self.gap = nn.AdaptiveAvgPool2d(1)  # [B,256,H,W] to [B,256,1,1]
        self.head = nn.Linear(self._WIDTHS[-1], num_out)  # raw logits (no sigmoid)

    def forward(self, x):
        pooled = self.gap(self.features(x))
        return self.head(pooled.flatten(1))
    
# Build the network and move it to the selected device
model = MyCNN(num_out=9)
model = model.to(device)
sum(param.numel() for param in model.parameters())  # total parameter count

# Multi-label loss: sigmoid + binary cross-entropy applied to each logit
criterion = nn.BCEWithLogitsLoss()

# Adam with the conventional 1e-3 learning rate and a small L2 penalty
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# training epoch function
def train_one_epoch(loader):
    """
    Run one optimization pass over `loader` using the module-level
    model, criterion, optimizer and device.
    loader: iterable of (images, targets) batches
    returns: training loss averaged over all samples seen
    """
    model.train()
    total_loss = 0.0
    seen = 0
    for images, targets in loader:
        images = images.to(device)
        targets = targets.to(device)

        # clear old gradients, then forward / backward / update
        optimizer.zero_grad()
        loss = criterion(model(images), targets)
        loss.backward()
        optimizer.step()

        # criterion returns a batch mean; weight it by batch size so the
        # final figure is a per-sample average even if the last batch is short
        batch_size = images.size(0)
        total_loss += loss.item() * batch_size
        seen += batch_size
    return total_loss / seen

#validation epoch function
def validate_one_epoch(loader):
    """
    Evaluate the module-level model on `loader` without updating it.
    loader: iterable of (images, targets) batches
    returns: validation loss averaged over all samples seen
    """
    model.eval()  # BatchNorm uses its running statistics
    running_loss, n = 0.0, 0
    # no gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time without changing the computed loss
    with torch.no_grad():
        for images, targets in loader:
            images, targets = images.to(device), targets.to(device)
            logits = model(images)
            loss = criterion(logits, targets)
            # weight the batch-mean loss by batch size for a per-sample average
            running_loss += loss.item() * images.size(0)
            n += images.size(0)
    return running_loss / n

EPOCHS = 12
best_val = float("inf")  # lowest validation loss seen so far

for epoch in range(1, EPOCHS + 1):
    train_loss = train_one_epoch(train_loader)
    val_loss = validate_one_epoch(val_loader)
    print(f"Epoch {epoch:02d} | train_loss {train_loss:.4f} | val_loss {val_loss:.4f}")

    # nothing to save unless validation loss improved
    if not val_loss < best_val:
        continue
    best_val = val_loss
    torch.save(model.state_dict(), "best_tinyscnn.pt")