In [1]:
%pylab inline

import os
import numpy as np

from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F

Populating the interactive namespace from numpy and matplotlib


In [2]:
from torchvision.models import resnet34

class SomeNet(torch.nn.Module):
    """ResNet-34 backbone followed by two 1x1-conv heads on a 5x5 grid.

    ``forward`` returns a pair ``(x_reg, x_class)``:

    * ``x_reg``   -- ``(4 + 1) * num_anchors`` channels per grid cell
      (presumably 4 box terms + 1 confidence per anchor -- confirm against
      the loss that consumes it).
    * ``x_class`` -- ``num_anchors * 512`` feature channels per grid cell.

    ``num_classes`` is stored on the module but is not used to size any layer.
    """

    def __init__(self, num_classes=1, num_anchors=1):
        super().__init__()

        self.num_classes = num_classes
        self.num_anchors = num_anchors

        # Pretrained torchvision ResNet-34; forward() only walks its
        # convolutional stages (conv1 ... layer4).
        self.encoder = resnet34(pretrained=True)

        self.relu = nn.ReLU(inplace=True)

        box_channels = (4 + 1) * num_anchors
        self.reg_head = nn.Conv2d(512, box_channels, 1)

        feature_channels = num_anchors * 512
        self.class_head = nn.Sequential(
            nn.Conv2d(512, feature_channels, 1),
            nn.BatchNorm2d(feature_channels),    # alternative tried: GroupNorm(num_anchors, num_anchors*512)
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Run the backbone, pool to a 5x5 grid and apply both heads."""
        backbone = self.encoder
        stages = (
            backbone.conv1, backbone.bn1, backbone.relu, backbone.maxpool,
            backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4,
        )
        # Original author's note: a 3 x 320 x 320 input leaves the backbone
        # as 512 x 10 x 10.
        for stage in stages:
            x = stage(x)

        # Fixed 5x5 output grid regardless of the backbone's spatial size.
        x = F.adaptive_avg_pool2d(x, (5, 5))

        return self.reg_head(x), self.class_head(x)

In [None]:
class Classifier(torch.nn.Module):
    """Per-anchor class scoring implemented as a 1x1x1 ``Conv3d``.

    The input is a ``(B, num_features * num_anchors, H, W)`` feature map. It is
    viewed as ``(B, num_features, num_anchors, H, W)`` so that the convolution
    mixes the feature axis only, and the result is returned with the class
    axis last: ``(B, num_anchors, H, W, num_classes)``.
    """

    def __init__(self, num_classes=1, num_features=512, num_anchors=3):
        super().__init__()

        self.num_features = num_features
        self.num_anchors = num_anchors
        self.metric_fn = nn.Conv3d(num_features, num_classes, 1)

    def forward(self, x, target=None):
        """Score every (anchor, cell) position; ``target`` is accepted but unused."""
        batch, _, height, width = x.shape

        # Channel axis is split feature-major, anchor-minor.
        per_anchor = x.view(batch, self.num_features, self.num_anchors, height, width)
        scores = self.metric_fn(per_anchor)

        # (B, classes, A, H, W) -> (B, A, H, W, classes); result is a
        # non-contiguous view (no .contiguous() call).
        return scores.permute(0, 2, 3, 4, 1)

In [3]:
import json

def _read_json(path):
    """Return the parsed JSON document stored at ``path``."""
    with open(path, 'r') as f:
        return json.load(f)


# Label metadata for the train / test splits and the subject table.
train_meta = _read_json('./data/train/labels.txt')
test_meta = _read_json('./data/test/labels.txt')
subj_meta = _read_json('./data/subjects.txt')

In [4]:
from torch.utils.data import DataLoader
from torchvision import transforms

from FaceDataset import FaceDataset
from region_loss import RegionLoss
from box_transforms import *

torch.cuda.empty_cache()

# --- Anchors and classes -----------------------------------------------------
anchors = [(30, 30), (80, 80), (120, 120)]
num_anchors = len(anchors)
num_classes = 285

# --- Box-aware geometric transforms (image and boxes move together) ----------
# Training uses a random 320 crop, validation a deterministic centre crop.
box_transform = Compose([ResizeWithBox(320), RandomCropWithBox(320)])
val_box_transform = Compose([ResizeWithBox(320), CenterCropWithBox(320)])

# --- Image-only transforms ---------------------------------------------------
# Same normalisation statistics for both splits; colour jitter is train-only.
_norm_stats = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

transform = transforms.Compose([
    transforms.ColorJitter(brightness=.1, hue=.05, saturation=.1),
    transforms.ToTensor(),
    transforms.Normalize(**_norm_stats),
])

val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(**_norm_stats),
])

# --- Datasets and loaders ----------------------------------------------------
trainset = FaceDataset(train_meta, box_transform=box_transform, img_transform=transform)
testset = FaceDataset(test_meta, box_transform=val_box_transform, img_transform=val_transform)

batch_size = 128
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False)

# --- Networks ----------------------------------------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = SomeNet(num_classes=num_classes, num_anchors=num_anchors).to(device)
classifier = Classifier(num_classes=num_classes, num_anchors=num_anchors).to(device)
# Optional warm start from a saved checkpoint (disabled):
#net.load_state_dict(torch.load('/gdrive/My Drive/data/basenet34_smoothl1_320_e25'))

In [5]:
# Fetch a single batch to sanity-check the input shape. `next(iter(...))`
# states the intent directly and fails right here (StopIteration) if the
# loader is empty, instead of a NameError on the following line.
batch = next(iter(trainloader))
batch['img'].shape

torch.Size([2, 3, 640, 640])

In [6]:
from torch.optim.lr_scheduler import StepLR

lr = 1e-3

criterion = RegionLoss(num_classes=num_classes, anchors=anchors, num_anchors=num_anchors)

# One parameter group per module (detector first, classifier second), both
# trained with the same learning rate.
param_groups = [{'params': module.parameters()} for module in (net, classifier)]
optimizer = torch.optim.Adam(param_groups, lr=lr)

# Step-wise learning-rate decay with a period of 15 scheduler steps.
scheduler = StepLR(optimizer, step_size=15)

In [9]:
def train(verbose=True, info_step=100):
    """Run one training epoch over ``trainloader``.

    Uses the notebook globals ``net``, ``classifier``, ``criterion``,
    ``optimizer``, ``trainloader``, ``trainset``, ``device``, ``batch_size``
    and (for log lines only) ``epoch``.

    Args:
        verbose: when False, nothing is printed (the flag used to be ignored).
        info_step: print running averages every ``info_step`` batches.

    Returns:
        ``(train_loss, losses)`` -- the summed batch losses divided by
        ``len(trainset)``, and a 4-element list of the accumulated ``info``
        terms (logged as coord loss, obj loss, class loss, acc) divided by
        ``len(trainset)``.
    """
    net.train()
    classifier.train()  # keep both modules in the same mode

    n_terms = 4
    train_loss = 0.0
    running_loss = 0.0
    losses = [0] * n_terms
    running_losses = [0] * n_terms

    for i, batch in enumerate(trainloader):
        optimizer.zero_grad()

        # Move the target to the device once; it feeds both the classifier
        # and the criterion.
        target = batch['target'].to(device)
        x_reg, x_features = net(batch['img'].to(device))
        x_class = classifier(x_features, target)
        loss, info = criterion(x_reg, x_class, target)
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        running_loss += loss_value
        train_loss += loss_value

        for j in range(n_terms):
            losses[j] += info[j]
            running_losses[j] += info[j]

        if (i + 1) % info_step == 0:
            if verbose:
                # Running values are normalised by the nominal number of
                # samples in the window (info_step * batch_size).
                window_avg = [term / (info_step * batch_size) for term in running_losses]
                print(' [{} - {}],\ttrain loss: {:.5}'.format(epoch+1, i+1, running_loss/info_step/batch_size))
                print(' coord loss: {:.5} \tobj loss: {:.5} \tclass loss: {:.5} \tacc: {:.5}'.format(*window_avg))
            running_loss = 0.0
            running_losses = [0] * n_terms

    n_samples = len(trainset)
    train_loss /= n_samples
    losses = [term / n_samples for term in losses]

    if verbose:
        print('\n [{}], \ttrain loss: {:.5}'.format(epoch+1, train_loss))
        print(' coord loss: {:.5} \tobj loss: {:.5} \tclass loss: {:.5} \tacc: {:.5}'.format(*losses))
    return train_loss, losses
  

def validate():
    """Evaluate on ``testloader`` without gradient tracking.

    Uses the notebook globals ``net``, ``classifier``, ``criterion``,
    ``testloader``, ``testset``, ``device`` and (for log lines) ``epoch``.

    Returns:
        ``(val_loss, losses)`` -- the summed batch losses divided by
        ``len(testset)``, and a 4-element list of the accumulated ``info``
        terms (logged as coord loss, obj loss, class loss, acc) divided by
        ``len(testset)``. This mirrors what ``train`` returns; previously the
        second element was the raw ``info`` of the last batch only.
    """
    net.eval()
    classifier.eval()  # keep both modules in the same mode

    n_terms = 4
    losses = [0] * n_terms
    val_loss = 0.0

    with torch.no_grad():
        for batch in testloader:
            # Move the target to the device once; it feeds both the
            # classifier and the criterion.
            target = batch['target'].to(device)
            x_reg, x_features = net(batch['img'].to(device))
            x_class = classifier(x_features, target)
            loss, info = criterion(x_reg, x_class, target)

            val_loss += loss.item()
            for j in range(n_terms):
                losses[j] += info[j]

    n_samples = len(testset)
    val_loss /= n_samples
    losses = [term / n_samples for term in losses]

    print(' [{}], \tval loss: {:.5}'.format(epoch+1, val_loss))
    print(' coord loss: {:.5} \tobj loss: {:.5} \tclass loss: {:.5} \tacc: {:.5}'.format(*losses))
    print()
    # Return the epoch averages, not the last batch's `info`.
    return val_loss, losses

In [10]:
import time

num_epoch = 5
history = []            # (train_loss, val_loss) per epoch
detailed_history = []   # (train_info, test_info) per epoch

net.train()
for epoch in range(num_epoch):
    # `epoch` is read as a global by train() / validate() for their log lines.
    train_loss, train_info = train(info_step=10)
    val_loss, test_info = validate()
    history.append((train_loss, val_loss))
    detailed_history.append((train_info, test_info))

    # Advance the LR schedule only after this epoch's optimizer steps.
    # Stepping the scheduler first skips the initial learning-rate value
    # and makes recent PyTorch versions emit a warning.
    scheduler.step()

 [1 - 80],	train loss: 57.135
 coord loss: 0.39881 	obj loss: 0.36042 	class loss: 56.376 	acc: 0.0125
 [1 - 160],	train loss: 56.848
 coord loss: 0.19964 	obj loss: 0.24367 	class loss: 56.404 	acc: 0.0375
 [1 - 240],	train loss: 56.543
 coord loss: 0.11432 	obj loss: 0.18228 	class loss: 56.246 	acc: 0.075


IndexError: too many indices for array

# Test

In [None]:
def bbox_iou_numpy(box1, box2):
    """Intersection-over-union between corner-format boxes.

    Boxes are coordinate-major: indices 0..3 along the first axis select
    x1, y1, x2, y2.

    Args:
        box1: array-like of shape ``(4,)`` (one box) or ``(4, N)``.
        box2: array-like of shape ``(4,)`` (one box) or ``(4, K)``.

    Returns:
        * ``float`` when both inputs are single ``(4,)`` boxes
          (this case used to raise inside ``np.expand_dims``);
        * array of shape ``(K,)`` when only ``box1`` is a single box;
        * otherwise the pairwise matrix of shape ``(N, K)`` -- or ``(N, 1)``
          when ``box2`` is a single ``(4,)`` box -- exactly as before.
    """
    box1 = np.asarray(box1)
    box2 = np.asarray(box2)

    # Promote a single box to a one-column batch so the pairwise
    # broadcasting below works; the extra axis is dropped again on return.
    single_box1 = box1.ndim == 1
    if single_box1:
        box1 = box1[:, np.newaxis]

    # box1 coordinates become columns (N, 1) so they broadcast against
    # box2's (K,) coordinates into an (N, K) grid.
    b1_x1, b1_y1, b1_x2, b1_y2 = (np.expand_dims(box1[k], axis=1) for k in range(4))
    b2_x1, b2_y1, b2_x2, b2_y2 = (box2[k] for k in range(4))

    # Overlap extents, clipped at zero for disjoint boxes.
    iw = np.maximum(np.minimum(b1_x2, b2_x2) - np.maximum(b1_x1, b2_x1), 0)
    ih = np.maximum(np.minimum(b1_y2, b2_y2) - np.maximum(b1_y1, b2_y1), 0)
    intersection = iw * ih

    area1 = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
    area2 = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)

    # Guard against division by zero for degenerate (zero-area) boxes.
    union = np.maximum(area1 + area2 - intersection, np.finfo(float).eps)

    iou = intersection / union
    if single_box1:
        iou = iou[0]
        if box2.ndim == 1:
            return float(iou[0])
    return iou

In [None]:
import time
from utils import non_max_suppression

# Per-image evaluation: decode the network output for every test sample,
# keep the highest-scoring detection after NMS and accumulate box error
# (mae), IoU and label accuracy against the target.
# The loader is rebound to batch_size=1 for this cell and restored at the end.
testloader = DataLoader(testset, batch_size=1, shuffle=False)

no_face = []   # sample indices for which NMS returned no detection
wrong = []     # sample indices whose predicted label != target label
mae = 0
iou = 0
acc = 0

net.eval()

start = time.time()
for i,batch in enumerate(testloader):
    with torch.no_grad():
        # NOTE(review): SomeNet.forward in this notebook returns a tuple
        # (x_reg, x_class), so `output.size()` below would fail. This cell
        # looks written for a network emitting one tensor with
        # nA * (4 + 1 + nC) channels -- confirm / update before running.
        output = net(batch['img'].to(device))
        
        nB,_,nH,nW = output.size()
        # Hard-coded copies of num_classes / num_anchors from the setup cell.
        nC = 285
        nA = 3

        # Rebinds the global `anchors` to whatever the criterion stores.
        anchors = criterion.anchors
        stride = 1

        prediction = output.view(nB, nA, (4+1+nC), nH, nW)         # reshape for convenience
        prediction = prediction.permute(0, 1, 3, 4, 2).contiguous()    # Get bbox_attr dimension to be the last

        # Get attributes from output tensor
        prediction[..., 0] = torch.sigmoid(prediction[..., 0])  # Center x
        prediction[..., 1] = torch.sigmoid(prediction[..., 1])  # Center y
        prediction[..., 4] = torch.sigmoid(prediction[..., 4])  # Conf
        prediction[..., 5:] = torch.softmax(prediction[..., 5:], dim=-1)  # Cls distribution


        # Calculate offsets for each grid       
        # NOTE(review): the repeat counts are nW for grid_x and nH for grid_y,
        # so the views to (1, 1, nH, nW) only work when nH == nW.
        grid_x = torch.arange(nW, dtype=torch.float32).repeat(nW, 1).view([1, 1, nH, nW]).to(device)
        grid_y = torch.arange(nH, dtype=torch.float32).repeat(nH, 1).t().view([1, 1, nH, nW]).to(device)
        scaled_anchors = torch.FloatTensor([(a_w / stride, a_h / stride) for a_w, a_h in anchors]).to(device)
        anchor_w = scaled_anchors[:, 0].view((1, nA, 1, 1))
        anchor_h = scaled_anchors[:, 1].view((1, nA, 1, 1))

        # Add offset and scale with anchors
        prediction[..., 0] = prediction[..., 0] + grid_x
        prediction[..., 1] = prediction[..., 1] + grid_y
        prediction[..., 2] = torch.exp(prediction[..., 2]) * anchor_w
        prediction[..., 3] = torch.exp(prediction[..., 3]) * anchor_h

        # Flatten anchors and grid cells into one candidate axis for NMS.
        prediction = prediction.view(nB, nA*nH*nW, 4+1+nC)
        pred = non_max_suppression(prediction, nC)[0]
        if pred is None:
            # Skipped samples still count in the len(testset) denominators
            # used for the final averages.
            no_face.append(i)
            continue
        
        #FIXME
        #TODO clamp
        # Columns used from the NMS result: [:4] box, [4] score, [-1] label.
        pred_boxes = pred[:, :4].cpu().numpy()
        scores = pred[:, 4].cpu().numpy()
        pred_labels = pred[:, -1].cpu().numpy()

        # Ascending sort by score, so index -1 is the best detection.
        sort_i = np.argsort(scores)
        pred_labels = pred_labels[sort_i]
        pred_boxes = pred_boxes[sort_i]
        
        # presumably divides by the 5x5 grid size to normalise to [0, 1] -- TODO confirm
        pred_box = pred_boxes[-1]/5
        
        # Target box is treated as (cx, cy, w, h) and converted to corners
        # (x1, y1, x2, y2); taken from the LAST target row.
        true_box = batch['target'][0][-1][:4].cpu().numpy()
        tb_x1, tb_x2 = true_box[0] - true_box[2] / 2, true_box[0] + true_box[2] / 2
        tb_y1, tb_y2 = true_box[1] - true_box[3] / 2, true_box[1] + true_box[3] / 2
        true_box = np.array([tb_x1, tb_y1, tb_x2, tb_y2])
        
        mae += np.sum(np.abs(pred_box-true_box))
        # NOTE(review): both boxes are 1-D (4,) arrays here -- verify that
        # bbox_iou_numpy supports that shape.
        iou += bbox_iou_numpy(pred_box, true_box)
        
        
        label = pred_labels[-1]
        # NOTE(review): the label is read from the FIRST target row while the
        # box above comes from the last one -- confirm both refer to the same
        # object.
        true_label = batch['target'][0][0][-1].item()
        
        if label==true_label:
            acc+=1
        else:
            wrong.append(i)
            
# `start` is reused: from here on it holds the elapsed seconds.
start = time.time() - start
# Restore the batched loader used by validate().
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False)

print(no_face)
print(wrong)
# presumably * 320 converts the normalised error to pixels of the 320 crop -- TODO confirm
print("mae", mae/len(testset)*320)
print("iou", iou/len(testset))
print("acc", acc/len(testset))
print("fps", len(testset)/start)

In [None]:
# Test code: preprocess one image file and run it through the network.
net.eval()

_test_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
test_transforms = transforms.Compose([
    transforms.CenterCrop(320),
    transforms.ToTensor(),
    _test_normalize,
])

filepath = '06.jpg'
# `img` stays bound to the resized PIL image (it is reused for drawing later);
# only the tensor copy is cropped and normalised.
img = transforms.Resize(250)(Image.open(filepath))
tensor = test_transforms(img).unsqueeze(0)  # add the batch dimension

with torch.no_grad():
    output = net(tensor.to(device))

In [None]:
# Decode the raw output of the single-image test into per-candidate rows of
# (x, y, w, h, conf, class scores...), ready for non-max suppression.
# NOTE(review): SomeNet.forward in this notebook returns a tuple
# (x_reg, x_class); `output.size()` would fail on it. This cell looks written
# for a single tensor with nA * (4 + 1 + nC) channels -- confirm.
nB,_,nH,nW = output.size()
# Hard-coded copies of num_classes / num_anchors from the setup cell.
nC = 285
nA = 3

# Rebinds the global `anchors` to whatever the criterion stores.
anchors = criterion.anchors
stride = 1

prediction = output.view(nB, nA, (4+1+nC), nH, nW)         # reshape for convenience
prediction = prediction.permute(0, 1, 3, 4, 2).contiguous()    # Get bbox_attr dimension to be the last

# Get attributes from output tensor
prediction[..., 0] = torch.sigmoid(prediction[..., 0])  # Center x
prediction[..., 1] = torch.sigmoid(prediction[..., 1])  # Center y
prediction[..., 4] = torch.sigmoid(prediction[..., 4])  # Conf
# NOTE(review): class scores use sigmoid here, while the test-set evaluation
# loop in this notebook applies softmax over the same slice -- confirm which
# one matches training.
prediction[..., 5:] = torch.sigmoid(prediction[..., 5:])  # Cls distribution

# Calculate offsets for each grid       
# NOTE(review): the repeat counts are nW for grid_x and nH for grid_y, so the
# views to (1, 1, nH, nW) only work when nH == nW.
grid_x = torch.arange(nW, dtype=torch.float32).repeat(nW, 1).view([1, 1, nH, nW]).to(device)
grid_y = torch.arange(nH, dtype=torch.float32).repeat(nH, 1).t().view([1, 1, nH, nW]).to(device)
scaled_anchors = torch.FloatTensor([(a_w / stride, a_h / stride) for a_w, a_h in anchors]).to(device)
anchor_w = scaled_anchors[:, 0].view((1, nA, 1, 1))
anchor_h = scaled_anchors[:, 1].view((1, nA, 1, 1))

# Add offset and scale with anchors
prediction[..., 0] = prediction[..., 0] + grid_x
prediction[..., 1] = prediction[..., 1] + grid_y
prediction[..., 2] = torch.exp(prediction[..., 2]) * anchor_w
prediction[..., 3] = torch.exp(prediction[..., 3]) * anchor_h

# Flatten anchors and grid cells into one candidate axis.
prediction = prediction.view(nB, nA*nH*nW, 4+1+nC)

In [None]:
# Run NMS on the decoded predictions and keep the first image's result
# (this cell handles a single image; the per-image loop is disabled).
pred = non_max_suppression(prediction, nC)[0]

# Columns used from the NMS result: [:4] box, [4] score, [-1] label.
pred_boxes = pred[:, :4].cpu().numpy()
scores = pred[:, 4].cpu().numpy()
pred_labels = pred[:, -1].cpu().numpy()

# Ascending sort by score, so index -1 is the most confident detection.
sort_i = np.argsort(scores)
pred_labels, pred_boxes = pred_labels[sort_i], pred_boxes[sort_i]

# One entry per image; each entry holds one array of boxes per class index.
per_class = [np.array([]) for _ in range(nC)]
for label in range(nC):
    per_class[label] = pred_boxes[pred_labels == label]
all_detections = [per_class]

pred_labels

In [None]:
from PIL import ImageDraw

# Draw the highest-scoring box on a 640x640 centre crop of the test image.
canvas_crop = transforms.CenterCrop(640)
draw_img = canvas_crop(img)

# Scale the best box by 1/5 and then to 640 pixels.
# NOTE(review): the network input above was a 320 centre crop of the same
# image -- confirm that a 640 canvas and scale match that geometry.
box = pred_boxes[-1] / 5 * 640

ImageDraw.Draw(draw_img).rectangle(box)

draw_img