In [None]:
from keras.models import Model, Input
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dropout, Activation, Average
from keras.utils import to_categorical
from keras.losses import categorical_crossentropy
from keras.callbacks import ModelCheckpoint, TensorBoard, ReduceLROnPlateau, EarlyStopping
from keras.optimizers import Adam
from keras import models
from sklearn.model_selection import train_test_split
import os
import pickle
import numpy as np
import cv2
from typing import List, Tuple, NewType

In [None]:
Matrix = NewType('Matrix', np.ndarray)
Shape = NewType('Shape', Tuple[int, int])

In [None]:
img_dir = os.path.join(os.getcwd(), '..', 'dataset')

In [None]:
classes = ['forest', 'buildings', 'river', 'mobilehomepark', 'harbor', 'golfcourse', 'agricultural', 'runway', 'baseballdiamond', 'overpass', 'chaparral', 'tenniscourt', 'intersection', 'airplane', 'parkinglot', 'sparseresidential', 'mediumresidential', 'denseresidential', 'beach', 'freeway', 'storagetanks']
classes.sort()

In [None]:
def normalize(mat: Matrix, new_min: float=0, new_max: float=1) -> Matrix:
    _min, _max = np.amin(mat), np.amax(mat)
    return np.multiply(mat - _min, (new_max - new_min) / (_max - _min)) + new_min

# Load dataset

In [None]:
imgs = []
for cls_num, cls in enumerate(classes):
    class_dir = os.path.join(img_dir, cls)
    for filename in sorted(os.listdir(class_dir)):
        filepath = os.path.join(class_dir, filename)
        img = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
        if img.shape != (256, 256):
            img = cv2.resize(img, (256, 256))
        img = normalize(img)
        img = np.atleast_3d(img)

        imgs.append(img)

In [None]:
X = np.array(imgs)

In [None]:
X.shape

# Put together training and test tensors

In [None]:
cls = -1
classes = []
for i in range(2100):
    if i % 100 == 0:
        cls += 1
    classes.append(cls)
X_train, X_test, y_train, y_test = train_test_split(X, classes, test_size=0.2, random_state=42)
y_train = to_categorical(y_train, num_classes=21)

# Model architecture

In [None]:
input_shape = X_train[0,:,:,:].shape
model_input = Input(shape=input_shape)

In [None]:
def conv_pool_cnn(model_input):
    
    x = Conv2D(96, (3, 3), activation='relu', padding = 'same')(model_input)
    x = Conv2D(96, (3, 3), activation='relu', padding = 'same')(x)
    x = MaxPooling2D(pool_size=(3, 3), strides = 2)(x)
    x = Conv2D(192, (3, 3), activation='relu', padding = 'same')(x)
    x = Conv2D(192, (3, 3), activation='relu', padding = 'same')(x)
    x = MaxPooling2D(pool_size=(3, 3), strides = 2)(x)
    x = Conv2D(192, (3, 3), activation='relu', padding = 'same')(x)
    x = Conv2D(192, (1, 1), activation='relu', padding = 'same')(x)
    x = Conv2D(21, (1, 1))(x)
    x = GlobalAveragePooling2D()(x)
    x = Activation(activation='softmax')(x)
    
    model = Model(model_input, x, name='conv_pool_cnn')
    
    return model


In [None]:
conv_pool_cnn_model = conv_pool_cnn(model_input)

# Model training

In [None]:
def compile_and_train(model, num_epochs): 
    
    model.compile(loss=categorical_crossentropy, optimizer=Adam(lr=0.0001), metrics=['acc']) 
    filepath = 'weights/dl_bench/' + model.name + '.{epoch:02d}-{loss:.2f}.hdf5'
    checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_weights_only=True,
                                                 save_best_only=True, mode='auto', period=1)
    tensor_board = TensorBoard(log_dir='logs/dl_bench/', histogram_freq=0)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                                  patience=5, min_lr=0.0000001,
                                  verbose=1)
    early_stopping = EarlyStopping(monitor='val_acc', min_delta=0.01, patience=30, 
                                   verbose=1, mode='auto', restore_best_weights=True)
    history = model.fit(x=X_train, y=y_train, batch_size=32, shuffle=True,
                     epochs=num_epochs, verbose=1, callbacks=[checkpoint, tensor_board, reduce_lr, early_stopping], validation_split=0.2)
    return history


In [None]:
_ = compile_and_train(conv_pool_cnn_model, num_epochs=200)

# Evaluation

In [None]:
def evaluate_acc(model):
    pred = model.predict(X_test, batch_size = 32)
    pred = np.argmax(pred, axis=1)
    acc = np.sum(np.equal(pred, y_test)) / len(y_test)
    return acc


In [None]:
pred = conv_pool_cnn_model.predict(X_test, batch_size = 32)
pred = np.argmax(pred, axis=1)
np.sum(np.not_equal(pred, y_test))

In [None]:
evaluate_acc(conv_pool_cnn_model)

In [None]:
pred = conv_pool_cnn_model.predict(X_test, batch_size = 32)


In [None]:
def evaluate_top_n_acc(model, n):
    pred = conv_pool_cnn_model.predict(X_test, batch_size = 32)
    hit = 0
    for probs, truth in zip(pred, y_test):
        top_n = probs.argsort()[-n:][::-1]
        if truth in top_n:
            hit += 1
    acc = hit / len(y_test)
    return acc

In [None]:
evaluate_top_n_acc(conv_pool_cnn_model, 5)