In [None]:
import numpy as np
from itertools import chain
import random
from os.path import join

from keras.layers import Conv2D
from keras.models import Model
from keras import backend as K

from data import DatasetReader
from data import PATH_ABBYY_DATASET, PATH_BARCODES_DATASET, PATH_DUBSKA_MATRIX_DATASET
from data import PATH_DUBSKA_QRCODE_DATASET, PATH_SOEROES_DATASET
from models import DarknetModel

from utils.io import read_image
from utils.paths import PATH_PROJECT
from utils.image import resize_image

In [None]:
class Args:
    """Run-time switches for this notebook.

    Stands in for command-line arguments so the same code can be driven from
    notebook cells.

    Attributes:
        train: When true, the training cell compiles and fits the detector
            before the weights are loaded; when false, training is skipped.
    """

    def __init__(self, train=True):
        # Defaulting to True keeps ``Args()`` behaving as before.
        self.train = train


args = Args()

In [None]:
# Directory that holds training artefacts; currently the project root.
path_artefacts = PATH_PROJECT
# File the detector weights are saved to during/after training and loaded from afterwards.
path_weights = join(path_artefacts, 'weights_detector')

In [None]:
# NOTE(review): not referenced by the visible cells; generate_data() always
# yields one image per batch, which matches this value.
BATCH_SIZE = 1
# Channels-first input shape: 3 channels, spatial dimensions left unspecified (None).
IMAGE_SHAPE = (3, None, None)
# NOTE(review): 1541 is presumably the number of training images, making one
# "epoch" roughly a quarter of the data - confirm and name the constant.
STEPS_PER_EPOCH = 1541 // 4
# Upper bound on training epochs passed to fit_generator.
EPOCHS = 100

In [None]:
# Switch Keras to channels-first tensors before the backbone is constructed,
# matching IMAGE_SHAPE = (3, None, None).
K.set_image_data_format('channels_first')
# Backbone network whose output feeds the detection head.
feature_extractor = DarknetModel(IMAGE_SHAPE, use_dropout=True, filter_counts=[60, 100, 140])
# Factor reported by the backbone; used below to scale markup points and
# ground-truth map sizes down to the backbone's output resolution.
DOWNSAMPLE_FACTOR = feature_extractor.get_downsample_factor()
print(f'Downsample factor: {DOWNSAMPLE_FACTOR}')

In [None]:
def create_ground_truth_image(shape, markup, downsample_factor=None):
    """Rasterise marked-up points into a ground-truth map.

    Every point of every markup entry is floor-divided by the downsample
    factor and the corresponding cell of channel 0 is set to 1.

    Args:
        shape: Shape of the map to allocate. ``shape[1]`` and ``shape[2]``
            bound the first and second point coordinate respectively.
        markup: Iterable of mappings, each with a ``'pts'`` entry holding a
            sequence of points (at least two coordinates per point).
        downsample_factor: Divisor applied to the point coordinates. Defaults
            to the module-level ``DOWNSAMPLE_FACTOR``.

    Returns:
        ``np.ndarray`` of the given shape, zero everywhere except for ones at
        the downsampled point locations in channel 0.

    Raises:
        ValueError: If a downsampled point lies outside the map, including
            negative coordinates.
    """
    if downsample_factor is None:
        downsample_factor = DOWNSAMPLE_FACTOR
    image = np.zeros(shape)
    for corner in markup:
        # asarray accepts plain lists as well; the int cast makes floor-divided
        # float coordinates usable as array indices.
        points = (np.asarray(corner['pts']) // downsample_factor).astype(int)
        for point in points:
            # NOTE(review): point[0] indexes axis 1 and point[1] axis 2; if 'pts'
            # is stored as (x, y) and the map is (C, H, W) these are swapped - confirm.
            # Negative values must be rejected explicitly: NumPy would otherwise
            # wrap them around and mark a cell on the opposite side of the map.
            if not (0 <= point[0] < shape[1] and 0 <= point[1] < shape[2]):
                raise ValueError('Point exceeds image bounds')
            image[0, point[0], point[1]] = 1
    return image


def generate_data(randomize=True, shrink_size=256):
    """Endlessly yield single-sample ``(image, ground_truth)`` batches.

    Image paths are collected from all configured datasets once; the generator
    then loops over them forever, optionally reshuffling on every pass.

    Args:
        randomize: Shuffle the sample order at the start of every pass.
        shrink_size: Reference side length; images whose integer ratio to it
            exceeds 1 along either spatial axis are passed to ``resize_image``.

    Yields:
        Tuple ``(images, ground_truths)`` of arrays, each with a leading batch
        axis of length 1.

    Raises:
        ValueError: If none of the datasets provides any image path.
    """
    datasets = [DatasetReader(path) for path in (PATH_SOEROES_DATASET,
                                                 PATH_DUBSKA_QRCODE_DATASET,
                                                 PATH_DUBSKA_MATRIX_DATASET,
                                                 PATH_BARCODES_DATASET,
                                                 PATH_ABBYY_DATASET)]
    # Flat list of (image path, index of the owning dataset).
    paths = [(path, idx)
             for idx, dataset in enumerate(datasets)
             for path in dataset.get_paths()]
    if not paths:
        # Without this guard the loop below would spin forever without yielding.
        raise ValueError('No images found in any dataset')
    while True:
        indices = list(range(len(paths)))
        if randomize:
            random.shuffle(indices)
        for path_idx in indices:
            path, dataset_idx = paths[path_idx]
            dataset = datasets[dataset_idx]
            image_id = dataset.get_image_id(path)
            # NOTE(review): get_markup() is called for every sample; hoist it out
            # of the loop if DatasetReader does not cache the result.
            markup = dataset.get_markup()[image_id]
            image = read_image(path)
            shrink_factor = max(image.shape[1] // shrink_size, image.shape[2] // shrink_size)
            if shrink_factor > 1:
                # NOTE(review): if fx/fy are scale factors (as in cv2.resize) this
                # enlarges the image instead of shrinking it, and the markup
                # points are not rescaled to match - confirm against resize_image.
                image = resize_image(image, fx=shrink_factor, fy=shrink_factor)
            # Divide each spatial dimension explicitly: the previous
            # `*(image.shape[1:]) // DOWNSAMPLE_FACTOR` floor-divided the tuple
            # itself, which raises TypeError for a plain int factor.
            gt_shape = (1, *(dim // DOWNSAMPLE_FACTOR for dim in image.shape[1:]))
            try:
                ground_truth = create_ground_truth_image(gt_shape, markup)
            except ValueError:
                # Best effort: skip samples whose markup does not fit the map.
                continue
            yield np.array([image]), np.array([ground_truth])

In [None]:
# from utils.visualization import show_image

# gen = generate_data()
# image, gt = next(gen)
# print(image.shape)
# print(gt.shape)

# show_image(image)
# show_image(gt)

In [None]:
# Detection head: a single-filter 3x3 convolution with sigmoid activation on
# top of the feature extractor's output ('same' padding).
x = feature_extractor.output
x = Conv2D(1, (3, 3), padding='same', activation='sigmoid')(x)
# End-to-end model from the backbone's input to the one-channel detection map.
detector = Model(inputs=[feature_extractor.input], outputs=[x])
detector.summary()

In [None]:
if args.train:
    # Callbacks are only needed when training, so they are imported here.
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

    callbacks = [
        # Halve the learning rate after 5 epochs without improvement in training loss.
        ReduceLROnPlateau(monitor='loss', factor=0.5, patience=5, verbose=1),
        # Checkpoint every 5th epoch when the loss improved. Weights only, so the
        # file has the same format as the save_weights()/load_weights() calls below.
        ModelCheckpoint(path_weights, monitor='loss', period=5, save_best_only=True,
                        save_weights_only=True),
        # Stop after 25 epochs without improvement in training loss.
        EarlyStopping(monitor='loss', patience=25, verbose=1)
    ]
    detector.compile(loss='binary_crossentropy', optimizer='adam')
    history = detector.fit_generator(generate_data(), steps_per_epoch=STEPS_PER_EPOCH,
                                     epochs=EPOCHS, callbacks=callbacks)
    # NOTE(review): this overwrites the best checkpoint with the last-epoch
    # weights, which after early stopping are not the best ones - confirm intent.
    detector.save_weights(path_weights)
# Always (re)load the weights from disk, whether or not training ran.
detector.load_weights(path_weights)