In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, GlobalAveragePooling2D, Dense, MaxPool2D, Softmax
from tensorflow.keras.models import Model
import os
from glob import glob
import math
import tensorflow.keras.backend as K
import xmltodict
from PIL import Image, ImageDraw

print(tf.__version__)
# TensorFlow matches the device-type filter case-sensitively against names such
# as 'CPU' and 'GPU'; the lower-case spelling never matches, so it would report
# an empty list even on a machine that has GPUs.
print(tf.config.list_physical_devices('GPU'))


# Base anchor sizes; each one is multiplied by every ratio pair below to obtain
# the (width, height) of one anchor box.
ANCHOR_SIZES = [32, 64, 128]
# (width factor, height factor) pairs applied to every anchor size.
ANCHOR_RATIOS = [[1,1], [math.sqrt(2), 1/math.sqrt(2)], [math.sqrt(3), 1/math.sqrt(3)]]
# The input image size is divided by this value to obtain the anchor-grid size.
SUBSAMPLE_RATIO = 4

class ResUnit(Model):
    """Bottleneck residual unit with a projected skip connection.

    The main path is: 1x1 conv (``filter_in``) -> ``kernel_size`` conv
    (``filter_in``) -> 1x1 conv (``filter_out``) -> batch normalisation -> ReLU.
    The input is passed through a separate 1x1 conv (``filter_out``) and added
    to the main-path output.

    Args:
        filter_in: Filter count of the first two convolutions.
        filter_out: Filter count of the last convolution and of the skip
            projection.
        kernel_size: Kernel size of the middle convolution.
    """

    def __init__(self, filter_in, filter_out, kernel_size):
        super(ResUnit, self).__init__()

        self.sequence = list()

        # Projects the input to `filter_out` channels so that it can be added
        # to the main-path output (the residual addition needs equal depths).
        self.identity = Conv2D(filter_out, (1, 1), padding='valid')

        # Bottleneck (channel reduction)
        self.sequence.append(Conv2D(filter_in, (1, 1), padding='valid'))

        # Spatial convolution
        self.sequence.append(Conv2D(filter_in, kernel_size, padding='same'))

        # Bottleneck (channel expansion)
        self.sequence.append(Conv2D(filter_out, (1, 1), padding='valid'))

        # Batch normalisation and activation
        self.sequence.append(BatchNormalization())
        self.sequence.append(Activation('relu'))

    def call(self, images, training=None):
        """Run the unit.

        Implemented as ``call`` (not ``__call__``) so that Keras'
        ``Layer.__call__`` wrapper is not bypassed; instances are still invoked
        as ``unit(images, training)``.

        Args:
            images: Input feature map.
            training: Forwarded to the batch-normalisation layer.

        Returns:
            ``identity(images) + main_path(images)``.
        """
        h = images
        for unit in self.sequence:
            # Only batch normalisation behaves differently in training mode.
            if isinstance(unit, BatchNormalization):
                h = unit(h, training=training)
            else:
                h = unit(h)

        # Residual addition
        return self.identity(images) + h

class ResLayer(Model):
    """A stack of ``iter_count`` identically configured ``ResUnit`` blocks.

    Args:
        filter_in: ``filter_in`` passed to every unit.
        filter_out: ``filter_out`` passed to every unit.
        kernel_size: ``kernel_size`` passed to every unit.
        iter_count: Number of units to stack.
    """

    def __init__(self, filter_in, filter_out, kernel_size, iter_count):
        super(ResLayer, self).__init__()

        self.sequence = list()

        # Stack `iter_count` ResUnits. Filter counts follow the ResNet paper,
        # with reference to https://eremo2002.tistory.com/76
        for _ in range(iter_count):
            self.sequence.append(ResUnit(filter_in, filter_out, kernel_size))

    def call(self, images, training=None):
        """Apply every unit in order.

        Implemented as ``call`` (not ``__call__``) so that Keras'
        ``Layer.__call__`` wrapper is not bypassed; instances are still invoked
        as ``layer(images, training)``.

        Args:
            images: Input feature map.
            training: Forwarded to every unit.

        Returns:
            Output of the last unit.
        """
        for layer in self.sequence:
            images = layer(images, training)
        return images

class RPN(Model):
    """Region Proposal Network head.

    A convolution with 256 filters is applied to the incoming feature map. Two
    1x1 convolutions then produce, for every grid cell and every anchor, two
    object / no-object scores and four box values. The scores go through a
    softmax; the box values go through a ``Dense(4)`` layer and are then
    encoded relative to the anchor grid by ``generate_anchors``.

    Args:
        image_size: ``(width, height)`` of the network input image.
        kernel_size: Kernel size of the first (sliding-window) convolution.
        anchor_size: Sequence of base anchor sizes.
        anchor_ratio: Sequence of ``(width factor, height factor)`` pairs.
        subsampling_ratio: ``image_size`` divided by this gives the anchor-grid
            size; also used as the spacing between anchor origins.
    """

    def __init__(self, image_size, kernel_size, anchor_size, anchor_ratio, subsampling_ratio):
        super(RPN, self).__init__()

        # Original author's note: with a ResNet backbone, a size between 600
        # and 1000 is recommended.
        self.image_size = image_size

        # Kernel size of the sliding window used by the RPN
        self.kernel_size = kernel_size

        # Anchor box sizes. The real object size is unknown in advance, so
        # anchor boxes of several preset sizes are used.
        self.anchor_sizes = anchor_size

        # Anchor box aspect ratios
        self.anchor_ratios = anchor_ratio

        # Number of anchors per grid cell
        self.anchor_num = len(anchor_size) * len(anchor_ratio)

        # Image down-scaling factor
        self.subsampling_ratio = subsampling_ratio

        # The constructor argument is used here; previously it was stored but
        # the kernel was hard-coded to (3, 3).
        self.conv = Conv2D(256, kernel_size, (1, 1), padding='same')

        # Object / no-object scores: 2 values per anchor
        # (e.g. 18 = 9 anchors * 2), i.e. H x W x (2 * anchor_num).
        self.obj_conv = Conv2D(2 * self.anchor_num, (1, 1), padding='valid')

        # Bounding-box regression: 4 values (x, y, width, height) per anchor
        # (e.g. 36 = 9 anchors * 4).
        self.bb_conv = Conv2D(4 * self.anchor_num, (1, 1), padding='valid')
        # Not used by `call`; kept so the attribute stays available.
        self.avgpool = GlobalAveragePooling2D()

        self.softmax = Softmax(name='obj_output')
        self.bbox_fc = Dense(4)

    def call(self, images):
        """Compute objectness probabilities and encoded boxes.

        Implemented as ``call`` (not ``__call__``) so that Keras'
        ``Layer.__call__`` wrapper is not bypassed.

        Args:
            images: Feature map produced by the backbone.

        Returns:
            ``[obj, bb]`` where ``obj`` is indexed
            ``[batch, row, col, anchor, 2]`` and ``bb`` is indexed
            ``[batch, row, col, anchor, 4]``.
        """
        features = self.conv(images)
        obj = self.obj_conv(features)
        bb = self.bb_conv(features)

        # Split the channel axis into (anchor, value) pairs.
        obj = tf.reshape(obj, (-1, obj.shape[1], obj.shape[2], self.anchor_num, 2))
        bb = tf.reshape(bb, (-1, bb.shape[1], bb.shape[2], self.anchor_num, 4))

        obj = self.softmax(obj)
        bb = self.bbox_fc(bb)
        # generate_anchors works on one image at a time, hence the map over the batch.
        bb = tf.map_fn(self.generate_anchors, bb, name='bb_output')

        return [obj, bb]

    def generate_anchors(self, feature_map):
        """Encode one image's box map relative to the anchor grid.

        The anchor at grid position ``[row, col]`` has its origin at
        ``(col * subsampling_ratio, row * subsampling_ratio)`` and one
        ``(width, height)`` per size / ratio combination.

        Args:
            feature_map: Tensor indexed ``[row, col, anchor, 4]`` holding
                ``(x, y, width, height)`` per anchor.

        Returns:
            Tensor indexed like ``feature_map`` holding
            ``((x - xa) / wa, (y - ya) / ha, log(w / wa + 1e-4), log(h / ha + 1e-4))``,
            every component clamped below at 0.
        """
        grid_width = int(self.image_size[0] / self.subsampling_ratio)
        grid_height = int(self.image_size[1] / self.subsampling_ratio)
        stride = self.subsampling_ratio

        # (num_sizes, 1) x (1, 2 * num_ratios), reshaped to one (width, height)
        # row per anchor.
        anchor_sizes = np.array(self.anchor_sizes).reshape([-1, 1])
        anchor_ratio = np.array(self.anchor_ratios).reshape([1, -1])
        anchor_samples = np.matmul(anchor_sizes, anchor_ratio).reshape([-1, 2])

        # Rows first, then columns, to match the [row, col, ...] layout of the
        # feature map. Origins advance by the stride, as in the anchor grid
        # built inside loss_function.
        anchor_grid = [
            [
                [(col * stride, row * stride, width, height) for width, height in anchor_samples]
                for col in range(grid_width)
            ]
            for row in range(grid_height)
        ]
        # Build the anchors in the feature map's dtype; TensorFlow does not
        # implicitly mix float64 anchors with a float32 feature map.
        anchor_boxes = tf.convert_to_tensor(
            np.array(anchor_grid, dtype=feature_map.dtype.as_numpy_dtype))

        x = tf.maximum((feature_map[:,:,:,0] - anchor_boxes[:,:,:,0]) / anchor_boxes[:,:,:,2], 0)
        y = tf.maximum((feature_map[:,:,:,1] - anchor_boxes[:,:,:,1]) / anchor_boxes[:,:,:,3], 0)
        width = tf.maximum(tf.math.log(feature_map[:,:,:,2] / anchor_boxes[:,:,:,2] + 1e-4), 0)
        height = tf.maximum(tf.math.log(feature_map[:,:,:,3] / anchor_boxes[:,:,:,3] + 1e-4), 0)
        return tf.stack([x, y, width, height], -1)

class ResNet(Model):
    """Truncated ResNet backbone.

    Layout: 7x7 convolution (64 filters, stride 2) -> 3x3 max pooling
    (stride 2) -> ResLayer(64, 256) x 3 units -> ResLayer(128, 512) x 4 units.
    The two deeper stages (256 -> 1024 and 512 -> 2048) and a final pooling
    layer were commented out in the original and are not built.
    """

    def __init__(self, *args, **kwargs):
        # Unpack the arguments; passing the tuple and the dict themselves would
        # hand Model.__init__ two positional arguments.
        super(ResNet, self).__init__(*args, **kwargs)

        self.conv1 = Conv2D(64, (7, 7), (2, 2), padding='same')
        self.maxpool1 = MaxPool2D((3, 3), (2, 2), padding='same')
        self.res1 = ResLayer(64, 256, (3, 3), 3)
        self.res2 = ResLayer(128, 512, (3, 3), 4)

    def call(self, images, training=None):
        """Run the backbone.

        Implemented as ``call`` (not ``__call__``) so that Keras'
        ``Layer.__call__`` wrapper is not bypassed; instances are still invoked
        as ``model(images, training)``.

        Args:
            images: Input image batch.
            training: Forwarded to both residual stages.

        Returns:
            Feature map produced by the second residual stage.
        """
        images = self.conv1(images)
        images = self.maxpool1(images)
        images = self.res1(images, training)
        images = self.res2(images, training)
        return images

2.3.0
[]


In [2]:
def cal_iou(anchor, ground_truth):
    """Intersection-over-union of anchor boxes against ground-truth boxes.

    Boxes store (x, y, width, height) along their last axis. ``anchor`` is
    indexed with three leading axes and ``ground_truth`` with one, so the
    ground-truth axis has to broadcast against the anchor axes.

    No guard is applied when the union area is zero; the division result is
    returned as-is in that case.

    Args:
        anchor: Rank-4 tensor of boxes.
        ground_truth: Rank-2 tensor of boxes.

    Returns:
        Tensor of intersection area divided by union area.
    """
    a_x, a_y = anchor[:,:,:,0], anchor[:,:,:,1]
    a_w, a_h = anchor[:,:,:,2], anchor[:,:,:,3]
    g_x, g_y = ground_truth[:,0], ground_truth[:,1]
    g_w, g_h = ground_truth[:,2], ground_truth[:,3]

    # Extent of the overlap rectangle along each axis, clamped at zero when
    # the boxes do not overlap.
    overlap_w = tf.maximum(tf.minimum(a_x + a_w, g_x + g_w) - tf.maximum(a_x, g_x), 0)
    overlap_h = tf.maximum(tf.minimum(a_y + a_h, g_y + g_h) - tf.maximum(a_y, g_y), 0)

    overlap = overlap_w * overlap_h
    union = a_w * a_h + g_w * g_h - overlap
    return overlap / union

In [3]:
# Prepare the dataset.
# Roots of the train and test sets, given relative to the working directory;
# get_dataset reads the `Annotations` and `JPEGImages` folders beneath them.
train_dir = 'train/VOCdevkit/VOC2007'
test_dir = 'test/VOCdevkit/VOC2007'


def get_dataset(dir, image_size, labels_per_object=256):
    """Load images and per-object labels from a VOC-style directory.

    Only annotation files whose name starts with ``0011`` are read. One sample
    is emitted per annotated object, so an image containing several objects
    appears several times in ``images``.

    Args:
        dir: Dataset root containing ``Annotations`` and ``JPEGImages``
            (the name shadows the builtin but is kept for existing callers).
        image_size: ``(width, height)`` every image is resized to.
        labels_per_object: Number of identical label rows emitted per object.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays. ``images`` holds the
        resized images scaled to ``[0, 1]`` as float32. ``labels`` holds, per
        sample, ``labels_per_object`` rows of
        ``[0.0, 1.0, x, y, box_width, box_height]`` with the box expressed in
        the resized image's coordinates.
    """
    # Sorted so the sample order does not depend on directory listing order.
    annotation_files = sorted(glob("{}/Annotations/0011*".format(dir)))

    images, labels = list(), list()

    for count, annotation_file in enumerate(annotation_files):
        if count % 100 == 0:
            print("{} / {}".format(count, len(annotation_files)))

        # Context manager so the annotation file handle is always released.
        with open(annotation_file, mode='r') as file:
            annotation = xmltodict.parse(file.read())['annotation']

        width = float(annotation['size']['width'])
        height = float(annotation['size']['height'])

        with Image.open("{}/JPEGImages/{}".format(dir, annotation['filename'])) as im:
            image = np.array(im.resize(image_size))
        image = (image / 255.0).astype(np.float32)

        # xmltodict yields a single mapping for one <object> element and a list
        # for several; an annotation without any object contributes no samples.
        objects = annotation.get('object', [])
        if not isinstance(objects, list):
            objects = [objects]

        for obj in objects:
            bndbox = obj['bndbox']
            x = int(bndbox['xmin'])
            y = int(bndbox['ymin'])
            bnd_width = int(bndbox['xmax']) - x
            bnd_height = int(bndbox['ymax']) - y

            # Rescale the box from original-image pixels to the resized image.
            x = np.float32((float(x) / width) * image_size[0])
            y = np.float32((float(y) / height) * image_size[1])
            bnd_width = np.float32((float(bnd_width) / width) * image_size[0])
            bnd_height = np.float32((float(bnd_height) / height) * image_size[1])

            images.append(image)
            labels.append([
                np.array([0.0, 1.0, x, y, bnd_width, bnd_height])
                for _ in range(labels_per_object)
            ])

    return np.array(images), np.array(labels)


# Size every image is resized to when the datasets are loaded.
IMAGE_SIZE = (224, 224)
train_x, train_y = get_dataset(dir=train_dir, image_size=IMAGE_SIZE)
test_x, test_y = get_dataset(dir=test_dir, image_size=IMAGE_SIZE)

0 / 51
0 / 49


In [4]:
BATCH_SIZE = 16

# Training pipeline: shuffled with a 1000-element buffer, then batched.
train_ds = (
    tf.data.Dataset
    .from_tensor_slices((train_x, train_y))
    .shuffle(1000)
    .batch(BATCH_SIZE)
)

# Evaluation pipeline: batched in the stored order.
test_ds = (
    tf.data.Dataset
    .from_tensor_slices((test_x, test_y))
    .batch(BATCH_SIZE)
)

In [5]:
def loss_function(y_true, y_pred):
    """Loss combining an IoU-based cross-entropy term with a smooth-L1 box term.

    Both tensors are split along their last axis into 2 classification values
    followed by 4 box values, so the last axis must have 6 entries.

    NOTE(review): the shapes this function expects beyond the last axis are not
    established here; the nested helpers index the box tensors with different
    ranks (see notes below) - confirm against the model outputs.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.

    Returns:
        ``cross_entropy + box_loss``.
    """
    # Debug output of the raw inputs.
    print('y_pred = {}'.format(y_pred))
    print('y_true = {}'.format(y_true))

    # NOTE(review): classification_true is not referenced again in this function.
    classification_pred, box_regression_pred = tf.split(y_pred, [2, 4], -1)
    classification_true, box_regression_true = tf.split(y_true, [2, 4], -1)

    # NOTE(review): this definition is replaced by a second `get_iou` further
    # down; it is only used by the tf.map_fn call that immediately follows it.
    def get_iou(inputs):
        """Return ``[1 - IoU, IoU]`` for an (anchor, ground_truth) pair.

        ``anchor`` is indexed with three leading axes, ``ground_truth`` with one.
        """
        anchor = inputs[0]
        ground_truth = inputs[1]
        anchor_area = anchor[:,:,:,2] * anchor[:,:,:,3]

        def gt_map(gt):
            """Print one ground-truth element and return it unchanged."""
            print('gt = {}'.format(gt))
            return gt

        # NOTE(review): new_gt is never read; the call only triggers the print above.
        new_gt = tf.map_fn(gt_map, ground_truth)
        ground_truth_area = ground_truth[:,2] * ground_truth[:,3]
        # Overlap rectangle; width and height are clamped at 0 for disjoint boxes.
        left = tf.maximum(anchor[:,:,:,0], ground_truth[:,0])
        top = tf.maximum(anchor[:,:,:,1], ground_truth[:,1])
        right = tf.minimum(anchor[:,:,:,0] + anchor[:,:,:,2], ground_truth[:,0] + ground_truth[:,2])
        bottom = tf.minimum(anchor[:,:,:,1] + anchor[:,:,:,3], ground_truth[:,1] + ground_truth[:,3])
        width = tf.maximum(right - left, 0)
        height = tf.maximum(bottom - top, 0)

        intersection_area = width * height
        return [1 - (intersection_area / (anchor_area + ground_truth_area - intersection_area)),
                          intersection_area / (anchor_area + ground_truth_area - intersection_area)]

    # NOTE(review): both names are reassigned further down before being read,
    # so the result of this call is discarded. Also, the first returned element
    # is 1 - IoU although it is bound to the name `iou`.
    iou, invert_iou = tf.map_fn(get_iou, [box_regression_pred, box_regression_true])

    def generate_anchor_box(inputs):
        """Encode one element's boxes relative to a grid of anchor boxes.

        The grid has IMAGE_SIZE / SUBSAMPLE_RATIO cells per axis. Each cell
        holds one anchor per ANCHOR_SIZES x ANCHOR_RATIOS combination, with
        origins spaced SUBSAMPLE_RATIO apart.
        """
        anchor_boxes = list()
        subsampled_image_width = int(IMAGE_SIZE[0] / SUBSAMPLE_RATIO)
        subsampled_image_height = int(IMAGE_SIZE[1] / SUBSAMPLE_RATIO)
        # (num_sizes, 1) x (1, 2 * num_ratios), reshaped to one (width, height)
        # row per anchor.
        anchor_sizes = np.array(ANCHOR_SIZES).reshape([-1, 1])
        anchor_ratio = np.array(ANCHOR_RATIOS).reshape([1, -1])
        anchor_samples = np.matmul(anchor_sizes, anchor_ratio).reshape([-1, 2])

        # NOTE(review): the y loop runs over the width count and the x loop over
        # the height count; this only coincides for square IMAGE_SIZE - confirm.
        for y in range(subsampled_image_width):
            l1 = list()
            for x in range(subsampled_image_height):
                l2 = list()
                for anchor_sample in anchor_samples:
                    width, height = anchor_sample[0], anchor_sample[1]
                    l2.append((x * SUBSAMPLE_RATIO, y * SUBSAMPLE_RATIO, width, height))
                l1.append(l2)
            anchor_boxes.append(l1)
        anchor_boxes = np.array(anchor_boxes)

        print('inputs = {}'.format(inputs))

        # Offsets relative to the anchor: position differences scaled by the
        # anchor size, and log size ratios; every component is clamped below at 0.
        x = tf.maximum((inputs[:,:,:,0] - anchor_boxes[:,:,:,0]) / anchor_boxes[:,:,:,2], 0)
        y = tf.maximum((inputs[:,:,:,1] - anchor_boxes[:,:,:,1]) / anchor_boxes[:,:,:,3], 0)
        width = tf.maximum(tf.math.log(inputs[:,:,:,2] / anchor_boxes[:,:,:,2] + 1e-4), 0)
        height = tf.maximum(tf.math.log(inputs[:,:,:,3] / anchor_boxes[:,:,:,3] + 1e-4), 0)
        new_feature_map = tf.stack([x, y, width, height], -1)
        return new_feature_map

    box_regression_true = tf.map_fn(generate_anchor_box, box_regression_true)
    box_regression_pred = tf.map_fn(generate_anchor_box, box_regression_pred)

    # NOTE(review): this helper is never called in this function and shadows
    # the builtin `map`.
    def map(x):
        """Flatten to rows of 6, sort each row descending, keep the first 256 rows."""
        x = tf.reshape(x, [-1, 6])
        x = tf.sort(x, direction='DESCENDING', axis=-1)
        x = tf.slice(x, [0, 0], [256, -1])
        return x

    # box_loss = smooth L1 loss
    def cal_box_loss(x, y):
        """Smooth-L1 loss of ``x - y`` summed over the last axis and scaled by 10."""
        gamma = 10
        HUBER_DELTA = 1
        box_loss = K.abs(x - y)
        # Quadratic below the threshold, linear above it.
        box_loss = K.switch(box_loss < HUBER_DELTA, 0.5 * (box_loss ** 2), box_loss - 0.5)
        box_loss = K.sum(box_loss, axis=-1)
        return gamma * box_loss

    # Second definition of get_iou; this is the one used by the tf.map_fn call below.
    def get_iou(anchor):
        """Return ``[1 - IoU, IoU]`` for an (anchor, ground_truth) pair.

        Both boxes are indexed with one leading axis here.
        """
        ground_truth = anchor[1]
        anchor = anchor[0]
        anchor_area = anchor[:,2] * anchor[:,3]
        ground_truth_area = ground_truth[:,2] * ground_truth[:,3]
        left = tf.maximum(anchor[:,0], ground_truth[:,0])
        top = tf.maximum(anchor[:,1], ground_truth[:,1])
        right = tf.minimum(anchor[:,0] + anchor[:,2], ground_truth[:,0] + ground_truth[:,2])
        bottom = tf.minimum(anchor[:,1] + anchor[:,3], ground_truth[:,1] + ground_truth[:,3])
        width = tf.maximum(right - left, 0)
        height = tf.maximum(bottom - top, 0)

        intersection_area = width * height
        return [1 - (intersection_area / (anchor_area + ground_truth_area - intersection_area)),
                          intersection_area / (anchor_area + ground_truth_area - intersection_area)]

    # NOTE(review): as above, the first returned element is 1 - IoU but is bound
    # to `iou`, and the second (the IoU) is bound to `invert_iou`.
    iou, invert_iou = tf.map_fn(get_iou, [box_regression_pred, box_regression_true])
    # Pair the two values up as a 2-entry last axis.
    iou = tf.concat([iou, invert_iou], -1)
    iou = tf.reshape(iou, [-1, invert_iou.shape[1], 2])
    box_loss = cal_box_loss(box_regression_pred, box_regression_true)
    # NOTE(review): Keras documents K.categorical_crossentropy as
    # (target, output); here the predicted scores are in the target position
    # - confirm the intended order.
    cross_entropy = K.categorical_crossentropy(classification_pred, iou)
    result = cross_entropy + box_loss
    return result

In [6]:
# Backbone followed by the region-proposal head.
model = tf.keras.models.Sequential()
model.add(ResNet())
# IMAGE_SIZE is the size the datasets were resized to, so the anchor grid of
# the head matches the images that are fed in.
model.add(RPN(IMAGE_SIZE, 3, ANCHOR_SIZES, ANCHOR_RATIOS, SUBSAMPLE_RATIO))
# NOTE(review): the recorded run of this cell ended in
# "AttributeError: 'str' object has no attribute 'op'"; the root cause is not
# established here. Confirm that the loss-dict keys match the model's output names.
model.compile(optimizer='adam', loss={'bb_output': loss_function, 'obj_output': loss_function}, metrics=['accuracy'])
# train_ds / test_ds are already batched with BATCH_SIZE. Keras asks that
# `batch_size` not be specified for tf.data.Dataset inputs, so it is omitted.
model.fit(train_ds)
predictions = model.predict(test_ds)
print('predictions = {}'.format(predictions))

AttributeError: 'str' object has no attribute 'op'

In [None]:
# Scratch check: pair every value with its complement to 10 along the last axis.
# (An earlier 4-D assignment to `a` was overwritten on the next line before
# being read, so it has been dropped.)
a = tf.constant(np.array([[1], [4]]).astype(np.int32))
# `10 - a` is already a tensor; no extra tf.constant wrapper is needed.
b = 10 - a
print(a, b)
c = tf.concat([a, b], -1)
print(c)