# DATA20001 Deep Learning - Group Project
## Image project

**Due Thursday, January 6, before 23:59.**


## 0 Group Information

Group name: deepbois <br>
Group members: Niclas Joswig, Jonas De Paolis

## 1 Classification Pipeline

### 1.1 Pre-Processing

In [None]:
import os, inspect, cv2
import numpy as np
import pandas as pd
import matplotlib as mlp
import matplotlib.pyplot as plt
%matplotlib inline

# images
from keras.preprocessing.image import ImageDataGenerator
from PIL import Image

In [None]:
# directories
current_dir = os.path.dirname(os.path.abspath(inspect.stack()[0][1]))
image_dir = os.path.join(current_dir, 'train/images')
label_dir = os.path.join(current_dir, 'train/annotations')
aug_dir = os.path.join(current_dir, 'train_augmented/images/')
pred_dir = os.path.join(current_dir, 'test/images/')

Handle images.

In [None]:
def RGB2GS(img):
    """Converts RGB image of shape (128,128,3) to grayscale image of shape (128,128,1)."""
    
    if(len(img.shape) == 3):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = np.expand_dims(img, axis=2)
    return img

def load_data(data_dir): 
    """Load data from directory."""
    
    data = []
    for file in os.listdir(data_dir):
        path = os.path.join(data_dir, file)
        img = cv2.imread(path) # contains rgb encoded images
        img = RGB2GS(img)
        img_num = file.split(".")[0][2:] 
        data.append([img, int(img_num)]) 
    return data

data = load_data(image_dir) 
img = np.array([i[0] for i in data])
img_number = np.array([int(i[1]) for i in data])

Handle labels.

In [None]:
def load_annotation():
    """Load annotations from directory."""
    
    annotation = []
    for file in sorted(os.listdir(label_dir)):
        text_file = open(os.path.join(label_dir, file), "r")
        lines = text_file.read().split('\n')
        annotation.append(np.array(lines))
        text_file.close()
    annotation = np.array(annotation)
    return annotation

def build_multilabel_arr(img_number, annotation):
    """Build multilabel array of shape (n_points, n_classes)."""
    
    n_points = len(img_number)
    n_classes = len(annotation)
    arr = np.zeros((n_points, n_classes))
    for i in range(n_points):        
        for j in range(n_classes):
            if np.any(str(img_number[i]) == annotation[j]):
                arr[i,j] = 1
    return arr

annotation = load_annotation()
lab = build_multilabel_arr(img_number, annotation)

Define class weights. Note: we ended up not using these.

In [None]:
def get_class_weights(annotation):
    """Compute class weights based on number of occurences."""
    
    occurences = [] # count occurences
    weights = [] # compute weights
    
    for i in range(len(annotation)):
        occurences.append(len(annotation[i]))
    for i in range(len(annotation)):
        weights.append(max(occurences)/occurences[i])
        
    indices = list(range(14)) # build dictionary
    class_weights = dict(zip(indices, weights))
    return class_weights

class_weights = get_class_weights(annotation)

# largest class: 9 (reference), smallest class: 0
for i in class_weights:
    print(f"class {i}: {class_weights[i]}")
    

Use data augmentation.

In [None]:
N_COPIES = 2 # number of new copies per original image

# instantiate image generator
aug_generator = ImageDataGenerator(
    rotation_range=10, 
    width_shift_range=0.1, 
    height_shift_range=0.1, 
    horizontal_flip=True, 
    fill_mode='reflect')
    
def create_augdata(generator, save_files=False):
    """Create augmented data including images and labels."""
    
    if not os.path.exists(aug_dir) and save_files:
        os.makedirs(aug_dir)
    
    n_images = len(img)
    aug_images = []
    
    for i in range(n_images):
        image = np.expand_dims(img[i], axis=0)
        image_number = img_number[i]
        
        # get new images from generator
        loader = generator.flow(image)
        image_versions = [next(loader)[0].reshape(128,128).astype(np.uint8) for i in range(N_COPIES)]
        image_version_numbers = [f"{image_number}_{version}" for version in range(N_COPIES)]
        aug_images.append(image_versions)
        
        # save files to directory
        if save_files:
            for file, filename in zip(image_versions, image_version_numbers):
                file = Image.fromarray(file)
                file.save(aug_dir+filename+".jpg")
    
    aug_images = np.array(aug_images).reshape(n_images*N_COPIES,128,128,1)
    aug_labels = np.repeat(lab, N_COPIES, axis=0)
    return aug_images, aug_labels
    
img_augmented, lab_augmented = create_augdata(aug_generator)

Concatenate original with augmented data and shuffle both X and y in unison.

In [None]:
# concatenate original data and augmented data
X_data = np.vstack((img, img_augmented))
y_data = np.vstack((lab, lab_augmented))

def shuffle_data(X_data, y_data):
    """Shuffle both X and y in unison."""
    
    assert len(X_data) == len(y_data)
    perm = np.random.permutation(len(X_data))
    return X_data[perm], y_data[perm]

X_data, y_data = shuffle_data(X_data, y_data)

### use smaller sample
DATA_POINTS = 100
X_data = X_data[:DATA_POINTS]
y_data = y_data[:DATA_POINTS]

print(X_data.shape)
print(y_data.shape)

### 1.2 Automated Model Optimisation 

... using random search. 
Note: we ran this particular code on macos, with other systems we sometimes experienced complications.

In [None]:
# Keras
import keras
from keras.models import Sequential
from keras.layers import Dense, Flatten, Dropout, BatchNormalization
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras import backend as K

# RandomizedSearchCV
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import RandomizedSearchCV
from itertools import product

Implement F1 score metric. <br>
Note: this code is copied from https://stackoverflow.com/a/45305384.

In [None]:
def f1(y_true, y_pred):
    
    def recall(y_true, y_pred):
        """Recall metric.
        
        Only computes a batch-wise average of recall.
        Computes the recall, a metric for multi-label classification of
        how many relevant items are selected.
        """
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = true_positives / (possible_positives + K.epsilon())
        return recall

    def precision(y_true, y_pred):
        """Precision metric.

        Only computes a batch-wise average of precision.
        Computes the precision, a metric for multi-label classification of
        how many selected items are relevant.
        """
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision
    
    precision = precision(y_true, y_pred)
    recall = recall(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))


Settings for random search.

In [None]:
MAX_CONV = 4 # max number of convolutional layers
MAX_DENSE = 4 # max number of dense layers
N_FOLDS = 5 # number of folds used for cross validation (>=2)
N_ITER = 30 # number of tested hyperparameter combinations (N_ITER * CV = number of runs)
N_CORES = 1 # note: do NOT use -1 for all cores
VERBOSE_PROGRESS = 1 # ...
VERBOSE_CONFIG = 2 # ...

Define architecture template. Fixed elements are output activation function (sigmoid) and
loss function (binary crossentropy). Since we will optimise most of the hidden layer hyperparameters later on through `RandomizedSearchCV`, those concrete values are replaced by variables.

In [None]:
INPUT_SHAPE = (128, 128, 1) # image size is 128x128 with one channel

def create_model(
    conv_n, conv_activation, kern_n, kern_size, kern_stride,
    dense_n, dense_activation, dense_init, unit_n, dropout,
    epochs, batch_size, optimizer):
    """Build CNN architecture."""
    
    model = Sequential()

    # add convolutional layers
    for i in range(conv_n):
        
        if i==0: model.add(Conv2D(activation=conv_activation, filters=kern_n[i], kernel_size=kern_size[i], strides=kern_stride[i], padding='same', input_shape=INPUT_SHAPE))
        else: model.add(Conv2D(activation=conv_activation, filters=kern_n[i], kernel_size=kern_size[i], strides=kern_stride[i], padding='same'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2,2)))
    
    # flatten for transition to dense layers
    model.add(Flatten())
    
    # add dense layers
    for i in range(dense_n):
        model.add(Dense(activation=dense_activation, kernel_initializer=dense_init, units=unit_n[i]))
        model.add(Dropout(rate=dropout[i]))
    
    # output layer is fixed: 14 classes, sigmoid activation
    model.add(Dense(activation='sigmoid', units=14)) 
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['acc', f1]) # f1 metric as implemented above
    return model


Define random search optimisation function.

In [None]:
def combine_params(params, n):
    """Create combinations of parameters for n layers."""
    
    params = tuple(params)
    combinations = product(params, repeat=n)                           
    return [c for c in combinations]

def optimise_model(model, params, data):
    """Perform random search and return optimal parameters."""
    
    random_search = RandomizedSearchCV(estimator=model, param_distributions=params, refit=True, n_iter=N_ITER, cv=N_FOLDS, n_jobs=N_CORES, verbose=VERBOSE_CONFIG) 
    random_search = random_search.fit(X=data[0], y=data[1]) # class_weight=class_weights
    
    best_score = random_search.best_score_
    best_params = random_search.best_params_
    best_model = random_search.best_estimator_
    
    results = random_search.cv_results_
    return best_score, best_params, best_model, results


Define hyperparameter space that is to be explored.

In [None]:
# dictionary of hyperparameter options
parameters = {
    
    # conv layers
    'conv_n'            : np.arange(1, MAX_CONV+1),
    'conv_activation'   : ['relu', 'tanh'],
    'kern_n'            : combine_params([32, 64, 128], n=MAX_CONV),
    'kern_size'         : combine_params([3, 4, 5], n=MAX_CONV),
    'kern_stride'       : combine_params([1], n=MAX_CONV),
    
    # dense layers
    'dense_n'           : np.arange(1, MAX_DENSE+1),
    'dense_activation'  : ['relu', 'tanh'],
    'dense_init'        : ['truncated_normal', 'glorot_uniform'],
    'unit_n'            : combine_params([32, 128, 512], n=MAX_DENSE),
    'dropout'           : combine_params([0.3, 0.4, 0.5], n=MAX_DENSE),
    
    # other
    'epochs'            : [10, 20], 
    'batch_size'        : [16, 128, 256],
    'optimizer'         : ['adam', 'rmsprop']
    
}


Run random search.

In [None]:
# instantiate model and run random search
cnn_prototype = KerasClassifier(build_fn=create_model, verbose=VERBOSE_PROGRESS)
best_score, best_params, best_model, results = optimise_model(model=cnn_prototype, params=parameters, data=[X_data, y_data])


Display the top 3 architectures found during random search.

In [None]:
# display N best architectures
N = 3

# create and sort df by test score rank
results = pd.DataFrame(results)
results = results.sort_values('rank_test_score')
results = results.reset_index(drop=True)

# print scores and architectures
for n in range(N):
    
    rank_n_best = results.loc[n, 'rank_test_score']
    print(f"\n> Test score rank: {rank_n_best}")
    
    score_n_best = results.loc[n, 'mean_test_score']
    print(f"> Test score: {score_n_best}\n")
    
    params_n_best = results.loc[n, 'params']
    model_n_best = best_model.set_params(**params_n_best)
    print(best_model.model.summary())
    

### 1.3 Manual Model Optimisation

Based on the collected metadata, optimise the best of the aforementioned architectures by hand.

In [None]:
# Keras
import keras
from keras.models import Sequential
from keras.layers import Dense, Flatten, Dropout, BatchNormalization
from keras.layers.convolutional import Conv2D, MaxPooling2D

# sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

# generate train-test-split
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.2)

Below given code represents the final and best architecture we could come up with.

In [None]:
def create_model():
    """Build CNN architecture."""
    
    # conv2d layers ...
    model = Sequential()
    model.add(Conv2D(32, (3,3), border_mode='same', activation='relu', kernel_initializer='truncated_normal', input_shape=(128,128,1)))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2,2)))
    
    model.add(Conv2D(64, (3,3), border_mode='same', activation='relu', kernel_initializer='truncated_normal'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2,2)))

    model.add(Conv2D(128, (3,3), border_mode='same', activation='relu', kernel_initializer='truncated_normal'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2,2)))
    
    model.add(Conv2D(256, (3,3), border_mode='same', activation='relu', kernel_initializer='truncated_normal'))
    model.add(BatchNormalization())
    #model.add(MaxPooling2D(pool_size=(2,2)))

    model.add(Flatten())
    
    # dense layers ...
    model.add(Dense(256, activation='relu', kernel_initializer="truncated_normal"))
    model.add(Dropout(0.5))

    model.add(Dense(14, activation='sigmoid', kernel_initializer="truncated_normal"))
    model.compile(loss='binary_crossentropy', metrics=['accuracy', f1], optimizer='adam')
    return model

cnn_final = create_model()

Train model, generate plot and save model.

In [None]:
# train model
history = cnn_final.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=20, batch_size=32, shuffle=True, verbose=2)

# plot history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss'); plt.ylabel('loss'); plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# plot history for F1
plt.plot(history.history['f1'])
plt.plot(history.history['val_f1'])
plt.title('model F1'); plt.ylabel('F1'); plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# versioning
import datetime
time = datetime.datetime.now()
timestamp = time.strftime('%y%m%d_%H%M%S')

# save model
cnn_final.save(f'./_results/model_cnn_{timestamp}.h5')

Load previous model.

In [None]:
# load model
# cnn_final = load_model('./_results/_.h5') 

Find best threshold for each class. For details, check project report.

In [None]:
# get predictions
preds = cnn_final.predict(X_test)

# ...
threshold_options = [.1, .15, .2, .3, .4, .5, .6, .7, .8, .9]
best_thresholds = []

# for each target variable ...
for target_var in range(14):
    
    # compute score for each threshold
    scores = []
    for threshold in threshold_options:
        
        # first, use threshold option for observed class
        preds_ = preds.copy()
        preds_target = preds_[:, target_var]
        preds_target[preds_target > threshold] = 1
        preds_target[preds_target <= threshold] = 0
        preds_[:, target_var] = preds_target
        
        # then, use default threshold (0.2, overall best for all classes) for all other classes
        preds_[preds_ > 0.2] = 1
        preds_[preds_ <= 0.2] = 0
        
        # compute micro F1 score
        score = f1_score(y_test.astype(int), preds_, average='micro')  
        scores.append(score)
    
    # find best threshold
    best_score = max(scores)
    for i in range(len(scores)):
        if scores[i] == best_score:
            threshold = threshold_options[i]
            
    # append to list
    best_thresholds.append(threshold)
            

These are the best performing thresholds per class.

In [None]:
BEST_THRESHOLDS = [.1, .9, .6, .4, .1, .3, .4, .3, .4, .3, .3, .9, .3, .4]

Use final model, best thresholds and validation data to evaluate the model.

In [None]:
# get predictions
preds = cnn_final.predict(X_test)

# for each target variable ...
for target_var in range(14):

    preds_target = preds[:, target_var]
    preds_target[preds_target > BEST_THRESHOLDS[i]] = 1
    preds_target[preds_target <= BEST_THRESHOLDS[i]] = 0
    preds[:, target_var] = preds_target

score = f1_score(y_test, preds, average='micro')
print(score)

## Predictions

Load image data that is to be predicted.

In [None]:
data = load_data(pred_dir) 
X_pred = np.array([i[0] for i in data])
print(X_pred.shape) 

Make predictions.

In [None]:
# get predictions
preds = cnn_final.predict(X_pred)

# for each target variable ...
for target_var in range(14):

    preds_target = preds[:, target_var]
    preds_target[preds_target > BEST_THRESHOLDS[i]] = 1
    preds_target[preds_target <= BEST_THRESHOLDS[i]] = 0
    preds[:, target_var] = preds_target

Save prediction output matrix to .txt file.

In [None]:
np.savetxt('./_results/results.txt', preds, fmt='%d')