In [None]:
import os
import glob
import pickle
import shap

import pandas as pd
import numpy as np
import category_encoders as ce


from numpy import argmax
from PIL import Image, ImageDraw, ImageFilter

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score


import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
# With TensorFlow 2.x the following import raises an error (kept here only as a reminder):
#from tensorflow.Keras.callbacks import *
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.applications.vgg16 import VGG16

import matplotlib.pyplot as plt
%matplotlib inline
# Apply the ggplot style to every figure drawn in this notebook.
plt.style.use('ggplot')
# Default font family for all plot text.
# NOTE(review): presumably chosen so Japanese labels render; confirm 'meiryo' is installed on the host.
font = {'family' : 'meiryo'}
plt.rc('font', **font)

In [None]:
def expand2square(pil_img, background_color):
    """Pad an image to a square canvas without changing its aspect ratio.

    Args:
        pil_img: Source PIL image.
        background_color: Fill colour used for the padded area.

    Returns:
        ``pil_img`` itself when it is already square; otherwise a new image
        whose side equals the longer dimension, with the source centred along
        the shorter dimension.
    """
    w, h = pil_img.size
    if w == h:
        return pil_img

    side = max(w, h)
    # Only the shorter dimension gets a non-zero offset, which centres the
    # source on that axis (integer division, so any odd pixel goes after it).
    top_left = ((side - w) // 2, (side - h) // 2)
    canvas = Image.new(pil_img.mode, (side, side), background_color)
    canvas.paste(pil_img, top_left)
    return canvas

def mask_circle_solid(pil_img, background_color, blur_radius):
    """Keep an elliptical region of the image and fill the rest with a solid colour.

    Args:
        pil_img: Source PIL image.
        background_color: Colour used outside the ellipse.
        blur_radius: Despite its name, no blur is applied. The value is used
            as the inset (in pixels) of the ellipse's bounding box from every
            image edge; a negative value makes the ellipse extend past the
            image borders.

    Returns:
        A new image composited from ``pil_img`` (inside the ellipse) and a
        solid ``background_color`` image (outside it).
    """
    w, h = pil_img.size
    inset = blur_radius

    # 8-bit mask: 255 inside the ellipse (take the source), 0 elsewhere.
    stencil = Image.new("L", pil_img.size, 0)
    ImageDraw.Draw(stencil).ellipse((inset, inset, w - inset, h - inset), fill=255)

    backdrop = Image.new(pil_img.mode, pil_img.size, background_color)
    return Image.composite(pil_img, backdrop, stencil)

In [None]:
image_size = 128


def load_variety_images(image_root="./images", size=image_size):
    """Load every image below ``image_root`` as a masked RGB array.

    Each sub-directory of ``image_root`` is one class; the class label is the
    last two characters of the directory name (e.g. ``"01"``).

    Args:
        image_root: Directory containing one sub-directory per variety.
        size: Side length in pixels that every image is resized to.

    Returns:
        ``(images, labels)``: a list of ``(size, size, 3)`` arrays and a
        parallel list of label strings. Both are empty when nothing is found.
    """
    images, labels = [], []
    # sorted(): glob order depends on the filesystem; a fixed order keeps the
    # sample order (and therefore the cross-validation folds) reproducible.
    for variety_dir in sorted(glob.glob(image_root + "/*")):
        if not os.path.isdir(variety_dir):
            continue
        variety_id = variety_dir[-2:]
        for img_file_path in sorted(glob.glob(variety_dir + "/*")):
            # The context manager closes the underlying file promptly;
            # convert() returns an independent, fully loaded copy.
            with Image.open(img_file_path) as raw_image:
                image = raw_image.convert("RGB")

            # Alternative kept for reference: pad to a square first so the
            # aspect ratio is preserved.
            # image = expand2square(image, (0, 0, 0)).resize((size, size), Image.LANCZOS)
            image = image.resize((size, size))

            # Cut out the elliptical region; everything outside becomes black.
            image = mask_circle_solid(image, (0, 0, 0), -5)

            images.append(np.array(image))
            labels.append(variety_id)
    return images, labels


X, Y = load_variety_images("./images", image_size)

In [None]:
# Sanity check: preview the first few preprocessed images with their labels.
# Slicing (instead of indexing 0..2) keeps the cell working when fewer than
# three images were loaded.
for preview_img, preview_label in zip(X[:3], Y[:3]):
    plt.imshow(preview_img)
    plt.title("variety_id: {}".format(preview_label))
    plt.show()

In [None]:
# Zero-based integer class ids: each label string is parsed as an integer and shifted down by one.
Y_variety = list(map(lambda label: int(label) - 1, Y))
# Number of distinct classes; the model builders below use it as the width of the softmax layer.
output_num = len(set(Y_variety))

In [None]:
def create_model():
    """Build the small from-scratch CNN classifier (uncompiled).

    The network has three stages of two 3x3 ReLU convolutions followed by 2x2
    max pooling (16, 32 and 64 filters), then a 256-unit dense layer, dropout
    of 0.2 and a softmax over ``output_num`` classes. The input shape is
    ``(image_size, image_size, 3)``; both globals are read at call time.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    def conv_stage(filters, **first_layer_kwargs):
        # Two convolutions then pooling. (BatchNormalization after the second
        # convolution was tried in the original and left disabled.)
        return [
            Conv2D(filters, kernel_size=3, activation="relu", **first_layer_kwargs),
            Conv2D(filters, kernel_size=3, activation="relu"),
            MaxPooling2D(pool_size=(2, 2)),
        ]

    stack = conv_stage(16, input_shape=(image_size, image_size, 3))
    stack += conv_stage(32)
    stack += conv_stage(64)
    stack += [
        Flatten(),
        Dense(256, activation="relu"),
        Dropout(0.2),
        Dense(output_num, activation='softmax'),
    ]
    return Sequential(stack)

In [None]:
def create_model_vgg():
    """Build a transfer-learning classifier on top of ImageNet-pretrained VGG16.

    The VGG16 convolutional base (no top) takes
    ``(image_size, image_size, 3)`` inputs. Its first 15 layers are frozen and
    a new head is attached: BatchNormalization -> Flatten -> Dense(256, relu)
    -> Dropout(0.4) -> softmax over ``output_num`` classes.

    Returns:
        An uncompiled functional ``Model``.
    """
    backbone = VGG16(weights='imagenet', include_top=False,
                     input_shape=(image_size, image_size, 3))

    # Freeze the first 15 backbone layers so only the remaining ones are
    # fine-tuned together with the new head.
    # NOTE(review): presumably this leaves just the last conv block trainable;
    # confirm against model.summary().
    for frozen_layer in backbone.layers[:15]:
        frozen_layer.trainable = False

    head = [
        BatchNormalization(),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.4),
        Dense(output_num, activation='softmax'),
    ]
    tensor = backbone.output
    for head_layer in head:
        tensor = head_layer(tensor)

    return Model(inputs=backbone.input, outputs=tensor)

In [None]:
def _plot_fold_history(history):
    """Plot train and validation loss/accuracy per epoch for one fold."""
    fig = plt.figure(figsize=(12, 16))
    fig.subplots_adjust(wspace=0.5)
    fig.suptitle("hist", fontsize=20)

    ax_loss = fig.add_subplot(211, title="Loss:", ylabel="Loss", xlabel="Epoch")
    ax_loss.plot(history["loss"])
    ax_loss.plot(history["val_loss"])
    ax_loss.legend(["Train", "Test"], loc="upper left")

    ax_acc = fig.add_subplot(212, title="Acc:", ylabel="Acc", xlabel="Epoch")
    ax_acc.plot(history["accuracy"])
    ax_acc.plot(history["val_accuracy"])
    ax_acc.legend(["Train", "Test"], loc="upper left")

    plt.show()


def _show_worst_errors(order, images, pred_labels, true_labels, nrows=2, ncols=5):
    """Show the misclassified images selected by ``order`` in a grid (at most nrows*ncols)."""
    order = list(order)[:nrows * ncols]
    if not order:
        print("No misclassified validation samples to display.")
        return
    fig, axes = plt.subplots(nrows, ncols, sharex=True, sharey=True,
                             figsize=(20, 8), squeeze=False)
    panels = axes.ravel()
    for panel, idx in zip(panels, order):
        panel.imshow(images[idx])
        panel.set_title("Predicted label :{}\nTrue label :{}".format(pred_labels[idx], true_labels[idx]))
    # Fewer errors than panels: blank the unused ones instead of failing.
    for panel in panels[len(order):]:
        panel.axis("off")
    plt.show()


def model_train(X, Y):
    """Train and evaluate the VGG16-based classifier with stratified 5-fold CV.

    For every fold the function trains a fresh model on augmented images,
    saves it to ``model/model_<fold>.h5`` (fold index starts at 0), plots the
    training history and displays the validation errors with the largest gap
    between the predicted-class and true-class probability.

    Args:
        X: Sequence of equally sized RGB images; values are divided by 255
            here, so 8-bit pixel values are assumed.
        Y: Sequence of class labels, one per image.

    Returns:
        List with the validation accuracy of each fold.
    """
    batch_size = 64

    X = np.array(X).astype("float32") / 255

    # Encode the labels once. The original re-encoded (and overwrote) Y inside
    # the fold loop, so later folds encoded an already one-hot array.
    ce_ohe = ce.OneHotEncoder(handle_unknown='impute')
    Y_onehot = np.array(ce_ohe.fit_transform(Y))

    # model.save() does not create missing parent directories.
    os.makedirs("model", exist_ok=True)

    fold_accuracies = []
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    for fold, (train_index, test_index) in enumerate(skf.split(X, Y)):

        print('#'*25)
        print('### FOLD %i'%(fold+1))
        print('#'*25)

        # Drop the previous fold's graph/state so memory does not grow with
        # every new VGG16 instance.
        K.clear_session()

        X_train = X[train_index]
        X_valid = X[test_index]
        Y_train = Y_onehot[train_index]
        Y_valid = Y_onehot[test_index]

        # Geometric augmentation only. No feature-wise statistics are enabled,
        # so datagen.fit() is not required.
        datagen = ImageDataGenerator(
                rotation_range=20,  # random rotation, in degrees
                zoom_range=0.05,  # random zoom
                width_shift_range=0.05,  # horizontal shift (fraction of width)
                height_shift_range=0.05,  # vertical shift (fraction of height)
                horizontal_flip=True,
                vertical_flip=True)

        model = create_model_vgg()
        model.compile(loss='categorical_crossentropy', optimizer="rmsprop", metrics=['accuracy'])
        model.summary()

        learning_rate_reduction = ReduceLROnPlateau(monitor='val_accuracy',
                                                    patience=4,
                                                    verbose=1,
                                                    factor=0.6,
                                                    min_lr=0.00001)

        # Model.fit accepts generators directly; fit_generator is deprecated
        # in TF 2.x and absent from newer Keras releases.
        hist = model.fit(datagen.flow(X_train, Y_train, batch_size=batch_size),
                         epochs=50,
                         validation_data=(X_valid, Y_valid),
                         verbose=2,
                         # At least one step, even for folds smaller than a batch.
                         steps_per_epoch=max(1, X_train.shape[0] // batch_size),
                         callbacks=[learning_rate_reduction])

        model.save("model/model_" + str(fold) + ".h5")

        pred = model.predict(X_valid)
        Y_pred_classes = np.argmax(pred, axis=1)
        Y_true = np.argmax(Y_valid, axis=1)
        ACC_pred = accuracy_score(Y_true, Y_pred_classes)
        fold_accuracies.append(ACC_pred)

        _plot_fold_history(hist.history)

        # Restrict everything to the misclassified validation samples.
        errors = Y_pred_classes != Y_true
        Y_pred_classes_errors = Y_pred_classes[errors]
        Y_pred_errors = pred[errors]
        Y_true_errors = Y_true[errors]
        X_val_errors = X_valid[errors]

        # Rank errors by how much more probability the wrong class received
        # than the true class; the largest gaps are sorted last.
        Y_pred_errors_prob = np.max(Y_pred_errors, axis=1)
        # Pairwise indexing picks row i, column Y_true_errors[i] without the
        # n x n intermediate that np.diagonal(np.take(...)) would build.
        true_prob_errors = Y_pred_errors[np.arange(len(Y_true_errors)), Y_true_errors]
        delta_pred_true_errors = Y_pred_errors_prob - true_prob_errors
        sorted_dela_errors = np.argsort(delta_pred_true_errors)
        most_important_errors = sorted_dela_errors[-10:]

        _show_worst_errors(most_important_errors, X_val_errors, Y_pred_classes_errors, Y_true_errors)

        print('>>>> FOLD %i val Acc ='%(fold+1), ACC_pred)
        print()

    return fold_accuracies


In [None]:
# Run the 5-fold cross-validated training on the loaded images.
# This trains one model per fold and writes each to model/model_<fold>.h5.
model_train(X,Y)

In [None]:
# Collect every file under the "Apple*" class directories of the test set.
# file_list holds the paths, Y_test the parallel directory-name labels.
file_list = []
test_path = "./fruits-360/Test/"
Y_test = []

# sorted(): os.listdir returns entries in arbitrary order. A fixed order keeps
# the sample order, and anything derived from it, reproducible across runs.
for dir_name in sorted(os.listdir(test_path)):
    class_dir = test_path + dir_name
    # Skip other fruits and stray files whose name happens to contain "Apple".
    if "Apple" not in dir_name or not os.path.isdir(class_dir):
        continue
    for file_name in sorted(os.listdir(class_dir)):
        Y_test.append(dir_name)
        file_list.append(class_dir + "/" + file_name)

In [None]:
# Load the test images at the training resolution.
# The original cell reassigned the global `image_size` to 80, although the
# model builders and the training images use the value set in the loading
# cell. The global is therefore left untouched here.
# NOTE(review): the training images are also circle-masked; confirm whether
# the test images should receive the same mask.
X_test = []

for image_path in file_list:
    # Exact existence check; glob would treat characters such as "[" in a
    # path as a pattern.
    if not os.path.isfile(image_path):
        print(image_path)
        continue
    # The context manager closes the file handle as soon as the pixels have
    # been copied by convert().
    with Image.open(image_path) as raw_image:
        X_test.append(np.array(raw_image.convert("RGB").resize((image_size, image_size))))

# A skipped file leaves X_test shorter than the label list built earlier.
if len(X_test) != len(Y_test):
    print("WARNING: {} images loaded but {} labels collected; labels are misaligned.".format(
        len(X_test), len(Y_test)))

In [None]:
# Scale the 8-bit pixel values to float32 in [0, 1], as done for the training images.
# NOTE(review): this cell overwrites X_test and Y_test, so running it twice rescales and re-encodes them.
X_test = np.asarray(X_test, dtype="float32") / 255
# One-hot encode the directory-name labels.
# NOTE(review): this encoder is fitted independently of the training labels;
# confirm that its class order matches the model outputs.
ce_ohe = ce.OneHotEncoder(handle_unknown='impute')
Y_test = np.asarray(ce_ohe.fit_transform(Y_test))

In [None]:
# Ensemble prediction: sum the class scores of the five fold models.
# The models are read from the .h5 files written by model_train(); the
# original unpickled "model_<i>.sav" files that this notebook never creates
# (and pickle.load would execute arbitrary code from an untrusted file).
pred = None

for i in range(5):
    print(i)
    model = tf.keras.models.load_model("model/model_" + str(i) + ".h5")
    temp = model.predict(X_test)
    if pred is None:
        # Size the accumulator from the model output instead of hard-coding
        # the number of classes.
        pred = np.zeros(temp.shape)
    pred += temp

In [None]:
# Predicted class = highest summed ensemble score; true class = position of the 1 in the one-hot label.
Y_pred = pred.argmax(axis=1)
Y_true = Y_test.argmax(axis=1)
# Accuracy is symmetric in its arguments; they are passed as (y_true, y_pred) per scikit-learn convention.
ACC_pred = accuracy_score(Y_true, Y_pred)
print('>>>> val Acc =', ACC_pred)

In [None]:
# Inspect the shape of the stacked test-image array.
X_test.shape

In [None]:
#Y_pred_classes = np.argmax(pred,axis = 1) 
#Y_true = np.argmax(Y_val,axis = 1)

In [None]:
# Boolean mask marking the misclassified test samples.
errors = Y_pred != Y_true

# Keep only the misclassified samples: predicted classes, ensemble scores, true classes and images.
Y_pred_classes_errors, Y_true_errors = Y_pred[errors], Y_true[errors]
Y_pred_errors, X_val_errors = pred[errors], X_test[errors]

In [None]:
def display_errors(errors_index,img_errors,pred_errors, obs_errors):
    """Show up to 10 images with their predicted and real labels.

    Args:
        errors_index: Indices into the three following sequences that select
            which samples to show; only the first 10 are used.
        img_errors: Images, either ``(H, W, 3)`` arrays or flat arrays with
            ``image_size * image_size * 3`` values.
        pred_errors: Predicted label for each image.
        obs_errors: True label for each image.
    """
    nrows = 2
    ncols = 5
    fig, ax = plt.subplots(nrows,ncols,sharex=True,sharey=True,figsize=(20, 8))
    panels = ax.ravel()
    shown = list(errors_index)[:nrows * ncols]
    for panel, error in zip(panels, shown):
        img = np.asarray(img_errors[error])
        # Only flat inputs need reshaping; images that are already H x W x 3
        # are drawn as-is, independent of the global image_size.
        if img.ndim != 3:
            img = img.reshape((image_size,image_size,3))
        panel.imshow(img)
        panel.set_title("Predicted label :{}\nTrue label :{}".format(pred_errors[error],obs_errors[error]))
    # Fewer than 10 errors: blank the unused panels instead of raising IndexError.
    for panel in panels[len(shown):]:
        panel.axis("off")
    plt.show()
    
# Score the ensemble gave to its (wrong) predicted class, per misclassified sample.
Y_pred_errors_prob = np.max(Y_pred_errors,axis = 1)
# Score the ensemble gave to the true class. Pairwise indexing selects
# row i, column Y_true_errors[i] directly; np.diagonal(np.take(...)) yields
# the same values but first builds an n x n matrix.
true_prob_errors = Y_pred_errors[np.arange(len(Y_true_errors)), Y_true_errors]
# Gap between the two scores: large values are confident mistakes.
delta_pred_true_errors = Y_pred_errors_prob - true_prob_errors
sorted_dela_errors = np.argsort(delta_pred_true_errors)
# The 10 errors with the largest gap (ascending, so the worst one is last).
most_important_errors = sorted_dela_errors[-10:]

display_errors(most_important_errors, X_val_errors, Y_pred_classes_errors, Y_true_errors)

In [None]:
# Inspect the shape of the stacked training-image array.
np.array(X).shape

In [None]:
# Explain the worst test errors with SHAP's DeepExplainer.
# The background set is scaled exactly like the training input (float32 in
# [0, 1]); the original passed the raw 8-bit images to the explainer.
# NOTE(review): the first 100 entries of X follow the loading order, so they
# may cover only a few classes; a random sample may be a better background.
n_classes = Y_pred_errors.shape[1]
background = np.array(X)[0:100].astype("float32") / 255
explainer = shap.DeepExplainer(model, background)

# Go through the (up to) 10 most confident mistakes.
for i in most_important_errors:

    # SHAP values for this single image, one attribution map per class.
    shap_values = explainer.shap_values(X_val_errors[[i]])

    # Column labels for the plot: class index plus its ensemble probability.
    # The summed scores are normalised so they read as percentages, and the
    # class count comes from the prediction array instead of a fixed 10.
    class_probs = Y_pred_errors[i] / Y_pred_errors[i].sum()
    index_names = np.array([str(x) + "\n" + '{:>7.3%}'.format(class_probs[x]) for x in range(n_classes)]).reshape(1, n_classes)
    print("Predicted label :{}\nTrue label :{}".format(Y_pred_classes_errors[i],Y_true_errors[i]))

    # Draw the attribution maps next to the input image.
    shap.image_plot(shap_values, X_val_errors[[i]] ,index_names ,show=False)
    plt.show()