In [None]:
import numpy as np
import os
import tensorflow as tf
from PIL import ImageFile
import pandas as pd
import seaborn as sns
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import *
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.backend import *
from sklearn.metrics import roc_auc_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn import svm
from tqdm import tqdm
import pickle
import tensorflow as tf
from tensorflow.keras.applications import *
from tensorflow.keras.layers import *
from tensorflow.keras import*
from tensorflow.keras.models import *
import tensorflow as tf 
ImageFile.LOAD_TRUNCATED_IMAGES = True
plt.style.use('fivethirtyeight')
%matplotlib inline

# One-class SVM and Isolation Forest to classify the unknown class

In [None]:
# Dataset root: each entry is a class folder that is listed with os.listdir below.
root_dir = "../input/aiijc-new128x128/New folder"

# Known sign classes = every entry except the "unknown" folder, which is loaded
# separately as the anomalous class. Excluding it by name (rather than sorting
# and popping the last element) stays correct even if some other entry happens
# to sort after "unknown".
class_names = sorted(name for name in os.listdir(root_dir) if name != "unknown")

In [None]:
# create dataset with known/unknown
N_VAL_PER_CLASS = 5  # images of every known class held out for validation

x = []              # relative path of every image (train + val)
y = []              # "sign" for known classes, "unknown" otherwise
z = []              # "train" / "val" marker, parallel to x and y
X_train_paths = []  # known-class images used for fitting
X_val_paths = []    # held-out known images + all unknown images
y_val = []          # 1 = known sign, 0 = unknown (parallel to X_val_paths)

for label in tqdm(class_names):
  img_names = os.listdir(root_dir + "/" + label)

  # Disjoint split by slicing. The previous loop indexed img_names[count] while
  # also popping index 0, so it picked items 0, 2, 4, 6, 8 for validation but
  # removed items 0..4: items 6 and 8 ended up in BOTH splits, items 1 and 3
  # were dropped, and classes with fewer than 9 images raised IndexError.
  val_names = img_names[:N_VAL_PER_CLASS]
  train_names = img_names[N_VAL_PER_CLASS:]

  for img_name in val_names:
    rel_path = os.path.join(label, img_name)
    x.append(rel_path)
    y.append("sign")
    z.append("val")
    X_val_paths.append(rel_path)
    y_val.append(1)

  for img_name in train_names:
    rel_path = os.path.join(label, img_name)
    x.append(rel_path)
    y.append("sign")
    z.append("train")
    X_train_paths.append(rel_path)

# Every image of the "unknown" folder goes to validation only, labelled 0.
unknown_class = os.listdir(root_dir + "/" + "unknown")
for img_name in tqdm(unknown_class):
  rel_path = os.path.join("unknown", img_name)
  x.append(rel_path)
  y.append("unknown")
  z.append("val")
  X_val_paths.append(rel_path)
  y_val.append(0)

In [None]:
# Overview table: one row per image with its class label and train/val split.
# The split column previously had an empty-string name, which renders as a
# blank header and cannot be referenced conveniently.
df = pd.DataFrame({"filename": x, "label": y, "split": z})
df.tail(7)

In [None]:
# Sanity check: number of training paths followed by number of validation paths.
n_train_paths, n_val_paths = len(X_train_paths), len(X_val_paths)
print(n_train_paths, n_val_paths)

In [None]:
image_size = 32


def read_and_prep_images(img_paths, img_height=image_size, img_width=image_size):
    """Load images from ``root_dir`` and apply ResNet input preprocessing.

    Args:
        img_paths: iterable of image paths relative to ``root_dir``.
        img_height: target height passed to ``load_img``.
        img_width: target width passed to ``load_img``.

    Returns:
        The stacked image array after
        ``tf.keras.applications.resnet.preprocess_input``.
    """
    target_size = (img_height, img_width)
    loaded = []
    for img_path in tqdm(img_paths):
        loaded.append(load_img(os.path.join(root_dir, img_path), target_size=target_size))

    stacked = np.array([img_to_array(picture) for picture in loaded])
    return tf.keras.applications.resnet.preprocess_input(stacked)


X_train = read_and_prep_images(X_train_paths)
X_val = read_and_prep_images(X_val_paths)

In [None]:
# using resnet for extracting features
# ImageNet-pretrained ResNet50 without the classification head; with
# pooling='avg' the network output is globally average-pooled.
resnet_model = tf.keras.applications.ResNet50(
    include_top=False,
    weights="imagenet",
    pooling='avg',
    input_shape=(image_size, image_size, 3),
)

# NOTE: X_train / X_val are rebound here from preprocessed images to the
# extracted features, so this cell is not meant to be run twice in a row.
X_train, X_val = [resnet_model.predict(images) for images in (X_train, X_val)]

In [None]:
# Standardise the features using statistics of the training set only, then
# apply the same transform to the validation set.
ss = StandardScaler()
ss.fit(X_train)
X_train = ss.transform(X_train)
X_val = ss.transform(X_val)

# Take PCA to reduce feature space dimensionality
# random_state is fixed because PCA may select a randomized SVD solver, which
# would otherwise make the projected features differ between runs.
pca = PCA(n_components=512, whiten=True, random_state=42)
pca = pca.fit(X_train)
# Use the ndarray's own .sum(): the star imports at the top of the notebook can
# shadow Python builtins such as sum(). The printed value is a ratio in [0, 1],
# so it is labelled as a ratio rather than a percentage.
print('Explained variance ratio = %0.2f' % pca.explained_variance_ratio_.sum())
X_train = pca.transform(X_train)
X_val = pca.transform(X_val)

In [None]:
# test one-class svm and isolation forest
# Both detectors are fitted on known-class features only (X_train) and then
# asked to label the validation features; predict() returns +1 for inliers and
# -1 for outliers.
oc_svm_clf = svm.OneClassSVM(gamma=0.1, kernel='rbf', nu = 0.01)
# random_state is fixed so the Isolation Forest (which subsamples rows/features
# randomly) gives the same predictions on every run.
if_clf = IsolationForest(contamination=0.1, max_features=1.0, max_samples=1.0,
                         n_estimators=40, random_state=42)

oc_svm_clf.fit(X_train)
if_clf.fit(X_train)

oc_svm_preds = oc_svm_clf.predict(X_val)
if_preds = if_clf.predict(X_val)

In [None]:
# Ground-truth table for the validation images: path plus 1 (sign) / 0 (unknown).
test_df = pd.DataFrame(dict(filename=X_val_paths, label=y_val))
test_df.tail()

In [None]:
# Collect both detectors' predictions per validation file, remapping the
# -1 (outlier) output to 0 and everything else to 1 so that they use the same
# coding as the ground-truth labels, then attach those labels from test_df.
binary_predictions = {
  'filename': X_val_paths,
  'oc_svm_preds': [int(raw != -1) for raw in oc_svm_preds],
  'if_preds': [int(raw != -1) for raw in if_preds],
}
svm_if_results = pd.DataFrame(binary_predictions).merge(test_df)
svm_if_results.tail()

In [None]:
# Evaluate the Isolation Forest predictions against the validation labels.
print('roc auc score: if_preds')
if_preds = svm_if_results['if_preds']
actual = svm_if_results['label']
print(roc_auc_score(actual, if_preds))
print(classification_report(actual, if_preds))

# Label the heatmap so it can be read on its own: confusion_matrix puts the
# true class on rows and the predicted class on columns; labels=[0, 1] pins the
# order to 0 = unknown, 1 = sign (the coding used when y_val was built).
class_ticks = ['unknown', 'sign']
ax = sns.heatmap(confusion_matrix(actual, if_preds, labels=[0, 1]),
                 annot=True, fmt='2.0f',
                 xticklabels=class_ticks, yticklabels=class_ticks)
ax.set(xlabel='Predicted', ylabel='Actual', title='Isolation Forest')
plt.show()

In [None]:
# Evaluate the One-Class SVM predictions against the validation labels.
print('roc auc score: oc_svm_preds')
oc_svm_preds = svm_if_results['oc_svm_preds']
actual = svm_if_results['label']
print(roc_auc_score(actual, oc_svm_preds))
print(classification_report(actual, oc_svm_preds))

# Label the heatmap so it can be read on its own: confusion_matrix puts the
# true class on rows and the predicted class on columns; labels=[0, 1] pins the
# order to 0 = unknown, 1 = sign (the coding used when y_val was built).
class_ticks = ['unknown', 'sign']
ax = sns.heatmap(confusion_matrix(actual, oc_svm_preds, labels=[0, 1]),
                 annot=True, fmt='2.0f',
                 xticklabels=class_ticks, yticklabels=class_ticks)
ax.set(xlabel='Predicted', ylabel='Actual', title='One-Class SVM')
plt.show()

## Grid search to find the best hyper-parameters

In [None]:
#grid search for ocsvm
# Exhaustive search over (gamma, nu): fit on the known-class features and score
# each setting by ROC AUC of the hard 0/1 predictions on the validation set.
# NOTE(review): the validation set is also used for model selection here, so
# the best score is an optimistic estimate.
clf = svm.OneClassSVM()
gammas = np.linspace(0.001, 0.01, 100)
nus = np.linspace(0.070, 0.1, 3)
count = 0
results = []
paras = []
for gamma in gammas:
    for nu in nus:
        clf.set_params(gamma=gamma, nu=nu)
        clf.fit(X_train)
        # predict() yields -1 for outliers; map to 0 (unknown) / 1 (sign).
        # Predictions are in X_val order, which is the order y_val was built
        # in, so they can be scored directly without rebuilding and merging a
        # DataFrame on every iteration.
        oc_svm_preds = np.where(clf.predict(X_val) == -1, 0, 1)

        count += 1
        print("VERSION {}".format(count))
        paras.append((gamma, nu))
        score = roc_auc_score(y_val, oc_svm_preds)
        print(score)
        results.append(score)

# np.argmax returns the first best index (same choice as list.index(max(...)))
# and does not depend on the builtins max(), which the star imports at the top
# of the notebook may shadow.
best_idx = int(np.argmax(results))
max_acc = results[best_idx]
print("best (gamma, nu) = {}, roc auc = {}".format(paras[best_idx], max_acc))

In [None]:
#contamination=0.1, max_features=1.0, max_samples=1.0
# Exhaustive search over Isolation Forest settings, scored by ROC AUC of the
# hard 0/1 predictions on the validation set.
contaminations = [0.1,0.01,0.001,0.0001]
# NOTE(review): in scikit-learn an int here is an absolute count while a float
# is a fraction, so max_samples=1 trains every tree on a single sample and
# max_features=1/2/3 uses only 1-3 features. The baseline above used 1.0 for
# both; confirm the integer values are intended.
max_features = [1,2,3]
max_samples = [1, 10000]
n_estimators = [20, 40, 60, 80, 100]
count = 0
results = []
paras = []
# Fixed seed so that differences between runs come from the hyper-parameters,
# not from the forest's random subsampling.
if_clf = IsolationForest(random_state=42)
for contamination in contaminations:
    for max_feature in max_features:
        for max_sample in max_samples:
            for n_estimator in n_estimators:
                if_clf.set_params(contamination=contamination, max_features=max_feature, max_samples=max_sample, n_estimators = n_estimator)
                if_clf.fit(X_train)
                # predict() yields -1 for outliers; map to 0 (unknown) / 1 (sign).
                # Predictions follow X_val order, i.e. the order of y_val.
                if_preds = np.where(if_clf.predict(X_val) == -1, 0, 1)

                count += 1
                print("VERSION {}".format(count))
                print('roc auc score: if_preds (contamination = {}, max_feature = {}, max_sample = {}, n_estimator = {})'.format(contamination, max_feature, max_sample, n_estimator))
                # n_estimator is part of the key; without it the five runs per
                # (contamination, max_feature, max_sample) were indistinguishable.
                paras.append((contamination, max_feature, max_sample, n_estimator))
                score = roc_auc_score(y_val, if_preds)
                print(score)
                results.append(score)
                # Report the Isolation Forest predictions of THIS iteration
                # (the previous code reported the stale oc_svm_preds instead).
                print(classification_report(y_val, if_preds))
                sns.heatmap(confusion_matrix(y_val, if_preds),annot=True,fmt='2.0f')
                plt.show()

# First best configuration; np.argmax avoids the possibly shadowed builtin max().
best_idx = int(np.argmax(results))
print("best (contamination, max_feature, max_sample, n_estimator) = {}, roc auc = {}".format(paras[best_idx], results[best_idx]))

# Using an autoencoder to classify the unknown class

In [None]:
# VGG16 based model
def _encoder_conv_unit(filters):
    """Return one 3x3 'same' convolution followed by LeakyReLU(0.2) and batch norm."""
    return [
        Conv2D(filters, (3,3), strides=1, padding='same'),
        LeakyReLU(0.2),
        BatchNormalization(),
    ]


# (filters, number of conv units, whether the stage ends with max pooling).
# Four pooled stages halve the 32x32 input down to 2x2; the last stage keeps it.
_ENCODER_STAGES = [
    (64, 2, True),
    (128, 2, True),
    (256, 3, True),
    (512, 3, True),
    (512, 3, False),
]

encoder_layers = [Input(shape=(32,32,3))]
for stage_filters, stage_units, stage_pools in _ENCODER_STAGES:
    for _ in range(stage_units):
        encoder_layers.extend(_encoder_conv_unit(stage_filters))
    if stage_pools:
        encoder_layers.append(MaxPooling2D())

# Bottleneck: flatten the feature map and project to a 1024-d code.
encoder_layers.append(Flatten())
encoder_layers.append(Dense(1024, activation=LeakyReLU(0.2)))

encoder = Sequential(encoder_layers)

In [None]:
# Print the encoder's layer-by-layer summary.
encoder.summary()

In [None]:
def _decoder_deconv_unit(filters):
    """Return one 3x3 'same' transposed convolution followed by LeakyReLU(0.2) and batch norm."""
    return [
        Conv2DTranspose(filters, (3,3), strides=1, padding='same'),
        LeakyReLU(0.2),
        BatchNormalization(),
    ]


# (filters, number of deconv units, whether the stage starts with 2x upsampling).
# Four upsampled stages grow the 2x2 map back to 32x32.
_DECODER_STAGES = [
    (512, 3, False),
    (512, 3, True),
    (256, 3, True),
    (128, 2, True),
    (64, 2, True),
]

decoder_layers = [
    # `Input(1024,)` passed the bare int 1024 (a trailing comma inside a call
    # does not build a tuple). The documented form is a shape tuple, and some
    # Keras versions may reject a bare int, so the shape is spelled out.
    Input(shape=(1024,)),
    Dense(2048),
    Reshape(target_shape = (2,2,512)),
]
for stage_filters, stage_units, stage_upsamples in _DECODER_STAGES:
    if stage_upsamples:
        decoder_layers.append(UpSampling2D())
    for _ in range(stage_units):
        decoder_layers.extend(_decoder_deconv_unit(stage_filters))

# Map back to 3 channels; the sigmoid keeps reconstructed pixels in [0, 1].
decoder_layers.append(Conv2DTranspose(3,(3,3), padding ='same'))
decoder_layers.append(Activation('sigmoid'))

decoder= Sequential(decoder_layers)

In [None]:
# Print the decoder's layer-by-layer summary.
decoder.summary()

In [None]:
# Chain the two sub-models: the encoder output feeds the decoder input.
autoencoder = Sequential()
for sub_model in (encoder, decoder):
    autoencoder.add(sub_model)

In [None]:
# NOTE(review): clear_session() runs after the models were built; kept as in the
# original, presumably to release state left by the earlier ResNet model.
clear_session()

AE_IMAGE_SIZE = 32  # must match the encoder's Input(shape=(32, 32, 3))


def load_images_for_autoencoder(img_paths, size=AE_IMAGE_SIZE):
    """Load images under ``root_dir`` as a float32 array scaled to [0, 1].

    Args:
        img_paths: iterable of image paths relative to ``root_dir``.
        size: side length the images are resized to.

    Returns:
        Array of shape (len(img_paths), size, size, 3) with values in [0, 1].
    """
    pixels = np.array(
        [img_to_array(load_img(os.path.join(root_dir, path), target_size=(size, size)))
         for path in tqdm(img_paths)],
        dtype="float32",
    )
    return pixels / 255.0


# When the notebook is run top to bottom, X_train / X_val have by now been
# rebound to PCA-reduced ResNet features, which neither match the encoder's
# (32, 32, 3) input nor the [0, 1] range required by a sigmoid output trained
# with binary cross-entropy. Reload the raw images and rebind the names so the
# following cells (prediction, reconstruction error, plots) see images too.
X_train = load_images_for_autoencoder(X_train_paths)
X_val = load_images_for_autoencoder(X_val_paths)

autoencoder.compile('adam', loss='binary_crossentropy')

# Keep only the weights of the epoch with the lowest validation loss.
# NOTE(review): X_val also contains the "unknown" images, so val_loss rewards
# reconstructing them well; consider validating on known-class images only.
checkpoint = tf.keras.callbacks.ModelCheckpoint('autoencoder_1.h5', monitor ='val_loss',save_best_only = True)
history = autoencoder.fit(X_train,X_train,
                          epochs=100,
                          batch_size=128,
                          verbose=1,
                          shuffle=True,
                          validation_data=(X_val,X_val),
                          callbacks = [checkpoint])

In [None]:
# Reload the best checkpoint written by ModelCheckpoint during training.
# The model was built and saved with tf.keras, so load it with tf.keras as
# well: mixing in the standalone `keras` package can fail to deserialize it.
# The name `keras` stays bound for any later cell that refers to it.
from tensorflow import keras
autoencoder =  keras.models.load_model('./autoencoder_1.h5')

In [None]:
# Reconstruct every validation image and make the (N, 32, 32, 3) layout explicit.
reconstructed = autoencoder.predict(X_val)
decode_imgs = np.reshape(reconstructed, (len(X_val), 32, 32, 3))

In [None]:
# Per-image reconstruction error: mean squared difference over all pixels and
# channels of each validation image. One vectorised numpy reduction replaces
# the per-image Python loop of Keras loss calls; `loss` stays a list with one
# float32 value per validation image.
squared_error = np.square(
    np.asarray(X_val, dtype="float32") - np.asarray(decode_imgs, dtype="float32")
)
# Average over every axis except the leading (image) axis.
loss = list(squared_error.mean(axis=tuple(range(1, squared_error.ndim))))

In [None]:
# Show the first n validation images (top row) above their reconstructions
# (bottom row) on a 2 x n grid.
n = 10
plt.figure(figsize=(20, 4))
# (title, image source, subplot index offset) for the two rows of the grid
panels = (
    ('Original Image', X_val, 1),
    ('Decoded Image', decode_imgs, 1 + n),
)
for i in range(n):
    for panel_title, panel_images, panel_offset in panels:
        ax = plt.subplot(2, n, i + panel_offset)
        plt.imshow(panel_images[i].reshape(32, 32,3))
        plt.title(panel_title)
        plt.gray()
        # Hide ticks: only the pictures matter here.
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
plt.show()