# 삼성전자 첨기연 시각 심화

- **Instructor**: Jongwoo Lim / Jiun Bae
- **Email**: [jlim@hanyang.ac.kr](mailto:jlim@hanyang.ac.kr) / [jiun.maydev@gmail.com](mailto:jiun.maydev@gmail.com)

In [8]:
from pathlib import Path
import yaml

import numpy as np
import pandas as pd
import torch

from MDNet.models.mdnet import MDNet, BCELoss, Precision
from MDNet.models.extractor import SampleGenerator, RegionDataset

## Dataset

Download OTB-50, 100 dataset from [CVLab](http://cvlab.hanyang.ac.kr/tracker_benchmark/datasets.html).

In [9]:
class Dataset:
    def __init__(self, root: str, options):
        self.sequences, self.images, self.ground_truths = map(list, zip(*[(
            str(seq.stem),
            list(map(str, sorted(seq.glob('img/*.jpg')))),
            pd.read_csv(str(seq.joinpath('groundtruth_rect.txt')), header=None, sep=r'\,|\t|\ ', engine='python').values,
        ) for seq in filter(lambda p: p.is_dir(), Path(root).iterdir())]))
        
        # assertion
        for i, _ in enumerate(self.sequences):
            if len(self.images[i]) != np.size(self.ground_truths[i], 0):
                self.images[i] = self.images[i][:self.ground_truths[i].shape[0]]
        
        self.regions = [RegionDataset(i, g, options) for i, g in zip(self.images, self.ground_truths)]
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.regions[idx]
    
    def __iter__(self):
        yield from self.regions

## Prepare Environments

Download pre-trained imagenet-vgg weights from [link](http://www.vlfeat.org/matconvnet/models/imagenet-vgg-m.mat)

```
wget "http://www.vlfeat.org/matconvnet/models/imagenet-vgg-m.mat"
```

In [14]:
opts = yaml.safe_load(open('MDNet/options.yaml','r'))

In [15]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [18]:
dataset = Dataset('../data/OTB', opts)

In [19]:
model = MDNet(opts['init_model_path'], len(dataset)).to(device)
model.set_learnable_params(opts['ft_layers'])

In [20]:
criterion = BCELoss()
evaluator = Precision()
optimizer = model.optimizer(opts['lr'], opts['lr_mult'])

## Train model

In [21]:
for b in range(opts['n_cycles']):
    model.train()
    
    prec = np.zeros(len(dataset))
    permute = np.random.permutation(len(dataset))

    for i, j in enumerate(permute):
        pos, neg = dataset[j].next()
        
        pos_loss = model(pos.to(device), j)
        neg_loss = model(neg.to(device), j)
        
        loss = criterion(pos_loss, neg_loss)
        
        model.zero_grad()
        loss.backward()
        if 'grad_clip' in opts:
            torch.nn.utils.clip_grad_norm_(model.parameters(), opts['grad_clip'])
        optimizer.step()
        
        prec[j] = evaluator(pos_loss, neg_loss)
        
        if not i % 10:
            print(f'Iter {i:2d} (Domain {j:2d}), Loss {loss.item():.3f}, Precision {prec[j]:.3f}')

    print(f'Batch {b:2d}: Mean Precision: {prec.mean():.3f}')
    
    torch.save({
        'shared_layers': model.cpu().layers.state_dict()
    }, opts['model_path'])
    
    model = model.to(device)

Iter  0 (Domain  0), Loss 0.634, Precision 0.406
Batch  0: Mean Precision: 0.406
Iter  0 (Domain  0), Loss 0.562, Precision 0.344
Batch  1: Mean Precision: 0.344
Iter  0 (Domain  0), Loss 0.532, Precision 0.344
Batch  2: Mean Precision: 0.344
Iter  0 (Domain  0), Loss 0.569, Precision 0.438
Batch  3: Mean Precision: 0.438
Iter  0 (Domain  0), Loss 0.469, Precision 0.469
Batch  4: Mean Precision: 0.469
Iter  0 (Domain  0), Loss 0.346, Precision 0.719
Batch  5: Mean Precision: 0.719
Iter  0 (Domain  0), Loss 0.322, Precision 0.750
Batch  6: Mean Precision: 0.750
Iter  0 (Domain  0), Loss 0.231, Precision 0.844
Batch  7: Mean Precision: 0.844
Iter  0 (Domain  0), Loss 0.232, Precision 0.812
Batch  8: Mean Precision: 0.812
Iter  0 (Domain  0), Loss 0.175, Precision 0.781
Batch  9: Mean Precision: 0.781
Iter  0 (Domain  0), Loss 0.117, Precision 0.906
Batch 10: Mean Precision: 0.906
Iter  0 (Domain  0), Loss 0.105, Precision 0.969
Batch 11: Mean Precision: 0.969
Iter  0 (Domain  0), Loss 0.

## Inference

In [23]:
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import torch.optim as optim

from MDNet.utils import Options, overlap_ratio
from MDNet.models.extractor import RegionExtractor
from MDNet.models.regressor import BBRegressor

In [24]:
def forward_samples(model, image, samples, opts, out_layer='conv3'):
    model.eval()
    extractor = RegionExtractor(image, samples, opts.img_size, opts.padding, opts.batch_test)

    for i, regions in enumerate(extractor):
        if opts.use_gpu:
            regions = regions.cuda()

        with torch.no_grad():
            feat = model(regions, out_layer=out_layer)

        feats = torch.cat((feats, feat.detach().clone()), 0) if i else feat.detach().clone()
    return feats

In [25]:
def train(model, criterion, optimizer,
          pos_feats, neg_feats, maxiter, opts,
          in_layer='fc4'):
    model.train()

    batch_pos = opts.batch_pos
    batch_neg = opts.batch_neg
    batch_test = opts.batch_test
    batch_neg_cand = max(opts.batch_neg_cand, batch_neg)

    pos_idx = np.random.permutation(pos_feats.size(0))
    neg_idx = np.random.permutation(neg_feats.size(0))

    while len(pos_idx) < batch_pos * maxiter:
        pos_idx = np.concatenate([pos_idx, np.random.permutation(pos_feats.size(0))])

    while len(neg_idx) < batch_neg_cand * maxiter:
        neg_idx = np.concatenate([neg_idx, np.random.permutation(neg_feats.size(0))])

    pos_pointer = 0
    neg_pointer = 0

    for _ in range(maxiter):

        # select pos idx
        pos_next = pos_pointer + batch_pos
        pos_cur_idx = pos_idx[pos_pointer:pos_next]
        pos_cur_idx = pos_feats.new(pos_cur_idx).long()
        pos_pointer = pos_next

        # select neg idx
        neg_next = neg_pointer + batch_neg_cand
        neg_cur_idx = neg_idx[neg_pointer:neg_next]
        neg_cur_idx = neg_feats.new(neg_cur_idx).long()
        neg_pointer = neg_next

        # create batch
        batch_pos_feats = pos_feats[pos_cur_idx]
        batch_neg_feats = neg_feats[neg_cur_idx]

        # hard negative mining
        if batch_neg_cand > batch_neg:
            model.eval()

            for start in range(0, batch_neg_cand, batch_test):
                end = min(start + batch_test, batch_neg_cand)

                with torch.no_grad():
                    score = model(batch_neg_feats[start:end], in_layer=in_layer)

                if start == 0:
                    neg_cand_score = score.detach()[:, 1].clone()
                else:
                    neg_cand_score = torch.cat((neg_cand_score, score.detach()[:, 1].clone()), 0)

            _, top_idx = neg_cand_score.topk(batch_neg)
            batch_neg_feats = batch_neg_feats[top_idx]
            model.train()

        # forward
        pos_score = model(batch_pos_feats, in_layer=in_layer)
        neg_score = model(batch_neg_feats, in_layer=in_layer)

        # optimize
        loss = criterion(pos_score, neg_score)

        model.zero_grad()

        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), opts.grad_clip)

        optimizer.step()

In [26]:
def main(images, init_bbox, ground_truths, opts):
    device = ('cuda' if opts.use_gpu else 'cpu')

    model = MDNet(opts.model_path).to(device)

    criterion = BCELoss()

    # Set learnable parameters
    for k, p in model.params.items():
        p.requires_grad = any([k.startswith(l) for l in opts.ft_layers])

    # Set optimizer states
    def set_optimizer(lr_base, lr_mult, momentum=0.9, w_decay=0.0005):
        param_list = []

        for k, p in filter(lambda kp: kp[1].requires_grad, model.params.items()):
            lr = lr_base
            for l, m in lr_mult.items():
                if k.startswith(l):
                    lr = lr_base * m
            param_list.append({'params': [p], 'lr': lr})

        return optim.SGD(param_list, lr=lr, momentum=momentum, weight_decay=w_decay)

    init_optimizer = set_optimizer(opts.lr_init, opts.lr_mult)
    update_optimizer = set_optimizer(opts.lr_update, opts.lr_mult)

    # Load first image
    image = Image.open(images[0]).convert('RGB')

    # Draw pos/neg samples
    pos_examples = SampleGenerator('gaussian', image.size, opts.trans_pos, opts.scale_pos)(
        init_bbox, opts.n_pos_init, opts.overlap_pos_init)

    neg_examples = np.concatenate([
        SampleGenerator('uniform', image.size, opts.trans_neg_init, opts.scale_neg_init)(
            init_bbox, int(opts.n_neg_init * 0.5), opts.overlap_neg_init),
        SampleGenerator('whole', image.size)(
            init_bbox, int(opts.n_neg_init * 0.5), opts.overlap_neg_init)])
    neg_examples = np.random.permutation(neg_examples)

    # Extract pos/neg features
    pos_feats = forward_samples(model, image, pos_examples, opts)
    neg_feats = forward_samples(model, image, neg_examples, opts)

    # Initial training
    train(model, criterion, init_optimizer, pos_feats, neg_feats, opts.maxiter_init, opts)
    del init_optimizer, neg_feats
    torch.cuda.empty_cache()

    # Train bbox regressor
    bbreg_examples = SampleGenerator('uniform', image.size, opts.trans_bbreg, opts.scale_bbreg, opts.aspect_bbreg)\
        (init_bbox, opts.n_bbreg, opts.overlap_bbreg)

    bbreg_feats = forward_samples(model, image, bbreg_examples, opts)
    bbreg = BBRegressor(image.size)
    bbreg.train(bbreg_feats, bbreg_examples, init_bbox)
    del bbreg_feats
    torch.cuda.empty_cache()

    # Init sample generators for update
    sample_generator = SampleGenerator('gaussian', image.size, opts.trans, opts.scale)
    pos_generator = SampleGenerator('gaussian', image.size, opts.trans_pos, opts.scale_pos)
    neg_generator = SampleGenerator('uniform', image.size, opts.trans_neg, opts.scale_neg)

    # Init pos/neg features for update
    neg_examples = neg_generator(init_bbox, opts.n_neg_update, opts.overlap_neg_init)
    neg_feats = forward_samples(model, image, neg_examples, opts)
    pos_feats_all = [pos_feats]
    neg_feats_all = [neg_feats]

    # Main loop
    for i, image in enumerate(images[1:], 1):
        image = Image.open(image).convert('RGB')

        # Estimate target bbox
        samples = sample_generator(init_bbox, opts.n_samples)
        sample_scores = forward_samples(model, image, samples, opts, out_layer='fc6')

        top_scores, top_idx = sample_scores[:, 1].topk(5)
        top_idx = top_idx.cpu()
        target_score = top_scores.mean()
        init_bbox = samples[top_idx]
        if top_idx.shape[0] > 1:
            init_bbox = init_bbox.mean(axis=0)
        success = target_score > 0

        # Expand search area at failure
        sample_generator.trans = opts.trans if success else min(sample_generator.trans * 1.1, opts.trans_limit)

        # Bbox regression
        if success:
            bbreg_samples = samples[top_idx]

            if top_idx.shape[0] == 1:
                bbreg_samples = bbreg_samples[None, :]

            bbreg_feats = forward_samples(model, image, bbreg_samples, opts)
            bbreg_samples = bbreg.predict(bbreg_feats, bbreg_samples)
            bbreg_bbox = bbreg_samples.mean(axis=0)

        else:
            bbreg_bbox = init_bbox

        yield init_bbox, bbreg_bbox, overlap_ratio(ground_truths[i], bbreg_bbox)[0], target_score

        # Data collect
        if success:
            pos_examples = pos_generator(init_bbox, opts.n_pos_update, opts.overlap_pos_update)
            pos_feats = forward_samples(model, image, pos_examples, opts)
            pos_feats_all.append(pos_feats)

            if len(pos_feats_all) > opts.n_frames_long:
                del pos_feats_all[0]

            neg_examples = neg_generator(init_bbox, opts.n_neg_update, opts.overlap_neg_update)
            neg_feats = forward_samples(model, image, neg_examples, opts)
            neg_feats_all.append(neg_feats)

            if len(neg_feats_all) > opts.n_frames_short:
                del neg_feats_all[0]

        # Short term update
        if not success:
            nframes = min(opts.n_frames_short, len(pos_feats_all))
            pos_data = torch.cat(pos_feats_all[-nframes:], 0)
            neg_data = torch.cat(neg_feats_all, 0)
            train(model, criterion, update_optimizer, pos_data, neg_data, opts.maxiter_update, opts)

        # Long term update
        elif i % opts.long_interval == 0:
            pos_data = torch.cat(pos_feats_all, 0)
            neg_data = torch.cat(neg_feats_all, 0)
            train(model, criterion, update_optimizer, pos_data, neg_data, opts.maxiter_update, opts)

        torch.cuda.empty_cache()

### (Optional)

Refresh image output in IPython

In [27]:
from IPython.display import clear_output
%matplotlib inline

## Showcase

In [30]:
options = Options()
options.use_gpu = False
options.model_path = '../data/mdnet_otb.pth'
dataset = Path('../data/OTB/DragonBaby')

images = list(sorted(dataset.joinpath('img').glob('*.jpg')))
ground_truths = pd.read_csv(str(dataset.joinpath('groundtruth_rect.txt')), header=None).values

iou, success = 0, 0

# Run tracker
for i, (result, (x, y, w, h), overlap, score) in \
        enumerate(main(images, ground_truths[0], ground_truths, options), 1):
    
    clear_output(wait=True)
    
    image = np.asarray(Image.open(images[i]).convert('RGB'))

    gx, gy, gw, gh = ground_truths[i]
    cv2.rectangle(image, (int(gx), int(gy)), (int(gx+gw), int(gy+gh)), (0, 255, 0), 2)
    cv2.rectangle(image, (int(x), int(y)), (int(x+w), int(y+h)), (255, 0, 0), 2)

    iou += overlap
    success += overlap > .5
    
    plt.imshow(image)
    plt.pause(.1)
    plt.title(f'#{i}/{len(images)-1}, Overlap {overlap:.3f}, Score {score:.3f}')
    plt.draw()

iou /= len(images) - 1
print(f'Mean IOU: {iou:.3f}, Success: {success} / {len(images)-1}')

ValueError: Found array with dim 4. Estimator expected <= 2.