### Test

1. 两点间距离公式：

$$|AB|=\sqrt{(x_1-x_2)^2 + (y_1-y_2)^2}$$

In [None]:
import os
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["CUDA_VISIBLE_DEVICES"]="15"

import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array, load_img, ImageDataGenerator, array_to_img
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.compiler.tensorrt import trt_convert as trt
from tensorflow.keras.applications import MobileNetV2 


import glob
import numpy as np
import cv2
import matplotlib.pyplot as plt
import random
import time 
import datetime

from tqdm.notebook import tqdm

from bokeh.io import output_notebook, show, push_notebook
from bokeh.plotting import figure
from bokeh.layouts import row
from bokeh.core.validation import silence
from bokeh.core.validation.warnings import MISSING_RENDERERS
silence(MISSING_RENDERERS, True)

import sklearn.metrics as metrics
import matplotlib.ticker as ticker 
import matplotlib.pyplot as plt

In [None]:
#
# Resize the dataset to native resolutions of the network and overwrite the original 
# dataset 
#
def resize_dataset_and_overwrite(img_size, root, dataset):
    """Resize every JPEG of one dataset split in place.

    Globs ``root/dataset/*/*.jpg`` and overwrites each file with a copy
    resized using bicubic interpolation. The original files are lost.

    Args:
        img_size: Sequence whose first element is used as the output width
            (columns) and whose second element is used as the output height
            (rows). Any further elements (e.g. a channel count) are ignored.
        root: Dataset root directory.
        dataset: Name of the split sub-directory under ``root``.
    """
    img_cols = img_size[0]
    img_rows = img_size[1]

    img_files = glob.glob(root+'/'+dataset+'/*/*.jpg')

    for fn in tqdm(img_files):

        image = cv2.imread(fn)
        if image is None:
            # cv2.imread reports an unreadable/corrupt file by returning None
            # rather than raising; skip it instead of crashing part-way
            # through an in-place rewrite of the dataset.
            print('Skipping unreadable image:', fn)
            continue

        # Resizing works on each channel independently, so the image can stay
        # in the channel order imread produced and be written back directly;
        # no colour-space round trip is needed.
        image = cv2.resize(image, (img_cols, img_rows), interpolation=cv2.INTER_CUBIC)

        cv2.imwrite(fn, image)

#
# Example path '/data/aiqs-datasets/rim-dataset/training'
#
def augment_dataset_with_distortions(root, dataset):
    """Write randomly distorted variants next to every JPEG of a split.

    Globs ``root/dataset/*/*.jpg`` and, for each image found, lets a Keras
    ``ImageDataGenerator`` save ``n_aug`` randomly rotated / shifted /
    sheared / zoomed copies into the same directory as the source image,
    using the ``'aug_'`` file-name prefix.

    The file list is collected once before any variant is written, so
    variants produced during this call are not themselves augmented.
    Calling the function again will pick up earlier variants, because they
    also match ``*.jpg``.

    Args:
        root: Dataset root directory.
        dataset: Name of the split sub-directory under ``root``.
    """
    print("Augmenting dataset with distorted variants [",root+'/'+dataset,"]")

    img_files = glob.glob(root+'/'+dataset+'/*/*.jpg')

    # Ranges of the random distortions applied by the generator
    datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.1,
            height_shift_range=0.1,
            shear_range=0.1,
            zoom_range=0.1,
            horizontal_flip=False,
            fill_mode='nearest')

    # Number of distorted variants to write per source image
    n_aug = 3

    for fn in tqdm(img_files):
        x = img_to_array(load_img(fn))
        # flow() expects a batch, so add a leading batch axis of size 1
        x = x.reshape((1,) + x.shape)

        # Save the variants alongside the original image
        destdir = os.path.dirname(fn)

        # flow() loops forever; every batch it yields has already been saved
        # to disk. Stop as soon as n_aug batches have been produced (the
        # previous `i > n_aug` check let one extra variant through).
        flow = datagen.flow(x, batch_size=1,
                            save_to_dir=destdir,
                            save_prefix='aug_',
                            save_format='jpg')
        for produced, _batch in enumerate(flow, start=1):
            if produced >= n_aug:
                break

#
# Read dataset and labels 
#
def read_dataset_and_labels(root, dataset, target_size):
    """Load one dataset split into memory as images plus one-hot labels.

    Images are globbed from ``root/dataset/<class>/*.jpg``. The class of an
    image is its parent directory name parsed with ``int()``, so directory
    names must be integers. The number of classes is the number of entries
    directly under ``root/dataset``.

    Args:
        root: Dataset root directory.
        dataset: Name of the split sub-directory under ``root``.
        target_size: 3-tuple describing the shape of one image. It is
            unpacked and logged as ``(w, h, c)``, but its first two entries
            are passed unchanged as ``load_img``'s ``target_size``, which
            Keras documents as ``(height, width)``. The difference only
            matters for non-square sizes.

    Returns:
        Tuple ``(imgs, labels)``: ``imgs`` is a float32 array of shape
        ``(n_images,) + target_size`` with pixel values divided by 255;
        ``labels`` is the result of ``tf.keras.utils.to_categorical`` with
        ``num_classes`` columns.
    """
    print("Reading dataset from ["+root+'/'+dataset+"] as ", dataset)
    print(" - Image shape to be used (w,h,c) = ", target_size)

    dataset_files = glob.glob(root+'/'+dataset+'/*/*.jpg')
    num_classes = len(glob.glob(root+'/'+dataset+'/*'))

    print(" - Number of classes", num_classes)
    print(" - Number of images", len(dataset_files))

    w,h,c = target_size

    n_images = len(dataset_files)
    # Allocate directly in single precision: a float64 buffer followed by an
    # astype('float32') copy would briefly need three times the memory.
    imgs = np.empty((n_images, w, h, c), dtype='float32')
    labels = np.empty(n_images, dtype=int)

    for idx, fn in enumerate(tqdm(dataset_files)):

        # The class id is the name of the directory holding the image
        cat = fn.split('/')[-2]

        # color_mode='rgb' replaces the deprecated grayscale=False argument
        img = load_img(fn, color_mode='rgb', target_size=(w, h))
        img = img_to_array(img)

        imgs[idx] = img/255 # rescale to [0,1] range
        labels[idx] = int(cat)

    # Convert categorical labels to vectors:
    # 0 to [1, 0, 0, ...], 1 to [0, 1, 0, ...] etc.
    labels = tf.keras.utils.to_categorical(labels, num_classes)

    return imgs, labels

#
# Monitoring of the training process with plots updated every epoch showing 
# history of the optimization process, i.e. loss and accuracy for both training and testing
# datasets. 
# 
class TrainingPlot(tf.keras.callbacks.Callback):
    """Keras callback that live-plots loss and accuracy in a notebook.

    Two Bokeh figures ('Losses' and 'Accuracy') are shown when training
    starts and refreshed through ``push_notebook`` after every epoch. Each
    figure has a blue 'Training' curve and a red 'Testing' curve, fed from
    the ``loss`` / ``val_loss`` and ``accuracy`` / ``val_accuracy`` entries
    of the epoch logs.

    The per-epoch history is also kept on the instance in ``losses``,
    ``acc``, ``val_losses``, ``val_acc`` and ``logs``.
    """

    # Called by Keras once when training begins
    def on_train_begin(self, logs=None):

        self.losses = []
        self.acc = []
        self.val_losses = []
        self.val_acc = []
        self.logs = []

        output_notebook()
        self.p1 = figure(plot_width=450, plot_height=300, title='Losses')
        self.p2 = figure(plot_width=450, plot_height=300, title='Accuracy')

        # Create each curve exactly once, initially empty. Later updates only
        # replace the data behind these renderers; calling line() again on
        # every epoch would stack a new glyph (and legend entry) per epoch.
        self._curves = [
            (self.p1.line([], [], color='blue', legend_label='Training'), self.losses),
            (self.p1.line([], [], color='red', legend_label='Testing'), self.val_losses),
            (self.p2.line([], [], color='blue', legend_label='Training'), self.acc),
            (self.p2.line([], [], color='red', legend_label='Testing'), self.val_acc),
        ]

        self.p1.legend.location = "top_left"
        self.p2.legend.location = "top_left"

        self.target = show(row(self.p1, self.p2), notebook_handle=True)

    # Called by Keras at the end of each epoch
    def on_epoch_end(self, epoch, logs=None):

        # Keras may pass None; never share a mutable default between calls
        logs = logs or {}

        # Append the logs, losses and accuracies to the lists. Missing keys
        # (e.g. no validation data) are recorded as None.
        self.logs.append(logs)
        self.losses.append(logs.get('loss'))
        self.acc.append(logs.get('accuracy'))
        self.val_losses.append(logs.get('val_loss'))
        self.val_acc.append(logs.get('val_accuracy'))

        # x axis: zero-based epoch index
        epochs_seen = list(range(len(self.losses)))

        # Swap in the full history for every curve, then push the change to
        # the already displayed figures.
        for renderer, series in self._curves:
            renderer.data_source.data = {'x': epochs_seen, 'y': list(series)}

        push_notebook(handle=self.target)

# 
# Evaluate model's accuracy
#
def evaluate_model(model, test_img, y_test):
    """Run ``model.evaluate`` silently and print the first two results.

    Args:
        model: Object exposing ``evaluate(x, y, verbose=...)`` whose result
            can be indexed; element 0 is printed as the loss and element 1
            as the accuracy.
        test_img: Inputs forwarded to ``model.evaluate``.
        y_test: Targets forwarded to ``model.evaluate``.
    """
    results = model.evaluate(test_img, y_test, verbose=0)

    # Print in order: index 0 is reported as loss, index 1 as accuracy
    for position, caption in enumerate(('Test loss:', 'Test accuracy:')):
        print(caption, results[position])
    
#
# Plot random sample of image for spot checking 
#
def plot_random_sample(n, x, y):
    """Show ``n`` randomly chosen images in a grid that is five columns wide.

    Images are drawn with replacement, so the same image may appear more
    than once. Each subplot is titled with ``np.argmax`` of the matching
    row of ``y``.

    Args:
        n: Number of images to draw.
        x: Indexable collection of images accepted by ``plt.imshow``.
        y: Indexable collection of label vectors, aligned with ``x``.
    """
    population = len(x)

    columns = 5
    # Ceiling division: one extra row when the last row is only partly filled
    rows = n // columns + (1 if n % columns > 0 else 0)

    fig = plt.figure(figsize=(12,6))

    # Subplot positions are 1-based
    for position in range(1, n + 1):
        pick = random.randint(0, population-1)
        fig.add_subplot(rows, columns, position, title=str(np.argmax(y[pick])))
        plt.imshow(x[pick])

    plt.tight_layout()
    plt.show()
    
#
# Function to equalize the histogram of colored image
#
def equalize_colored_histogram(img):
    """Equalize the histogram of a colour image through its Y (luma) channel.

    The image is converted with ``cv2.COLOR_BGR2YUV``, channel 0 is
    histogram-equalized, and the result is converted back with
    ``cv2.COLOR_YUV2BGR``. Both conversions treat the data as BGR-ordered.

    Args:
        img: Colour image accepted by ``cv2.cvtColor``.

    Returns:
        The equalized image, converted back with ``cv2.COLOR_YUV2BGR``.
    """
    yuv = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)

    # Only the luma plane is equalized; the two chroma planes stay untouched
    equalized_luma = cv2.equalizeHist(yuv[:, :, 0])
    yuv[:, :, 0] = equalized_luma

    return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)

#
# Change of the gamma value of the image 
#
def adjust_gamma(image, gamma=1.0):
    """Apply gamma correction to an image through a 256-entry lookup table.

    Every level ``i`` in 0..255 is mapped to ``((i / 255) ** (1 / gamma)) * 255``
    and cast to ``uint8``.

    Args:
        image: Image passed to ``cv2.LUT``.
        gamma: Gamma value; the exponent applied is its reciprocal.

    Returns:
        The result of ``cv2.LUT(image, table)``.
    """
    exponent = 1.0 / gamma

    # One table entry per possible pixel level
    levels = np.arange(0, 256)
    mapped = [255 * ((level / 255.0) ** exponent) for level in levels]
    lookup = np.array(mapped).astype("uint8")

    return cv2.LUT(image, lookup)

#
# Generate uniformly gamma augmented set
# 
def dataset_augment_with_gamma_variations(path):
    """Write histogram-equalized, gamma-adjusted copies of every image.

    For each sub-directory of ``path`` (in sorted order) and each file in
    it, the image is histogram-equalized and then saved once per gamma
    value as ``<original stem>-gamma-<gamma>.jpg`` in the same directory.

    Files that cannot be decoded as images are skipped. Files whose name
    already contains ``-gamma-`` are skipped too, so calling the function
    again does not augment its own earlier output.

    Args:
        path: Directory whose immediate sub-directories hold the images.
    """
    gammas = [0.25, 0.5, 0.75, 1.0, 1.125, 1.25, 1.5]

    for dir_name in sorted(glob.glob(path+'/*')):

        print('augmenting gamma -- working in - ',dir_name.split('/')[-1])

        for fn in tqdm(glob.glob(dir_name+'/*')):

            # splitext copes with extensions of any length (.jpg, .jpeg, ...)
            stem = os.path.splitext(os.path.basename(fn))[0]
            if '-gamma-' in stem:
                # Output of a previous run; augmenting it again would grow
                # the dataset multiplicatively on every rerun.
                continue

            img = cv2.imread(fn)
            if img is None:
                # cv2.imread returns None for files it cannot decode
                print('Skipping unreadable image:', fn)
                continue

            # Keep the BGR channel order produced by imread:
            # equalize_colored_histogram converts with COLOR_BGR2YUV, so
            # swapping R and B beforehand makes it equalize a wrongly
            # weighted luma channel. imwrite expects BGR as well.
            img_eq = equalize_colored_histogram(img)

            for new_gamma in gammas:

                img_gamma = adjust_gamma(img_eq, gamma=new_gamma)

                new_fn = os.path.join(dir_name, stem + '-gamma-' + str(new_gamma) + '.jpg')
                cv2.imwrite(new_fn, img_gamma)

In [None]:
# Configuration
#
# Notebook-wide settings read by the cells below.

# Mini-batch size and number of epochs passed to model.fit
batch_size = 64
epochs = 100
# Number of output classes: used as the model's `classes` argument and for
# one-hot encoding in the per-class test cell
num_classes = 17

# Root directory; the cells below read its 'training' and 'testing'
# sub-directories
dataset_root = '/data/datasets/2020-02-20-rim-dataset--training'
# Switches for the preprocessing cell. Every enabled step writes image files
# into the dataset directories on disk each time that cell is run.
resize_and_overwrite_original_files = False
augment_with_distortions = False
augment_with_gamma = True
# Image shape used as the model's input_shape and by the dataset reader
target_size = (224,224,3)

In [None]:
# MobileNet-V2
#
# Build a MobileNetV2 classifier with randomly initialised weights
# (weights=None) and `num_classes` outputs, then compile it with Adam.
from tensorflow.keras.applications import MobileNetV2 
model = MobileNetV2(input_shape=target_size, 
                    weights=None, 
                    classes=num_classes)

solver = tf.keras.optimizers.Adam(learning_rate=0.0001)
# The labels are one-hot vectors over mutually exclusive classes and the
# network's default classifier activation is softmax, so the matching loss
# is categorical cross-entropy. With 'binary_crossentropy' Keras also
# resolves the 'accuracy' metric to binary accuracy, which is averaged over
# every element of the one-hot vector and therefore overstates the real
# classification accuracy.
model.compile(loss='categorical_crossentropy',
              optimizer=solver,
              metrics=['accuracy'])

In [None]:
# Optional preprocessing passes, each controlled by a flag from the
# configuration cell. All of them write image files into the dataset
# directories, so rerunning this cell with a flag enabled repeats that step
# on whatever files are on disk at that time.
if resize_and_overwrite_original_files:
    print('-- Resizing and overwriting -- ')
    resize_dataset_and_overwrite(target_size, dataset_root, 'training')
    resize_dataset_and_overwrite(target_size, dataset_root, 'testing')
    
# Only the training split is augmented; the calls for the testing split are
# left commented out.
if augment_with_gamma:
    print('-- Augmenting with gamma changes --')
    dataset_augment_with_gamma_variations(dataset_root+'/training')
    #dataset_augment_with_gamma_variations(dataset_root+'/testing')
    
if augment_with_distortions:
    print('-- Augmenting with distortions --')
    augment_dataset_with_distortions(dataset_root, 'training')
    #augment_dataset_with_distortions(dataset_root, 'testing')

In [None]:
# Load both splits fully into memory as (images, one-hot labels) pairs
print('-- Reading datasets --')
train_img, y_train = read_dataset_and_labels(dataset_root, 'training', target_size)
test_img, y_test = read_dataset_and_labels(dataset_root, 'testing', target_size)

## Spot-check dataset

In [None]:
# Number of random images to display from each split
n = 10
print('-- Training set --')
plot_random_sample(n, train_img, y_train)

print('-- Testing set --')
plot_random_sample(n, test_img, y_test)

## Train

In [None]:
# TensorBoard logs go to a fresh, timestamped directory for every run
log_dir="logs/rim/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

print('-- Training -- ')
# Live loss/accuracy plots, refreshed at the end of every epoch
plot_losses = TrainingPlot()
# The testing split is passed as validation data, so the val_* values
# reported during training are computed on the test set.
history = model.fit(train_img, 
                    y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    verbose=1,
                    validation_data=(test_img, y_test), 
                    shuffle=True, 
                    callbacks=[plot_losses, tensorboard_callback])

In [None]:
# Print loss and accuracy of the trained model on the testing split
print('-- Model evaluation --')
evaluate_model(model, test_img, y_test)

In [None]:
# Save model
# NOTE(review): the name has no file extension, so the on-disk format is
# whatever the installed TensorFlow/Keras version picks by default -- confirm
# it matches what downstream tooling expects.
filename = 'mobilenetv2_model'
model.save(filename)

In [None]:
# Predictions for the whole testing split. The per-class test cell below
# assigns `result` again for each class directory.
result = model.predict(test_img)

In [None]:
# Test accuracy for each class separately
#
# For every class directory of the testing split: load its images, run the
# model, print the per-class accuracy, show a few random samples and every
# misclassified image, and finally plot the accuracy of all classes as a
# bar chart.
files_to_plot = 5
# Reuse the settings from the configuration cell instead of repeating the
# literal size and path here
size = target_size[0]
test_classes_dir = dataset_root + '/testing/*'

classes_dirs = sorted(glob.glob(test_classes_dir))

print('List of directories to test:')
for class_dir in classes_dirs:
    print(class_dir)

all_results = {}

for dir_name in classes_dirs:

    print('')
    print('Testing in : ', dir_name)
    print('')
    test_files = glob.glob(dir_name+'/*.jpg')
    n_test_images = len(test_files)

    if n_test_images == 0:
        # Nothing to predict; the accuracy below would be 0/0
        print('No .jpg files found, skipping')
        continue

    # Use names of their own so the full test set held in `test_img` by the
    # earlier cells is not overwritten with one class's images.
    # Allocating as float32 up front avoids a float64 buffer plus a copy.
    class_img = np.empty((n_test_images, size, size, 3), dtype='float32')
    class_labels = np.empty(n_test_images, dtype=int)

    for idx, fn in enumerate(test_files):
        # The class id is the name of the directory holding the image
        cat = fn.split('/')[-2]

        # color_mode='rgb' replaces the deprecated grayscale=False argument
        img = load_img(fn, color_mode='rgb', target_size=(size, size))
        img = img_to_array(img)
        class_img[idx] = img/255 # rescale to [0,1] range
        class_labels[idx] = int(cat)

    print('test_img shape:', class_img.shape)

    # One-hot labels, needed by plot_random_sample for the subplot titles
    y_test_binary = tf.keras.utils.to_categorical(class_labels, num_classes)

    # Run model
    result = model.predict(class_img)

    # The predicted class is the index of the highest score in each row
    predicted_ids = np.argmax(result, axis=1)
    is_correct = predicted_ids == class_labels

    correct = int(np.count_nonzero(is_correct))
    total = len(result)
    accuracy = round(correct / total, 2)

    # Keep names of files where we made mistakes
    wrong_files = [fn for fn, ok in zip(test_files, is_correct) if not ok]

    print('Correct samples :', correct)
    print('Total samples :', total)
    print('Accuracy : ', accuracy)
    print('Files with errors:', wrong_files)

    print('Sample images:')
    plot_random_sample(files_to_plot, class_img, y_test_binary)

    print('Wrong images:')
    for fn in wrong_files:
        print(fn)
        img = load_img(fn, color_mode='rgb', target_size=(size, size))
        plt.imshow(img)
        plt.show()

    class_index = dir_name.split('/')[-1]

    all_results[class_index] = accuracy

# Print and plot results of all tests
print(all_results)

names = list(all_results.keys())
values = list(all_results.values())

plt.bar(range(len(all_results)),values,tick_label=names)
plt.show()