### tf_yolov3 포맷 (Mosaic Augmentation)
- tfrecord로 변환

In [None]:
# Location scanned for training images. The scanning cell accepts a directory,
# a text file listing image paths, or a list mixing both.
path = '/home/jovyan/DATA/googlemap/torch/data/train/images/'

In [None]:
from pathlib import Path
import glob
import os
import cv2
import random

# Accepted image extensions (lower-case, without the leading dot).
img_formats = ['bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng']

f = []  # candidate file paths gathered from every entry of `path`
for p in path if isinstance(path, list) else [path]:
    p = Path(p)  # os-agnostic
    if p.is_dir():
        # Directory: collect every file that has an extension, recursively.
        f += glob.glob(str(p / '**' / '*.*'), recursive=True)
    elif p.is_file():
        # Text file: one image path per line.
        parent = str(p.parent) + os.sep
        with open(p, 'r') as listing:
            lines = listing.read().splitlines()
        # Only the leading './' is relative to the list file, so replace just the
        # first occurrence; later './' segments in the path must stay untouched.
        f += [x.replace('./', parent, 1) if x.startswith('./') else x for x in lines]
    else:
        # FileNotFoundError is still an Exception, so existing handlers keep working.
        raise FileNotFoundError('%s does not exist' % p)

# Keep only supported image types, normalise separators, and sort for a stable order.
img_files = sorted(x.replace('/', os.sep) for x in f if x.split('.')[-1].lower() in img_formats)

In [None]:
def img2label_paths(img_paths):
    """Derive label file paths from image file paths.

    The first ``<sep>images<sep>`` segment of each path is replaced by
    ``<sep>labels<sep>`` and the file extension is replaced by ``.txt``.
    A path without an extension simply gets ``.txt`` appended.

    Args:
        img_paths: Iterable of image path strings.

    Returns:
        List of label path strings, in the same order as ``img_paths``.
    """
    # /images/, /labels/ substrings
    sa, sb = os.sep + 'images' + os.sep, os.sep + 'labels' + os.sep
    # splitext only looks at the final path component, so dots in directory
    # names or extension-less files no longer corrupt the result.
    return [os.path.splitext(x.replace(sa, sb, 1))[0] + '.txt' for x in img_paths]

In [None]:
# One label path per image, in the same order as img_files.
label_files = img2label_paths(img_files)

In [None]:
def load_image(img_files, index):
    """Read a single dataset image from disk with OpenCV.

    Args:
        img_files: Sequence of image file paths.
        index: Position in ``img_files`` of the image to read.

    Returns:
        Tuple ``(img, (h0, w0), (h, w))``: the image array, its original
        height/width, and its "resized" height/width. No resizing is done
        here, so both shapes are equal.

    Raises:
        AssertionError: If OpenCV could not read the file.
    """
    img_path = img_files[index]
    image = cv2.imread(img_path)  # BGR
    assert image is not None, 'Image Not Found ' + img_path
    height, width = image.shape[:2]
    original_hw = (height, width)
    return image, original_hw, image.shape[:2]

In [None]:
def exif_size(img):
    """Return the EXIF-corrected size of a PIL image.

    Args:
        img: PIL image (anything exposing ``size`` and, optionally,
            ``_getexif()``).

    Returns:
        ``(width, height)`` tuple. The two values are swapped when the EXIF
        orientation tag says the picture is rotated by 90 or 270 degrees.
        If EXIF data is missing or unreadable, ``img.size`` is returned as is.
    """
    s = img.size  # (width, height)
    orientation = 0x0112  # EXIF tag id of 'Orientation'

    # Not every image format exposes EXIF data.
    read_exif = getattr(img, '_getexif', None)
    if read_exif is None:
        return s

    try:
        exif = read_exif()
    except Exception:
        # Best effort: malformed EXIF blocks must not abort the dataset scan.
        return s

    rotation = exif.get(orientation) if exif else None
    if rotation in (6, 8):  # 6 -> rotation 270, 8 -> rotation 90
        s = (s[1], s[0])

    return s

In [None]:
from PIL import Image, ExifTags
import numpy as np
from tqdm import tqdm

# Maps image path -> [labels array, (width, height)] for every image that verified.
x = {}
pbar = tqdm(zip(img_files, label_files), desc='Scanning images', total=len(img_files))
for (img, label) in pbar:
    try:
        lb = []
        # The context manager releases the file handle even when verification fails,
        # so scanning a large dataset does not exhaust open file descriptors.
        with Image.open(img) as im:
            im.verify()  # PIL verify
            shape = exif_size(im)  # image size
        assert (shape[0] > 9) & (shape[1] > 9), 'image size <10 pixels'
        if os.path.isfile(label):
            with open(label, 'r') as f:
                # One whitespace-separated row of numbers per line.
                lb = np.array([row.split() for row in f.read().splitlines()], dtype=np.float32)  # labels
        if len(lb) == 0:
            # Missing or empty label file: keep the image with zero boxes.
            lb = np.zeros((0, 5), dtype=np.float32)
        x[img] = [lb, shape]
    except Exception as e:
        print('WARNING: Ignoring corrupted image and/or label %s: %s' % (img, e))

if not x:
    # Same exception type the tuple unpacking below would raise, with a usable message.
    raise ValueError('No usable images were found while scanning the dataset')

# Drop skipped images from the path lists so that img_files, label_files and
# labels stay index-aligned; later cells index them in parallel.
kept = [(im_path, lb_path) for im_path, lb_path in zip(img_files, label_files) if im_path in x]
img_files = [im_path for im_path, _ in kept]
label_files = [lb_path for _, lb_path in kept]

labels, shapes = zip(*x.values())
shapes = np.array(shapes, dtype=np.float64)
n = len(shapes)

In [None]:
def box_candidates(box1, box2, wh_thr=2, ar_thr=20, area_thr=0.1):  # box1(4,n), box2(4,n)
    """Select the boxes that are still usable after augmentation.

    Args:
        box1: Boxes before augmentation, rows are x1, y1, x2, y2 (shape (4, n)).
        box2: Boxes after augmentation, same layout as ``box1``.
        wh_thr: Width and height of an augmented box must exceed this (pixels).
        ar_thr: Aspect ratio of an augmented box must stay below this.
        area_thr: Augmented area divided by original area must exceed this.

    Returns:
        Boolean array with one entry per box; True means keep the box.
    """
    eps = 1e-16  # guards the divisions against zero-sized boxes

    width_before = box1[2] - box1[0]
    height_before = box1[3] - box1[1]
    width_after = box2[2] - box2[0]
    height_after = box2[3] - box2[1]

    # Aspect ratio taken as the larger of w/h and h/w.
    aspect = np.maximum(width_after / (height_after + eps), height_after / (width_after + eps))
    large_enough = (width_after > wh_thr) & (height_after > wh_thr)
    area_kept = width_after * height_after / (width_before * height_before + eps) > area_thr

    return large_enough & area_kept & (aspect < ar_thr)

In [None]:
import math

def random_perspective(img, targets=(), degrees=10, translate=.1, scale=.1, shear=10, perspective=0.0, border=(0, 0)):
    """Apply a random perspective/affine warp to an image and its boxes.

    The transform is the product ``T @ S @ R @ P @ C`` (centering, perspective,
    rotation+scale, shear, translation), each part sampled with ``random``.

    Args:
        img: Image array indexed as (height, width, ...).
        targets: Boxes as rows of ``[cls, x1, y1, x2, y2]`` in pixels. Must
            support NumPy-style column indexing when non-empty.
        degrees: Rotation is sampled uniformly from ``[-degrees, degrees]``.
        translate: Translation is sampled uniformly from
            ``[0.5 - translate, 0.5 + translate]`` times the output width/height.
        scale: Scale factor is sampled uniformly from ``[1 - scale, 1 + scale]``.
        shear: Shear angle (degrees) is sampled uniformly from ``[-shear, shear]``
            independently for x and y.
        perspective: Perspective coefficients are sampled uniformly from
            ``[-perspective, perspective]``; when falsy, an affine warp is used.
        border: ``(y, x)`` border added on each side to get the output size;
            negative values shrink the output.

    Returns:
        Tuple ``(img, targets)``: the warped image and the boxes that pass
        ``box_candidates``, with columns 1..4 replaced by the warped,
        clipped coordinates. Empty ``targets`` is returned unchanged.
    """
    # torchvision.transforms.RandomAffine(degrees=(-10, 10), translate=(.1, .1), scale=(.9, 1.1), shear=(-10, 10))
    # targets = [cls, xyxy]

    # Output canvas size: input size grown (or shrunk) by the border on both sides
    height = img.shape[0] + border[0] * 2  # shape(h,w,c)
    width = img.shape[1] + border[1] * 2

    # Center
    C = np.eye(3)
    C[0, 2] = -img.shape[1] / 2  # x translation (pixels)
    C[1, 2] = -img.shape[0] / 2  # y translation (pixels)

    # Perspective
    P = np.eye(3)
    P[2, 0] = random.uniform(-perspective, perspective)  # x perspective (about y)
    P[2, 1] = random.uniform(-perspective, perspective)  # y perspective (about x)

    # Rotation and Scale
    R = np.eye(3)
    a = random.uniform(-degrees, degrees)
    # a += random.choice([-180, -90, 0, 90])  # add 90deg rotations to small rotations
    s = random.uniform(1 - scale, 1 + scale)
    # s = 2 ** random.uniform(-scale, scale)
    R[:2] = cv2.getRotationMatrix2D(angle=a, center=(0, 0), scale=s)

    # Shear
    S = np.eye(3)
    S[0, 1] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # x shear (deg)
    S[1, 0] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # y shear (deg)

    # Translation
    T = np.eye(3)
    T[0, 2] = random.uniform(0.5 - translate, 0.5 + translate) * width  # x translation (pixels)
    T[1, 2] = random.uniform(0.5 - translate, 0.5 + translate) * height  # y translation (pixels)

    # Combined rotation matrix
    M = T @ S @ R @ P @ C  # order of operations (right to left) is IMPORTANT
    # Skip the warp only when there is no border change and M is exactly the identity
    if (border[0] != 0) or (border[1] != 0) or (M != np.eye(3)).any():  # image changed
        if perspective:
            img = cv2.warpPerspective(img, M, dsize=(width, height), borderValue=(114, 114, 114))
        else:  # affine
            img = cv2.warpAffine(img, M[:2], dsize=(width, height), borderValue=(114, 114, 114))

    # Transform label coordinates
    n = len(targets)
    if n:
        # warp points: all four corners of every box, in homogeneous coordinates
        xy = np.ones((n * 4, 3))
        xy[:, :2] = targets[:, [1, 2, 3, 4, 1, 4, 3, 2]].reshape(n * 4, 2)  # x1y1, x2y2, x1y2, x2y1
        xy = xy @ M.T  # transform
        if perspective:
            xy = (xy[:, :2] / xy[:, 2:3]).reshape(n, 8)  # rescale
        else:  # affine
            xy = xy[:, :2].reshape(n, 8)

        # create new boxes: axis-aligned hull of the four warped corners
        x = xy[:, [0, 2, 4, 6]]
        y = xy[:, [1, 3, 5, 7]]
        xy = np.concatenate((x.min(1), y.min(1), x.max(1), y.max(1))).reshape(4, n).T

        # clip boxes
        xy[:, [0, 2]] = xy[:, [0, 2]].clip(0, width)
        xy[:, [1, 3]] = xy[:, [1, 3]].clip(0, height)

        # filter candidates: originals are scaled by s so the area ratio ignores the zoom itself
        i = box_candidates(box1=targets[:, 1:5].T * s, box2=xy.T)
        targets = targets[i]
        targets[:, 1:5] = xy[i]

    return img, targets

In [None]:
def load_mosaic(img_files, labels, index, img_size=800):
    """Build a four-image mosaic around one dataset image and randomly warp it.

    The image at ``index`` plus three randomly chosen images are pasted around
    a random centre on a ``2 * img_size`` square canvas filled with gray (114).
    The canvas is then passed to ``random_perspective`` with a border of
    ``-img_size // 2`` on each side.

    Args:
        img_files: Sequence of image file paths.
        labels: Sequence of label arrays aligned with ``img_files``; each row
            is ``[cls, x_center, y_center, width, height]`` with normalized
            coordinates.
        index: Index of the image placed in the top-left tile.
        img_size: Tile size in pixels (default 800, the previous fixed value).

    Returns:
        Tuple ``(img4, labels4)``: the augmented mosaic image and its labels as
        rows of ``[cls, x1, y1, x2, y2]`` in pixels.
    """
    s = img_size
    mosaic_border = [-s // 2, -s // 2]
    yc, xc = [int(random.uniform(-b, 2 * s + b)) for b in mosaic_border]  # mosaic center y, x
    indices = [index] + [random.randint(0, len(labels) - 1) for _ in range(3)]  # 3 additional image indices

    labels4 = []
    for i, img_index in enumerate(indices):
        # Load image
        img, _, (h, w) = load_image(img_files, img_index)

        # place img in img4; "a" coordinates address the canvas, "b" the source image
        if i == 0:  # top left
            img4 = np.full((s * 2, s * 2, img.shape[2]), 114, dtype=np.uint8)  # base image with 4 tiles
            x1a, y1a, x2a, y2a = max(xc - w, 0), max(yc - h, 0), xc, yc  # xmin, ymin, xmax, ymax (large image)
            x1b, y1b, x2b, y2b = w - (x2a - x1a), h - (y2a - y1a), w, h  # xmin, ymin, xmax, ymax (small image)
        elif i == 1:  # top right
            x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
            x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
        elif i == 2:  # bottom left
            x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
            x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h)
        elif i == 3:  # bottom right
            x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
            x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

        img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]  # img4[ymin:ymax, xmin:xmax]
        # Offset of the source image's origin inside the canvas
        padw = x1a - x1b
        padh = y1a - y1b

        # Labels
        x = labels[img_index]
        labels_x = x.copy()
        if x.size > 0:  # Normalized xywh to pixel xyxy format
            labels_x[:, 1] = w * (x[:, 1] - x[:, 3] / 2) + padw
            labels_x[:, 2] = h * (x[:, 2] - x[:, 4] / 2) + padh
            labels_x[:, 3] = w * (x[:, 1] + x[:, 3] / 2) + padw
            labels_x[:, 4] = h * (x[:, 2] + x[:, 4] / 2) + padh
        labels4.append(labels_x)

    # Concat/clip labels (labels4 always holds four arrays at this point)
    labels4 = np.concatenate(labels4, 0)
    np.clip(labels4[:, 1:], 0, 2 * s, out=labels4[:, 1:])  # use with random_perspective

    # Augment
    img4, labels4 = random_perspective(img4, labels4,
                                       degrees=0.0,
                                       translate=0.1,
                                       scale=0.5,
                                       shear=0.0,
                                       perspective=0.0,
                                       border=mosaic_border)  # border to remove

    return img4, labels4

In [None]:
# Class index (first column of a label row) -> class text written to the TFRecord.
label_names = dict(enumerate(['13', '14', '15', '16', '17']))

In [None]:
def _set_tf_features_by_mosaic(img4, labels4, img_shape, img_path):
    """Pack one mosaic image and its boxes into a ``tf.train.Example``.

    Args:
        img4: Image array to be JPEG-encoded.
        labels4: Iterable of ``[cls, x1, y1, x2, y2]`` rows in pixels.
        img_shape: ``(height, width)`` used to normalize the box coordinates.
        img_path: Source image path stored as the record's filename. A list or
            tuple of paths is also accepted; its first element is used.

    Returns:
        ``tf.train.Example`` with the encoded image, filename, normalized box
        corners and class text.

    Raises:
        ValueError: If OpenCV fails to encode the image.
    """
    success, encoded_image = cv2.imencode('.jpg', img4)
    if not success:
        raise ValueError('Failed to JPEG-encode mosaic for %s' % (img_path,))
    img_raw = encoded_image.tobytes()

    # Indexing a plain string with [0] would store only its first character,
    # so take element 0 only when a sequence of paths is given.
    filename = img_path[0] if isinstance(img_path, (list, tuple)) else img_path
    filename = str(filename).encode('utf8')

    xmin = []
    ymin = []
    xmax = []
    ymax = []
    classes_text = []
    height, width = img_shape

    for label, x1, y1, x2, y2 in labels4:
        # Pixel coordinates -> fractions of the image size
        xmin.append(float(x1) / width)
        ymin.append(float(y1) / height)
        xmax.append(float(x2) / width)
        ymax.append(float(y2) / height)

        classes_text.append(label_names[int(label)].encode('utf8'))

    features = {
        'image/encoded': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
        'image/filename': tf.train.Feature(bytes_list=tf.train.BytesList(value=[filename])),
        'image/object/bbox/xmin': tf.train.Feature(float_list=tf.train.FloatList(value=xmin)),
        'image/object/bbox/xmax': tf.train.Feature(float_list=tf.train.FloatList(value=xmax)),
        'image/object/bbox/ymin': tf.train.Feature(float_list=tf.train.FloatList(value=ymin)),
        'image/object/bbox/ymax': tf.train.Feature(float_list=tf.train.FloatList(value=ymax)),
        'image/object/class/text': tf.train.Feature(bytes_list=tf.train.BytesList(value=classes_text)),
    }

    return tf.train.Example(features=tf.train.Features(feature=features))

In [None]:
import tensorflow as tf

# Write every image as a freshly randomized mosaic, repeated over several passes.
# NOTE(review): the file name says "5x" but the loop makes 10 passes - confirm
# which one is intended before renaming the file or changing the count.
# The context manager closes the writer even if a mosaic fails part-way through.
with tf.io.TFRecordWriter('./train_mosaic_5x.tfrecord') as writer:
    for _ in tqdm(range(10)):
        for index, img_path in enumerate(img_files):
            img4, labels4 = load_mosaic(img_files, labels, index)
            tfrecord_example = _set_tf_features_by_mosaic(img4, labels4, img4.shape[:2], img_path)
            writer.write(tfrecord_example.SerializeToString())