# hw3 Report code

In [None]:
import os
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
% matplotlib inline

from sklearn.metrics import confusion_matrix
from keras.models import load_model
from keras.utils import np_utils

# Constants
# Folder prefix that get_training_data() prepends to "train.csv".
DIRECTORY = "ml-2018spring-hw3/"
# Folder prefix prepended to "model.h5" when the trained model is loaded.
MODEL_DIRECTORY = "model/"
# Tick labels for the confusion-matrix plots.
# NOTE(review): presumably ordered to match the integer label ids 0..6 in train.csv — confirm.
class_names = ["Mad", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]

## Load data and model

In [None]:
# Functions
def get_training_data(validation_split=0.0):
    """Load ``train.csv`` from ``DIRECTORY`` and optionally hold out a validation tail.

    Parameters
    ----------
    validation_split : float, optional
        Fraction of the rows, taken from the end of the file without
        shuffling, to hold out for validation. Values outside ``(0, 1]``, or
        values so small that they select zero rows, disable the split.

    Returns
    -------
    tuple
        ``((x_train, y_train), (x_valid, y_valid))``. The ``x_*`` members are
        integer arrays reshaped to ``(n, 48, 48, 1)`` and the ``y_*`` members
        are integer label arrays. When no split is made the validation
        members are empty lists.

    Raises
    ------
    FileNotFoundError
        If the CSV file does not exist.
    """
    filepath = os.path.join(DIRECTORY, "train.csv")

    # Fail loudly here: continuing without data would only surface later as
    # an unrelated "variable referenced before assignment" error.
    if not os.path.exists(filepath):
        raise FileNotFoundError("Error: No such file at %s" % filepath)

    data = pd.read_csv(filepath)

    # Each "feature" cell is a whitespace-separated pixel string; expand it
    # into columns and reshape every row to (48, 48, 1).
    x = data["feature"].str.split(expand=True).values.reshape(-1, 48, 48, 1).astype('int')
    y = data["label"].values.astype('int')

    valid_size = 0
    if 0.0 < validation_split <= 1.0:
        valid_size = int(validation_split * len(x))

    # A zero-sized hold-out must not be sliced: x[:-0] is empty and x[-0:] is
    # the whole array, which would swap the roles of the two sets.
    if valid_size == 0:
        return (x, y), ([], [])

    return (x[:-valid_size], y[:-valid_size]), (x[-valid_size:], y[-valid_size:])

In [None]:
# Load the data, holding out the trailing 10% of rows for validation.
(x_train, y_train), (x_valid, y_valid) = get_training_data(validation_split=0.1)

# Divide the pixel values by 255, then standardize both sets with the
# (mean, std) pair stored in dist.npy.
mean, std = np.load("dist.npy")
x_train = (x_train / 255 - mean) / std
x_valid = (x_valid / 255 - mean) / std

In [None]:
# Restore the trained network from <MODEL_DIRECTORY>model.h5.
modelpath = os.path.join(MODEL_DIRECTORY, "model.h5")
model = load_model(modelpath)

## Confusion matrix

In [None]:
# Model outputs for the validation images; the predicted label of each
# sample is the index of its largest output along axis 1.
prob = model.predict(x_valid)
y_pred = prob.argmax(axis=1)

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it on the current matplotlib figure.

    Parameters
    ----------
    cm : numpy.ndarray
        Square confusion matrix; rows are labelled "True label" and columns
        "Predicted label" on the plot.
    classes : sequence of str
        Tick labels for both axes, one per row/column of ``cm``.
    normalize : bool, optional
        When true, every row is divided by its sum before printing and
        plotting. Rows whose sum is zero are left as zeros.
    title : str, optional
        Title placed above the plot.
    cmap : matplotlib colormap, optional
        Colormap used for the cells.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A class with no samples has a zero row sum; skip the division for
        # such rows so they stay 0 instead of turning into NaN.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype='float'),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Write each cell's value on top of it, switching to white text on the
    # darker (above half of the maximum) cells.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the axis labels are taken into account.
    plt.tight_layout()

# Confusion matrix of true vs. predicted validation labels
cnf_matrix = confusion_matrix(y_valid, y_pred)
np.set_printoptions(precision=2)

# Draw the raw-count matrix first, then the row-normalized one, each on its
# own figure.
for use_normalized, figure_title in (
        (False, 'Confusion matrix, without normalization'),
        (True, 'Normalized confusion matrix')):
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes=class_names,
                          normalize=use_normalized, title=figure_title)

plt.show()

## Plot training history

In [None]:
import pickle

# Unpickle the three saved training histories (baseline, normalized input,
# augmented data) in that order.
history_paths = (
    "history/baseline.pickle",
    "history/norm.pickle",
    "history/aug.pickle",
)
loaded_histories = []
for history_path in history_paths:
    with open(history_path, "rb") as f:
        loaded_histories.append(pickle.load(f))

train_history_bl, train_history_norm, train_history_aug = loaded_histories

In [None]:
# Validation accuracy per epoch: baseline run vs. run with normalization.
for curve_history, curve_label in ((train_history_bl, "w/o normalization"),
                                   (train_history_norm, "w/ normalization")):
    plt.plot(curve_history["val_acc"], label=curve_label)

plt.title("Train History (Normalization comparison)")
plt.xlabel("Epoch")
plt.ylabel("Validation Accuracy")
plt.legend(loc="lower right")
plt.show()

In [None]:
# Report the best validation accuracy reached by each run.
for message, run_history in (("w/o normalization acc: %s", train_history_bl),
                             ("w/ normalization acc: %s", train_history_norm)):
    print(message % max(run_history['val_acc']))

In [None]:
# Validation accuracy per epoch: baseline run vs. run with augmentation.
for curve_history, curve_label in ((train_history_bl, "w/o augmentation"),
                                   (train_history_aug, "w/ augmentation")):
    plt.plot(curve_history["val_acc"], label=curve_label)

plt.title("Train History (Augmentation comparison)")
plt.xlabel("Epoch")
plt.ylabel("Validation Accuracy")
plt.legend(loc="lower right")
plt.show()

In [None]:
# Report the best validation accuracy reached by each run.
for message, run_history in (("w/o augmentation acc: %s", train_history_bl),
                             ("w/ augmentation acc: %s", train_history_aug)):
    print(message % max(run_history['val_acc']))

## Saliency map

In [None]:
from keras import backend as K

# Saliency maps: for each chosen image, take the gradient of the model's
# predicted-class output with respect to the input pixels, turn it into a
# heatmap, and show the original image, the heatmap and a masked image.
# The images being explained are taken from the (already standardized) training set.
private_pixels = x_train
emotion_classifier = model
input_img = emotion_classifier.input
# Indices into private_pixels; a negative index counts from the end.
img_ids = [-698]

for idx in img_ids:
    # Batch of one image for the network.
    img = private_pixels[idx].reshape(-1, 48, 48, 1)
    val_proba = emotion_classifier.predict(img)
    # Index of the largest model output for this image.
    pred = val_proba.argmax(axis=-1)
    # Scalar to differentiate: mean of the output column(s) selected by pred.
    target = K.mean(emotion_classifier.output[:, pred])
    grads = K.gradients(target, input_img)[0]
    # NOTE(review): a new gradient op and backend function are built on every
    # loop iteration.
    fn = K.function([input_img, K.learning_phase()], [grads])

    # Second input is the learning-phase flag.
    # NOTE(review): 0 presumably selects test mode — confirm against the Keras backend docs.
    heatmap = fn([img, 0])[0].reshape(48, 48, 1)
    # Set all gradient to positive
    heatmap = np.abs(heatmap)
    # Normalize distribution (zero mean, unit std; 1e-5 guards against a zero std)
    heatmap = (heatmap - heatmap.mean()) / (heatmap.std() + 1e-5)
    # Ensure std is 0.1
    heatmap *= 0.1
    # Shift the center to 0.5, then clip to [0, 1]
    heatmap += 0.5
    heatmap = np.clip(heatmap, 0, 1)
    heatmap = heatmap.reshape(48, 48)
    # Rescale so the largest heatmap value is exactly 1.
    heatmap /= heatmap.max()

    # Pixels whose heatmap value is <= thres are replaced by the image mean,
    # leaving only the most salient regions visible.
    thres = 0.5
    orig = private_pixels[idx].reshape(48, 48)
    see = np.copy(orig)
    see[np.where(heatmap <= thres)] = np.mean(see)

    # Original image
    plt.figure()
    plt.imshow(orig, cmap="gray")
    plt.colorbar()
    plt.tight_layout()
    # fig is only needed by the commented-out savefig call below.
    fig = plt.gcf()
    plt.draw()
    # fig.savefig(os.path.join(cmap_dir, 'privateTest', '{}.png'.format(idx)), dpi=100)
    
    # Saliency map
    plt.figure()
    plt.imshow(heatmap, cmap=plt.cm.jet)
    plt.colorbar()
    plt.tight_layout()
    fig = plt.gcf()
    plt.draw()
    # fig.savefig(os.path.join(cmap_dir, 'privateTest', '{}.png'.format(idx)), dpi=100)

    # Mask original image
    plt.figure()
    plt.imshow(see, cmap="gray")
    plt.colorbar()
    plt.tight_layout()
    fig = plt.gcf()
    plt.draw()
    # fig.savefig(os.path.join(partial_see_dir, 'privateTest', '{}.png'.format(idx)), dpi=100)

## Visualizing Filters

### 利用梯度遞增法，找出最能激活特定filter的圖片(從白噪音開始)

In [None]:
def normalize(x):
    """Scale tensor ``x`` by its root-mean-square value.

    A small constant (1e-7) is added to the divisor so an all-zero tensor
    does not cause a division by zero.
    """
    rms = K.sqrt(K.mean(K.square(x)))
    return x / (rms + 1e-7)

def grad_ascent(num_step, input_image_data, iter_func, record_freq=None, step_size=1e-2):
    """Run gradient ascent on an image and record periodic snapshots.

    Based on
    https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html

    Parameters
    ----------
    num_step : int
        Number of ascent steps to perform.
    input_image_data : numpy.ndarray
        Starting image. It is updated in place on every step.
    iter_func : callable
        Called as ``iter_func([image])``; must return ``(loss, grads)`` where
        ``grads`` can be added to the image.
    record_freq : int, optional
        A snapshot is recorded on every step whose index is a multiple of
        this value. Defaults to the module-level ``RECORD_FREQ``.
    step_size : float, optional
        Multiplier applied to the gradient on each step.

    Returns
    -------
    list of tuple
        ``(image_snapshot, loss_value)`` pairs, one per recorded step. The
        loss is the value computed just before that step's update.
    """
    if record_freq is None:
        # Resolved at call time so the global may be defined after this function.
        record_freq = RECORD_FREQ

    filter_images = []
    for i in range(num_step):
        loss_value, grads_value = iter_func([input_image_data])
        input_image_data += grads_value * step_size
        if i % record_freq == 0:
            # Store a copy: the image is updated in place, so storing the
            # array itself would make every snapshot alias the final image.
            filter_images.append((input_image_data.copy(), loss_value))
    return filter_images

# util function to convert a tensor into a valid image
def deprocess_image(x):
    """Convert a tensor into a displayable ``uint8`` image.

    The values are centered on 0 and scaled to a standard deviation of 0.1,
    shifted to 0.5, clipped to ``[0, 1]`` and mapped to ``[0, 255]``. The
    axes are then reordered with ``transpose((1, 2, 0))``.

    Parameters
    ----------
    x : numpy.ndarray
        3-D array (the transpose requires exactly three axes). It is not
        modified; integer input is accepted.

    Returns
    -------
    numpy.ndarray
        ``uint8`` array with the first input axis moved to the end.
    """
    # Work on a floating-point copy so the caller's array stays untouched and
    # integer input does not break the arithmetic below.
    x = np.asarray(x)
    x = x.astype(x.dtype if np.issubdtype(x.dtype, np.floating) else float)

    # normalize tensor: center on 0., ensure std is 0.1
    x = (x - x.mean()) / (x.std() + 1e-5)
    x = x * 0.1

    # clip to [0, 1]
    x = np.clip(x + 0.5, 0, 1)

    # scale to [0, 255], move the first axis last and convert to bytes
    x = (x * 255).transpose((1, 2, 0))
    return np.clip(x, 0, 255).astype('uint8')

# For every chosen layer, start from random noise and run gradient ascent on
# the input so that it maximizes the mean activation of one filter at a time,
# then plot the recorded images of all filters for each recorded step.
emotion_classifier = model
layer_dict = {layer.name: layer for layer in emotion_classifier.layers}
input_img = emotion_classifier.input

name_ls = ["leaky_re_lu_5"]
collect_layers = [layer_dict[name].output for name in name_ls]
num_step = NUM_STEPS = 50
RECORD_FREQ = 10
nb_filter = 16

# Subplot grid: 16 columns and as many rows as needed. The row count must be
# an integer, and true division would hand matplotlib a float.
n_cols = 16
n_rows = -(-nb_filter // n_cols)

for cnt, c in enumerate(collect_layers):
    filter_imgs = []
    for filter_idx in range(nb_filter):
        input_img_data = np.random.random((1, 48, 48, 1))  # random noise
        # Objective: mean activation of this filter over the whole feature map.
        target = K.mean(c[:, :, :, filter_idx])
        grads = normalize(K.gradients(target, input_img)[0])
        iterate = K.function([input_img], [target, grads])

        filter_imgs.append(grad_ascent(num_step, input_img_data, iterate))

    # One figure per recorded ascent step, one subplot per filter.
    for it in range(NUM_STEPS // RECORD_FREQ):
        fig = plt.figure(figsize=(14, 8))
        for i in range(nb_filter):
            ax = fig.add_subplot(n_rows, n_cols, i + 1)
            img = filter_imgs[i][it][0].squeeze()
            ax.imshow(img, cmap='PuBu')
            plt.xticks(np.array([]))
            plt.yticks(np.array([]))
            # Label each subplot with the loss recorded for that step.
            plt.xlabel('{:.3f}'.format(filter_imgs[i][it][1]))
        # One layout pass per figure is enough once all subplots exist.
        plt.tight_layout()
        fig.suptitle('Filters of layer {} (# Ascent Epoch {} )'.format(name_ls[cnt], it*RECORD_FREQ))
        # img_path = os.path.join(filter_dir, '{}-{}'.format(store_path, name_ls[cnt]))
        # if not os.path.isdir(img_path):
        #    os.mkdir(img_path)
        # fig.savefig(os.path.join(img_path,'e{}'.format(it*RECORD_FREQ)))

### 給定輸入圖片，取出特定層的輸出

In [None]:
# Feed one image through the network and plot the feature maps produced by
# each of the named layers.
name_ls = ["conv2d_5", "leaky_re_lu_5"]
collect_layers = [K.function([input_img, K.learning_phase()], [layer_dict[name].output]) for name in name_ls]

choose_id = 17
photo = private_pixels[choose_id].reshape(-1, 48, 48, 1)
n_cols = 16
for cnt, fn in enumerate(collect_layers):
    # Second input is the learning-phase flag.
    # NOTE(review): 0 presumably selects test mode — confirm against the Keras backend docs.
    im = fn([photo, 0])  # get the output of that layer
    fig = plt.figure(figsize=(14, 8))
    # Show at most 32 feature maps, and never more than the layer provides.
    nb_filter = min(32, im[0].shape[3])
    # The subplot row count must be an integer; true division would hand
    # matplotlib a float.
    n_rows = -(-nb_filter // n_cols)
    for i in range(nb_filter):
        ax = fig.add_subplot(n_rows, n_cols, i + 1)
        ax.imshow(im[0][0, :, :, i], cmap='PuBu')
        plt.xticks(np.array([]))
        plt.yticks(np.array([]))
    # One layout pass per figure is enough once all subplots exist.
    plt.tight_layout()
    fig.suptitle('Output of layer{} (Given image{})'.format(cnt, choose_id))
    # img_path = os.path.join(vis_dir, store_path)
    # if not os.path.isdir(img_path):
    #     os.mkdir(img_path)
    # fig.savefig(os.path.join(img_path,'layer{}'.format(cnt)))