# visualizer
[site](file:///C:/Users/thoma/Desktop/sublime/python/%E5%80%8B%E5%88%A5%E7%A7%91%E5%AD%B8%E7%A0%94%E7%A9%B6/data/view/index.html)
# issues
vanish cost不一致，容易換人消失
sidewalk 292

In [3]:
import numpy as np
import json, orjson, os
import matplotlib.pyplot as plt
from copy import deepcopy
from scipy.optimize import linear_sum_assignment
import warnings
np.seterr(all='ignore')

{'divide': 'ignore', 'over': 'ignore', 'under': 'ignore', 'invalid': 'ignore'}

# wrnch.ai parser
[devportal](https://devportal.wrnch.ai/wrnchcloud)

In [4]:
# hyperparameters
SCORE_THRESHOLD = 0.7
JOINT_MAP = [16, 7, 12, 11, 10, 13, 14, 15, 6, 2, 1, 0, 3, 4, 5]

In [5]:
def parseWrnch(file):
    if type(file) == str:
        with open(file, 'r') as f:
            data = json.load(f)
    else:
        data = file
    pose, ids = [], []
    h, w = data['frames'][0]['height'], data['frames'][0]['width']
    for frame in data['frames']:
        pose.append([])
        ids.append([])
        for person in frame['persons']:
            trust = np.array(person['pose2d']['scores']) > SCORE_THRESHOLD
            pose[-1].append(np.where(
                np.append(trust.reshape(-1, 1), trust.reshape(-1, 1), axis=1),
                np.matmul(np.array(person['pose2d']['joints']).reshape(-1, 2), [[w, 0], [0, h]]),
                np.full((25, 2), np.nan)
            )[JOINT_MAP])
            ids[-1].append(person['id'])
            if np.isnan(pose[-1][-1]).all():
                pose[-1].pop()
                ids[-1].pop()
    return {
        'height': h,
        'width': w,
        'pose': pose,
        'id': ids,
        'fps': data['file_info']['frame_rate']
    }

# functions

In [12]:
class Tracker:
    def __init__(self, dist_function=0, decay_rate=0.95, ignore_threshold=0.1):
        # 0: centroid, 1: average_distance
        self.dist_function = dist_function
        self.decay_rate = decay_rate
        self.ignore_threshold = ignore_threshold
    def activation(self, x):
        x /= min(self.w, self.h)
        return np.exp(200 * x)
    def values(self, arr):
        return arr[~np.isnan(arr)]
    def getBbox(self, pose):
        # (xmin, xmax, ymin, ymax)
        return np.array([
            np.nanmin(pose[:, 0]),
            np.nanmax(pose[:, 0]),
            np.nanmin(pose[:, 1]),
            np.nanmax(pose[:, 1])
        ])
    def centroid(self, pose):
        x1, x2, y1, y2 = self.getBbox(pose)
        return np.array([(x1 + x2) / 2, (y1 + y2) / 2])
    def preprocess(self, data):
        self.data = deepcopy(data)
        for i in range(len(self.data)):
            self.data[i] = np.array(self.data[i], dtype='float32')
    def dist(self, p1, p2):
        if self.dist_function == 0:
            return np.hypot(*(self.centroid(p1)-self.centroid(p2)))
        elif self.dist_function == 1:
            return np.nanmean(np.hypot(*np.swapaxes(p1 - p2, 0, 1)))
    def cal_cost(self, track, pose):
        d = self.activation(self.dist(track[2],pose))
        if np.isnan(d):
            return np.inf
        else:
            return d / track[1]
#     def opt_match(self, cost, **kwarg):
#         ok_i, ok_j = ~np.isinf(cost).all(axis=1), ~np.isinf(cost).all(axis=0)
#         (idx_i,), (idx_j,) = np.nonzero(ok_i), np.nonzero(ok_j)
#         cost = cost[np.ix_(ok_i, ok_j)]
#         ii, jj = linear_sum_assignment(cost, **kwarg)
#         return idx_i[ii], idx_j[jj]
    def vanish_cost(self, loss):
        loss = np.log10(np.sort(loss[~np.isinf(loss)]))
        for i in range(len(loss)-1):
            if loss[i+1] > loss[i] + 10:
                return np.power(10, (loss[i] + loss[i+1])/2)
        return self.maxcost
    def opt_match(self, cost, **kwarg):
        # TODO: adaptive vansish cost
        n, m = cost.shape[0], cost.shape[1]
        vanish_costi = np.apply_along_axis(self.vanish_cost, 1, cost) if cost.size != 0 else np.zeros((n,))
        vanish_costj = np.apply_along_axis(self.vanish_cost, 0, cost) if cost.size != 0 else np.zeros((m,))
        c = np.zeros((n + m, m))
#         c = np.zeros((n + m, n + m))
        c[n:, :m] = np.repeat([vanish_costj], m, axis=0).astype('float64')
#         c[:n, m:] = np.repeat(vanish_costi.reshape((-1, 1)), n, axis=1).astype('float64')
        c[:n, :m] = cost
#         c[n:, m:] = c.max() * 10
#         with np.printoptions(suppress=True):
#             print(np.where(np.isinf(c), np.inf, (10 * np.log10(c)).astype('int32').astype('float32')))
        ii, jj = linear_sum_assignment(c, **kwarg)
        matched = (ii < n) & (jj < m)
        return ii[matched], jj[matched]
    def match(self, new):
        cost = np.full((len(self.tracks), len(new)), np.inf)
        for i in range(len(cost)):
            for j in range(len(cost[0])):
                cost[i][j] = self.cal_cost(self.tracks[i], new[j])
        cost = np.where(np.isnan(cost), np.inf, cost)
        ii, jj = self.opt_match(cost)
#         print(self.t, cost[ii,jj].mean())
#         print()
        self.last_max_cost = cost[ii,jj].max() if len(ii) != 0 else self.maxcost
        self.last_avg_cost = np.exp(np.log(cost[ii,jj]).mean()) if len(ii) != 0 else self.maxcost
        respective_track = np.full((len(new),), -1)
        for i, j in zip(ii, jj):
            respective_track[j] = i
        return respective_track
    def update(self, frame, mapping):
        for t in self.tracks:
            t[1] *= self.decay_rate
        for j, i in enumerate(mapping):
            if i == -1:
                self.tracks.append([self.cnt, 1, frame[j]])
                self.labels.append((self.cnt, self.t, j))
                self.cnt += 1
            else:
                self.tracks[i][1] = 1
                self.tracks[i][2] = frame[j]
                self.labels.append((self.tracks[i][0], self.t, j))
        self.tracks = list(filter(lambda x: x[1] >= self.ignore_threshold, self.tracks))
    def init(self, size):
        # [(id, t, idx), ...]
        self.labels = []
        # [[id, weight(0~1), pose], ...]
        self.tracks = []
        self.t = 0
        self.cnt = 0
        self.w = size[0]
        self.h = size[1]
        self.maxcost = self.activation(np.hypot(self.w, self.h))
        self.last_max_cost = self.maxcost
        self.last_avg_cost = self.maxcost
    def track(self, frame):
        mapping = self.match(frame)
        ret = map(lambda x: self.tracks[x][0] if x != -1 else None, mapping)
        self.update(frame, mapping)
        self.t += 1
        return ret
    def trackAll(self, data, size):
        self.preprocess(data)
        self.init(size)
        for frame in self.data:
            self.track(frame)
        print(f'idenitfied {self.cnt} persons')
    def generateJS(self, name, pose, suffix=''):
        res = deepcopy(pose)
        res['pred_id'] = self.labels
        res['name'] = name
        res = orjson.dumps(res, option=orjson.OPT_INDENT_2|orjson.OPT_SERIALIZE_NUMPY)
        with open(f'./data/view/{name}{suffix}.js', 'wb+') as f:
            f.write(b'const data = ')
            f.write(res)
            f.write(b';')
        files = [x[:-3] for x in os.listdir('./data/view') if x.split('.')[-1] == 'js']
        if 'list' in files:
            files.remove('list')
        with open('./data/view/list.js', 'wb+') as f:
            f.write(b'const fileList = ')
            f.write(orjson.dumps(files))
            f.write(b';')

In [7]:
def int_print(arr):
    if arr.size == 0:
        print('zero:', arr.shape)
        return
    d = int(max(np.log10(abs(arr[(~np.isinf(arr))&(~np.isnan(arr))])).max() + 2, 2)) if not np.isinf(arr).all() else 5
    s = '{:'+str(d)+'.0f}'
    f = lambda x: s.format(x) if not np.isinf(x) else ' ' * (d-3) + 'inf'
    print('[')
    for i in arr:
        print('  [', ' '.join([f(j) for j in i]), ']')
    print(']')

In [8]:
fail = []
def accuracy(tracker, true_ids):
    pred_id = dict()
    for ID, t, i in tracker.labels:
        pred_id[(t, i)] = ID
    
    tot = 0
    labels = []
    for t in range(len(true_ids)):
        tot += len(true_ids[t])
        for i, ID in enumerate(true_ids[t]):
            labels.append((ID, t, i))
    labels.sort()
    miss = 0
    for i, (ID, t, idx) in enumerate(labels):
        # print(ID, pred_id.get((t, idx), -1))
        if i == 0 or (ID, t-1) == labels[i-1][:2]:
            i1 = pred_id.get((t-1, labels[i-1][2]), -1)
            i2 = pred_id.get((t, idx), -1)
            if i2 == -1:
                miss += 1
                fail.append((t, ID))
            elif i1 == -1:
                pass
            else:
                if i1 != i2:
                    miss += 1
                    fail.append((t, ID))
    return 100 * (tot - miss) / tot

In [9]:
def optimize(initial=(0.95, 0.1), interval=(0.01, 0.01)):
    initial = list(initial)
    def evaluate(params, i, delta):
        params = deepcopy(params)
        params[i] += delta
        tracker = Tracker(0, *params)
        tracker.trackAll(pose, size=(width, height))
        return accuracy(tracker, ids)
    now = 0
    while True:
        tmp = deepcopy(initial)
        for i in range(len(initial)):
            l = evaluate(initial, i,  interval[i])
            r = evaluate(initial, i, -interval[i])
            maxx = max(l, r, now)
            if maxx == l:
                initial[i] += interval[i]
            if maxx == r:
                initial[i] -= interval[i]
            if maxx == now:
                pass
            now = maxx
            print(initial, now)
        if tmp == initial:
            print('done')
            break

In [10]:
def run(name):
    parsed = parseWrnch(f'./data/wrnch-{name}.json')
    res = orjson.dumps(parsed, option=orjson.OPT_INDENT_2|orjson.OPT_SERIALIZE_NUMPY)
    with open(f'./data/{name}.json', 'wb+') as f:
        f.write(res)
    
    with open(f'./data/{name}.json', 'r') as f:
        data = json.load(f)
    width, height, pose, ids, fps = data['width'], data['height'], data['pose'], data['id'], data['fps']
    tracker = Tracker(dist_function=1, decay_rate=0.95)
    tracker.trackAll(pose, size=(width, height))
    tracker.generateJS(name, data, suffix='-algo2')
    print('acc:', accuracy(tracker, ids))

In [13]:
for n in 'sidewalk BTS BLACKPINK'.split():
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        print('processing', n)
        run(n)
        print('done\n')

processing sidewalk
idenitfied 28 persons
acc: 99.29577464788733
done

processing BTS
idenitfied 20 persons
acc: 98.49981705085986
done

processing BLACKPINK
idenitfied 6 persons
acc: 99.76878612716763
done

