In [None]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.losses import *
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.callbacks import *
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from imgaug import augmenters as iaa
from glob import glob as gl
import cv2
import random
from PIL import Image, ImageDraw, ImageFont
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix

# Create model

In [None]:
# Backbone: ImageNet-pretrained ResNet50 without its classification top,
# fed through an explicit 224x224x3 input tensor.
resnet_backbone = ResNet50(
    weights="imagenet",
    include_top=False,
    pooling=None,
    input_tensor=Input(shape=(224, 224, 3)),
)
backbone_output = resnet_backbone.output

# Classification head: 7x7 average pooling, flatten, then four
# Dense(relu) + Dropout stages and a single sigmoid output unit.
# Layers are created in the same order as a hand-unrolled stack, so the
# auto-generated layer names are unchanged.
head = AveragePooling2D(pool_size=(7, 7))(backbone_output)
head = Flatten(name="flatten")(head)
for units, drop_rate in ((512, 0.5), (512, 0.25), (256, 0.25), (64, 0.25)):
    head = Dense(units, activation="relu")(head)
    head = Dropout(drop_rate)(head)
head = Dense(1, activation="sigmoid")(head)

# Note: an alternative fully-convolutional head (1x1 Conv2D stack followed by
# GlobalAveragePooling2D and softmax) was previously kept here as
# commented-out code; it was never active.

# Freeze every backbone layer so that only the new head layers have
# trainable weights.
for layer in resnet_backbone.layers:
    layer.trainable = False

In [None]:
# Directory that receives both the TensorBoard event files and the weight
# checkpoints written below.
log_dir = "logs/"
tensor_board = keras.callbacks.TensorBoard(log_dir=log_dir)

# The model's final layer already applies a sigmoid, so the loss must consume
# probabilities. With from_logits=True the loss would apply a second sigmoid
# to values that are already in [0, 1].
loss = BinaryCrossentropy(from_logits=False)

# Save weights only when val_loss improves. The deprecated `period=1`
# argument is omitted: checkpointing is evaluated once per epoch by default.
checkpoint = ModelCheckpoint(
    log_dir + 'ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}.h5',
    monitor='val_loss',
    save_weights_only=True,
    save_best_only=True,
)

In [None]:
# Assemble the full network (backbone input -> classification head) and
# configure it for training with Adam, the `loss` object, and accuracy.
model = Model(
    inputs=resnet_backbone.input,
    outputs=head,
)
model.compile(
    optimizer="adam",
    loss=loss,
    metrics=['accuracy'],
)

# Data generator

In [None]:
def cut_out(image):
    """Zero out one randomly placed square patch of `image` (cutout).

    The patch side is drawn uniformly from the integers 20..49 and the patch
    is positioned so that it lies fully inside the image. The array is
    modified in place and the same object is returned.

    Args:
        image: array whose shape unpacks as (height, width, channels); the
            zero patch is built with 3 channels.

    Returns:
        The input array, with the patch overwritten by zeros.
    """
    height, width, _channels = image.shape

    # Draw order (side, then column, then row) is kept fixed so a seeded
    # run produces the same patch.
    side = np.random.randint(20, 50)
    left = np.random.randint(0, width - side + 1)
    top = np.random.randint(0, height - side + 1)

    image[top:top + side, left:left + side, :] = np.zeros((side, side, 3), dtype=int)
    return image

In [None]:
def normal_blur(img):
    """Randomly apply a box blur to `img`.

    A value k/100 is drawn with k uniform over 0..99. Above 0.75 a 5x5 box
    blur is applied, below 0.25 a 3x3 box blur is applied, and otherwise the
    image is returned untouched.

    Args:
        img: image array accepted by cv2.blur.

    Returns:
        The blurred image, or `img` itself when no blur is selected.
    """
    roll = np.random.randint(0, 100) / 100
    if roll > 0.75:
        kernel = (5, 5)
    elif roll < 0.25:
        kernel = (3, 3)
    else:
        return img
    return cv2.blur(img, kernel)

def motion_blur(img):
    """Apply a horizontal motion blur with a randomly chosen kernel size.

    The kernel is a square of side 3 or 5 whose middle row is filled with
    1/side and whose other entries are zero, so each output pixel is the
    mean of `side` horizontally adjacent pixels.

    Args:
        img: image array accepted by cv2.filter2D.

    Returns:
        The filtered image, with the same depth as the input (ddepth=-1).
    """
    ksize = 2 * np.random.randint(1, 3) + 1

    # Averaging kernel along the middle row only.
    kernel = np.zeros((ksize, ksize))
    kernel[ksize // 2, :] = 1.0
    kernel /= ksize

    return cv2.filter2D(img, -1, kernel)

# Available blur augmentations; get_data picks one of these at random.
BLUR_METHOD = [normal_blur, motion_blur]

In [None]:
# Candidate font files; add_text picks one at random per caption.
fonts = gl('fonts/*')

# Vocabulary for random captions: one whitespace-stripped entry per line of
# the dictionary file.
with open('eng_dict.txt', 'r', encoding="utf8") as file:
    words = list(map(str.strip, file))
    
def generate_text(min_word, max_word):
    """Build a random caption from the module-level `words` vocabulary.

    Between `min_word` and `max_word` (inclusive) words are drawn uniformly
    from the whole vocabulary, whatever its length. While the caption is
    shorter than 10 characters words are joined with a single space; after
    that the separator is usually a space and occasionally '  -  '. A
    trailing '.' is appended to any non-empty caption.

    Args:
        min_word: minimum number of words.
        max_word: maximum number of words.

    Returns:
        The caption string (starting with a space), or '' when zero words
        were drawn.

    Raises:
        IndexError: if at least one word is requested and `words` is empty.
    """
    separators = [' ', ' ', '  -  ', ' ', ' ', ' ', ' ', ' ']

    text = ''
    for _ in range(random.randint(min_word, max_word)):
        separator = ' ' if len(text) < 10 else random.choice(separators)
        # Sample over the actual vocabulary instead of a hard-coded index
        # range, which broke for dictionaries of any other size.
        text += separator + random.choice(words)

    if text:
        text += '.'
    return text

def add_text(image):
    """Overlay one random caption on `image` and return the result.

    The caption text comes from generate_text, the font file is chosen at
    random from the module-level `fonts` list, and the size, colour and
    anchor position are randomised.

    Args:
        image: array whose shape unpacks as (height, width, channels) and
            that PIL's Image.fromarray accepts.

    Returns:
        A new NumPy array with the caption drawn on it.
    """
    # Array images are indexed (row, column, channel), i.e. height first.
    img_h, img_w, _ = image.shape

    canvas = Image.fromarray(image)
    draw = ImageDraw.Draw(canvas)

    # PIL expects the anchor as (x, y): x is bounded by the width and y by
    # the height. The max() keeps randint valid for images narrower than
    # roughly 67 px, where (img_w // 4) * 3 drops below 50.
    x_max = max(50, (img_w // 4) * 3)
    y_max = (img_h // 2) + 100
    start_coord = (random.randint(50, x_max), random.randint(50, y_max))

    # Random font appearance.
    # NOTE(review): PIL interprets colour names as RGB; if `image` came from
    # cv2.imread (BGR), 'red' will show up as blue — confirm whether that
    # matters for this augmentation.
    font_size = random.randint(15, 60)
    text_color = random.choice(['red', 'white', 'green', 'yellow', 'white'])
    font = ImageFont.truetype(random.choice(fonts), font_size)

    # Random words.
    text = generate_text(min_word=5, max_word=18)  # @TODO change this if change font size

    # Draw the caption and convert back to an array.
    draw.text(start_coord, text, font=font, fill=text_color)
    return np.array(canvas)

In [None]:
def get_data(img_name):
    """Load one frame, derive its binary label, and randomly augment it.

    The label comes from the name of the directory that directly contains
    the file: 'other' maps to 0 and anything else to 1. Each augmentation
    (blur, flip, text overlay, cutout) is applied independently when an
    integer draw k in 0..99 satisfies k/100 > 0.75. The image is resized to
    224x224 after the text overlay and before the cutout.

    Args:
        img_name: path of the image file, using '/' or '\\' separators.

    Returns:
        (img, label): the 224x224 image array and the integer label.

    Raises:
        OSError: if OpenCV cannot read `img_name`.
    """
    img = cv2.imread(img_name)
    if img is None:
        # cv2.imread reports failure by returning None instead of raising;
        # fail here with the offending path rather than deep inside an
        # augmentation.
        raise OSError('Could not read image: {}'.format(img_name))

    # The class folder is the file's parent directory. Normalising the
    # separators handles Windows glob results, and taking the parent (not a
    # fixed prefix) keeps './' or absolute paths from being mislabelled.
    path_parts = img_name.replace('\\', '/').split('/')
    class_dir = path_parts[-2] if len(path_parts) >= 2 else ''
    label = 0 if class_dir == 'other' else 1

    def should_apply():
        # One fresh draw per augmentation, in a fixed order.
        return np.random.randint(0, 100) / 100 > 1 - 0.5 / 2

    # blurring
    if should_apply():
        img = random.choice(BLUR_METHOD)(img)

    # flipping (0 = vertical, 1 = horizontal)
    if should_apply():
        img = cv2.flip(img, random.choice([0, 1]))

    # add text
    if should_apply():
        img = add_text(img)

    img = cv2.resize(img, (224, 224))

    # cut out
    if should_apply():
        img = cut_out(img)

    return img, label

In [None]:
def data_generator(images, batch_size):
    """Endlessly yield (image_batch, label_batch) pairs for Keras training.

    Every sample is produced by get_data, so each pass sees freshly
    augmented images. The sample order is reshuffled each time the
    iteration wraps around to the start of the list.

    Args:
        images: sequence of image paths.
        batch_size: number of samples per yielded batch.

    Yields:
        (image_data, labels): NumPy arrays stacking `batch_size` images and
        their labels.

    Raises:
        ValueError: on first iteration, if `images` is empty.
    """
    # Shuffle a private copy so the caller's list keeps its order.
    samples = list(images)
    n = len(samples)
    if n == 0:
        raise ValueError('data_generator received an empty image list')

    i = 0
    while True:
        image_data = []
        labels = []
        for _ in range(batch_size):
            if i == 0:
                # Start of a new pass over the data.
                np.random.shuffle(samples)
            image, label = get_data(samples[i])

            image_data.append(np.array(image))
            labels.append(label)

            i = (i + 1) % n

        yield np.array(image_data), np.array(labels)

def data_generator_wrapper(images, batch_size):
    """Return a batch generator over `images`, or None for a non-positive batch size.

    Args:
        images: sequence of image paths forwarded to data_generator.
        batch_size: number of samples per batch.

    Returns:
        The generator created by data_generator, or None when
        `batch_size <= 0`.
    """
    is_positive = not (batch_size <= 0)
    return data_generator(images, batch_size) if is_positive else None

# Train

In [None]:
# Frame paths grouped by class folder: index 0 = "other", index 1 = "wood-making".
EXTRACTED_FRAME_PATH = [
    gl('data/extracted_frame/other/*'),
    gl('data/extracted_frame/wood-making/*'),
]

# Per-class 80/20 split, taken positionally from each glob listing.
numb_train1, numb_train2 = (int(len(paths) * 0.8) for paths in EXTRACTED_FRAME_PATH)

train_data = [
    *EXTRACTED_FRAME_PATH[0][:numb_train1],
    *EXTRACTED_FRAME_PATH[1][:numb_train2],
]
val_data = [
    *EXTRACTED_FRAME_PATH[0][numb_train1:],
    *EXTRACTED_FRAME_PATH[1][numb_train2:],
]

# data = EXTRACTED_FRAME_PATH[0] + EXTRACTED_FRAME_PATH[1]
# np.random.shuffle(data)

# fold_data = [data[i:i+675] for i in range(10)]

In [None]:
# for i in range(2,6):
    
#     image, label = get_data(data[-i])
#     cv2.imwrite('test-{}-{}.png'.format(i, label), image)

## Freezing backbone

In [None]:
# Number of samples per batch for the frozen-backbone training run.
batch_size = 64
# Running epoch counter; in this notebook it is only referenced by the
# commented-out fold-training loops.
total_epoch = 0

In [None]:
# Whole batches available per epoch, with a floor of one step so that very
# small sets still run.
train_steps = max(1, len(train_data) // batch_size)
val_steps = max(1, len(val_data) // batch_size)

# Train the head for 50 epochs, logging to TensorBoard and checkpointing
# the best weights.
model.fit(
    data_generator_wrapper(train_data, batch_size),
    steps_per_epoch=train_steps,
    validation_data=data_generator_wrapper(val_data, batch_size),
    validation_steps=val_steps,
    epochs=50,
    callbacks=[tensor_board, checkpoint],
)

In [None]:
# for i in range(10):
#     print('\nFold ', i+1)

#     temp = fold_data.copy()
#     val_data = temp[i]
#     temp.pop(i)
#     train_data = []
#     for fold in temp:
#         train_data += fold

#     model.fit(data_generator_wrapper(train_data, batch_size),
#             steps_per_epoch=max(1, len(train_data) // batch_size),
#             validation_data=data_generator_wrapper(val_data, batch_size),
#             validation_steps=max(1, len(val_data) // batch_size),
#             epochs=12+total_epoch,
#             initial_epoch=total_epoch,
#             callbacks=[tensor_board, checkpoint])
#     total_epoch += 12

# #         model.evaluate(data_generator_wrapper(val_data, batch_size))


## Unfreeze model

In [None]:
# batch_size = 32

In [None]:
# for layer in resnet_backbone.layers:
#     layer.trainable = True
    
# try:
#     for i in range(10):
#         print('\nFold ', i+1)
        
#         temp = fold_data.copy()
#         val_data = temp[i]
#         temp.pop(i)
#         train_data = []
#         for fold in temp:
#             train_data += fold
        
#         model.fit(data_generator_wrapper(train_data, batch_size),
#             steps_per_epoch=max(1, len(train_data) // batch_size),
#             validation_data=data_generator_wrapper(val_data, batch_size),
#             validation_steps=max(1, len(val_data) // batch_size),
#             epochs=10+total_epoch,
#             initial_epoch=total_epoch,
#               callbacks=[tensor_board, checkpoint, CustomCallback()])
#         total_epoch += 10
        
# #         model.evaluate(data_generator_wrapper(val_data, batch_size))
        
# except Exception as e:
#     if model.crashed:
#         print('\nModel crashed')
#     else:
#         print(e)

# Plot confusion matrix

In [None]:
# Use the tensorflow.keras implementation, consistent with the rest of the
# notebook, instead of the standalone `keras` package.
from tensorflow.keras.preprocessing.image import ImageDataGenerator


def _rgb_to_bgr(image):
    """Reverse the channel axis of one image array.

    The directory iterator loads images as RGB, while the training pipeline
    reads frames with cv2.imread (BGR); flipping the channels keeps the
    evaluation input in the same channel order the model was trained on.
    """
    return image[..., ::-1]


data_gen = ImageDataGenerator(preprocessing_function=_rgb_to_bgr)

# Data directory structure:
# img_dir/
#         class1/
#         class2/
#         ....
img_dir = 'data/extracted_frame/'
# NOTE(review): despite its name, this generator walks every image under
# img_dir, including frames used for training — confirm this is intended.
validation_generator = data_gen.flow_from_directory(img_dir,
                                                    target_size=(224, 224),
                                                    batch_size=64,
                                                    class_mode='binary',
                                                    shuffle=False)

# Let Keras iterate the whole generator exactly once so that there is one
# prediction per entry of validation_generator.classes. (The second
# positional argument of predict is batch_size, not a step count.)
Y_pred = model.predict(validation_generator)

# Threshold the sigmoid outputs into hard 0/1 labels and flatten (N, 1) to
# (N,). A value of exactly 0.5 maps to class 0 instead of being left as-is.
Y_pred = (Y_pred.ravel() > 0.5).astype(int)
# # y_pred = np.argmax(Y_pred, axis=1) # for categorical data

y_true = validation_generator.classes

print('Confusion Matrix')
print(confusion_matrix(y_true, Y_pred))
print('Classification Report')
target_names = ['Other', 'Wood-making']
print(classification_report(y_true, Y_pred, target_names=target_names))