### Fine-tune DETR to detect female-ish faces in paintings

In [None]:
# Install (or upgrade to the latest versions of) the packages this notebook depends on.
! pip install --upgrade scipy transformers datasets huggingface_hub torch torchvision torchaudio pytorch-lightning pycocotools

In [None]:
# Log in to the Hugging Face Hub from the shell.
# NOTE(review): the next cell calls notebook_login(); presumably only one of the two logins is needed — confirm.
!huggingface-cli login

In [None]:
# Log in to the Hugging Face Hub from inside the notebook.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
import matplotlib.pyplot as plt
import os
import pytorch_lightning as pl
import torch
import torchvision
import torchvision.transforms as T

from hf2coco import create_cocordiais_from_hf
from torch.utils.data import DataLoader
from transformers import DetrConfig, DetrForObjectDetection, DetrImageProcessor

from PIL import Image as PImage

### Load the dataset from the Hugging Face Hub and convert it to COCO format

In [None]:
# Hugging Face Hub dataset repository handed to the COCO converter below.
HF_DATASET = "thiagohersan/cordiais-faces"
# Hugging Face Hub model repository id; later cells push the trained model to it and reload from it.
HF_MODEL= "thiagohersan/detr-cordiais"
# Local directory handed to the converter; later cells read its "train" and "test" subfolders.
COCORDIAIS_PATH = "./cocordiais"

# Helper from the local hf2coco module. Presumably it downloads HF_DATASET and writes
# images plus a cocordiais.json annotation file under COCORDIAIS_PATH — TODO confirm.
create_cocordiais_from_hf(HF_DATASET, COCORDIAIS_PATH)

### Create PyTorch Dataset and DataLoaders

In [None]:
class CocoDetection(torchvision.datasets.CocoDetection):
  """COCO-format detection dataset that returns processor-encoded samples.

  Images and annotations are read from ``img_folder`` using the
  ``cocordiais.json`` annotation file stored in that same folder. When
  ``train`` is true, every image goes through a chain of photometric
  augmentations before it is handed to the image processor.
  """

  @staticmethod
  def GaussianNoise(sigma=25.0):
    """Build a transform that adds zero-mean Gaussian noise to a tensor image.

    Args:
      sigma: standard deviation of the noise, in the units of the input tensor.

    Returns:
      A callable taking a tensor and returning a noisy tensor with the same
      dtype. For integer inputs (e.g. uint8 images) the noisy values are
      clamped to the representable range of that dtype before the cast back,
      so they saturate instead of wrapping around.
    """
    def gauss_noise(img):
      dtype = img.dtype
      if not img.is_floating_point():
        # Noise has to be added in floating point.
        img = img.to(torch.float32)

      out = img + sigma * torch.randn_like(img)

      if out.dtype != dtype:
        if dtype != torch.bool and not dtype.is_complex:
          # Without clamping, values pushed outside the integer range wrap
          # around on the cast, which turns near-black/near-white pixels into
          # their opposite instead of saturating them.
          limits = torch.iinfo(dtype)
          out = out.clamp(limits.min, limits.max)
        out = out.to(dtype)
      return out
    return gauss_noise

  def __init__(self, img_folder, processor, train=True):
    """Set up the dataset.

    Args:
      img_folder: directory holding the images and ``cocordiais.json``.
      processor: image processor called in ``__getitem__`` with the image and
        its COCO annotations.
      train: when true, apply the augmentation pipeline to every image.
    """
    ann_file = os.path.join(img_folder, "cocordiais.json")
    super().__init__(img_folder, ann_file)
    self.processor = processor
    self.train = train
    # Augmentations run on the GPU when one is available.
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Applied manually in __getitem__, and only to the image: the annotations
    # are left untouched.
    # NOTE(review): ElasticTransform displaces pixels while the boxes stay
    # as annotated — confirm the displacement is small enough for this data.
    self.transform = T.Compose([
      T.ColorJitter(brightness=0.5, hue=0.3),
      T.ElasticTransform(alpha=30.0),
      T.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
      T.RandomPosterize(bits=2),
      T.RandomEqualize(),
      CocoDetection.GaussianNoise(sigma=25.0)
    ])

  def __getitem__(self, idx):
    """Return ``(pixel_values, target)`` for the sample at position ``idx``.

    ``pixel_values`` is the processor's encoded image with the leading batch
    dimension removed; ``target`` is the first (only) entry of the
    processor's ``labels`` output.
    """
    img, target = super().__getitem__(idx)
    if self.train:
      # PIL -> tensor on the augmentation device -> augment -> back to a CPU
      # PIL image, which is what gets passed to the processor below.
      img = T.PILToTensor()(img).to(self.device)
      img = T.ToPILImage()(self.transform(img).to("cpu"))

    image_id = self.ids[idx]
    target = {"image_id": image_id, "annotations": target}
    encoding = self.processor(images=img, annotations=target, return_tensors="pt")
    # Drop only the batch axis; a bare squeeze() would also remove any other
    # dimension that happens to have size 1.
    pixel_values = encoding["pixel_values"].squeeze(0)
    target = encoding["labels"][0]

    return pixel_values, target

In [None]:
# Image processor of the pretrained checkpoint, configured with 800 for both size edges.
processor = DetrImageProcessor.from_pretrained(
  "facebook/detr-resnet-50",
  size=dict(shortest_edge=800, longest_edge=800),
)

# Training split: augmentation enabled.
train_dataset = CocoDetection(
  img_folder=os.path.join(COCORDIAIS_PATH, "train"),
  processor=processor,
  train=True,
)
# Test split: augmentation disabled.
test_dataset = CocoDetection(
  img_folder=os.path.join(COCORDIAIS_PATH, "test"),
  processor=processor,
  train=False,
)

print("Number of examples:\n  Train: %s\n  Test: %s" % (len(train_dataset), len(test_dataset)))

In [None]:
def collate_fn(batch):
  """Collate ``(pixel_values, target)`` samples into one model-ready batch.

  The images are padded together by the notebook-level ``processor``; the
  per-image targets are kept as a plain list.

  Returns:
    dict with keys ``"pixel_values"``, ``"pixel_mask"`` and ``"labels"``.
  """
  images = [sample[0] for sample in batch]
  padded = processor.pad(images, return_tensors="pt")
  targets = [sample[1] for sample in batch]
  return {
    "pixel_values": padded["pixel_values"],
    "pixel_mask": padded["pixel_mask"],
    "labels": targets,
  }

# Training batches are shuffled; both loaders use the padding collate function.
train_dataloader = DataLoader(
  train_dataset,
  batch_size=4,
  shuffle=True,
  collate_fn=collate_fn,
)
val_dataloader = DataLoader(
  test_dataset,
  batch_size=2,
  collate_fn=collate_fn,
)

In [None]:
# verify
# Sanity check: fetch one collated batch and one individual sample, then print their structure.
# `batch` is reused by a later cell to test the model's forward pass.
batch = next(iter(train_dataloader))
pixel_values, target = train_dataset[0]

print(batch.keys())
print(pixel_values.shape)
print(target)

### Train with PyTorch Lightning

In [None]:
# Category table of the training annotations, and the id -> name mapping derived from it.
cats = train_dataset.coco.cats
id2label = {cat_id: cat["name"] for cat_id, cat in cats.items()}

class Detr(pl.LightningModule):
  """Lightning module that fine-tunes ``facebook/detr-resnet-50``.

  Relies on the notebook-level ``id2label`` (for the number of classes) and
  on the notebook-level ``train_dataloader`` / ``val_dataloader`` objects.
  """

  def __init__(self, lr, lr_backbone, weight_decay):
    """Load the pretrained checkpoint and remember the optimizer settings.

    Args:
      lr: learning rate for parameters whose name does not contain "backbone".
      lr_backbone: learning rate for parameters whose name contains "backbone".
      weight_decay: weight decay handed to AdamW.
    """
    super().__init__()
    self.lr = lr
    self.lr_backbone = lr_backbone
    self.weight_decay = weight_decay

    # Swap the COCO classification head for one sized to this dataset;
    # ignore_mismatched_sizes lets the checkpoint load despite the new shapes.
    self.model = DetrForObjectDetection.from_pretrained(
      "facebook/detr-resnet-50",
      revision="no_timm",
      num_labels=len(id2label),
      num_queries=16,
      ignore_mismatched_sizes=True,
    )

  def forward(self, pixel_values, pixel_mask):
    """Run the wrapped model without labels and return its outputs."""
    return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

  def common_step(self, batch, batch_idx):
    """Compute the loss for one batch.

    Returns:
      ``(loss, loss_dict)`` as reported by the model outputs.
    """
    # Targets arrive as a list of dicts of tensors; move each tensor to the module's device.
    targets = [
      {key: value.to(self.device) for key, value in sample.items()}
      for sample in batch["labels"]
    ]
    outputs = self.model(
      pixel_values=batch["pixel_values"],
      pixel_mask=batch["pixel_mask"],
      labels=targets,
    )
    return outputs.loss, outputs.loss_dict

  def _log_components(self, prefix, loss_dict):
    """Log every entry of ``loss_dict`` as a Python scalar under ``prefix`` + its key."""
    for name, value in loss_dict.items():
      self.log(prefix + name, value.item())

  def training_step(self, batch, batch_idx):
    """Log the training loss and its components; return the loss."""
    total_loss, components = self.common_step(batch, batch_idx)
    self.log("training_loss", total_loss)
    self._log_components("train_", components)
    return total_loss

  def validation_step(self, batch, batch_idx):
    """Log the validation loss and its components; return the loss."""
    total_loss, components = self.common_step(batch, batch_idx)
    self.log("validation_loss", total_loss)
    self._log_components("validation_", components)
    return total_loss

  def configure_optimizers(self):
    """Create AdamW with a separate learning rate for the backbone parameters."""
    head_params, backbone_params = [], []
    for name, param in self.named_parameters():
      if not param.requires_grad:
        continue
      if "backbone" in name:
        backbone_params.append(param)
      else:
        head_params.append(param)

    groups = [
      {"params": head_params},
      {"params": backbone_params, "lr": self.lr_backbone},
    ]
    return torch.optim.AdamW(groups, lr=self.lr, weight_decay=self.weight_decay)

  def train_dataloader(self):
    """Return the notebook-level training DataLoader."""
    return train_dataloader

  def val_dataloader(self):
    """Return the notebook-level validation DataLoader."""
    return val_dataloader


In [None]:
# verify the outputs
model = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4)
outputs = model(pixel_values=batch['pixel_values'], pixel_mask=batch['pixel_mask'])
outputs.logits.shape

In [None]:
# Load the TensorBoard notebook extension and open it on the lightning_logs/ directory
# (presumably where the Lightning trainer below writes its logs — confirm).
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

In [None]:
# Train for at most 32 epochs with gradient clipping at 0.1.
# No dataloaders are passed to fit(): the Detr module provides them through its
# train_dataloader() / val_dataloader() methods.
trainer = pl.Trainer(max_epochs=32, gradient_clip_val=0.1, accelerator="auto")
trainer.fit(model)

In [None]:
# Upload the wrapped DetrForObjectDetection (model.model) and the image processor
# to the HF_MODEL repository.
model.model.push_to_hub(HF_MODEL)
processor.push_to_hub(HF_MODEL)

### Reload model

In [None]:
# Use the GPU for inference when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Label names used when reloading the model and when plotting detections.
# NOTE(review): hard-coded here — confirm they match the category ids/names in the training annotations.
id2label = { 0: "female", 1: "not-female" }

In [None]:
# Reload the model and processor from the HF_MODEL repository; from here on `model`
# is the plain DetrForObjectDetection rather than the Lightning wrapper.
model = DetrForObjectDetection.from_pretrained(HF_MODEL, id2label=id2label)
processor = DetrImageProcessor.from_pretrained(HF_MODEL)
model.to(device)

### Run on test data

In [None]:
# Six colour triples with components in [0, 1]; plot_results repeats this list to colour the boxes.
COLORS = [
  [0.000, 0.447, 0.741], [0.850, 0.325, 0.098], [0.929, 0.694, 0.125],
  [0.494, 0.184, 0.556], [0.466, 0.674, 0.188], [0.301, 0.745, 0.933]
]

def plot_results(pil_img, scores, labels, boxes, id2label):
  """Show ``pil_img`` with one labelled rectangle per detection.

  Args:
    pil_img: image to display.
    scores: per-detection scores; must support ``.tolist()``.
    labels: per-detection label ids; must support ``.tolist()``.
    boxes: per-detection ``(xmin, ymin, xmax, ymax)`` boxes; must support ``.tolist()``.
    id2label: mapping from label id to the name shown next to each box.
  """
  plt.figure(figsize=(16,10))
  plt.imshow(pil_img)
  axes = plt.gca()
  # Repeat the palette so that zip() does not stop after the first few detections.
  palette = COLORS * 100
  detections = zip(scores.tolist(), labels.tolist(), boxes.tolist(), palette)
  for score, label, box, color in detections:
    xmin, ymin, xmax, ymax = box
    outline = plt.Rectangle(
      (xmin, ymin), xmax - xmin, ymax - ymin,
      fill=False, color=color, linewidth=3,
    )
    axes.add_patch(outline)
    caption = f'{id2label[label]}: {score:0.2f}'
    axes.text(xmin, ymin, caption, fontsize=15, bbox=dict(facecolor='yellow', alpha=0.5))
  plt.axis('off')
  plt.show()

In [None]:
# Take the first test sample, add a leading batch dimension and move it to the inference device.
pixel_values, target = test_dataset[0]
pixel_values = pixel_values.unsqueeze(0).to(device)
print(pixel_values.shape)

In [None]:
# Run the model on the single test image with gradient tracking disabled.
# No pixel mask is supplied.
model.to(device)
with torch.no_grad():
  outputs = model(pixel_values=pixel_values, pixel_mask=None)

In [None]:
# Open the original image file for this sample so boxes can be drawn at its native resolution.
image_id = target["image_id"].item()
image = PImage.open(
  os.path.join(
    COCORDIAIS_PATH,
    "test",
    test_dataset.coco.loadImgs(image_id)[0]["file_name"],
  )
)

# PIL reports (width, height); the post-processor is given (height, width).
width, height = image.size
postprocessed_outputs = processor.post_process_object_detection(
  outputs,
  threshold=0.5,
  target_sizes=[(height, width)],
)

results = postprocessed_outputs[0]
plot_results(
  pil_img=image,
  scores=results["scores"],
  labels=results["labels"],
  boxes=results["boxes"],
  id2label=id2label,
)