In [0]:
import os, glob

import pandas as pd
import numpy as np
import tensorflow as tf
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt

In [0]:
!pip install lime
!pip install opencv-python

In [0]:
# Display the name of the GPU device TensorFlow can see, as a quick check
# of which accelerator (if any) this runtime has attached.
import tensorflow as tf
tf.test.gpu_device_name()

In [0]:
# Mount Google Drive at /gdrive and make it the working directory, so the
# relative 'My Drive/...' paths defined in the next cell resolve.
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive

In [0]:
# Project layout, relative to the current working directory.
project_dir = 'My Drive/projects/ING/Experiment_week/garbage_segmentation/'
data_dir = os.path.join(project_dir, 'data', 'raw')
train_data_dir = os.path.join(data_dir, "train")
test_data_dir = os.path.join(data_dir, "test")
# project_dir already ends with '/', so plain concatenation produced
# '.../garbage_segmentation//models'; build the path with os.path.join instead.
models_dir = os.path.join(project_dir, "models")
# Peek at a few files of one class to confirm the paths are reachable.
os.listdir(os.path.join(train_data_dir, "paper"))[:5]

In [0]:
# One class per sub-directory of the training folder. os.listdir() returns
# entries in arbitrary order, so sort them to keep the class -> index mapping
# identical between sessions (a model saved in one session is reloaded in
# another), and ignore anything that is not a directory.
classes = sorted(
    entry for entry in os.listdir(train_data_dir)
    if os.path.isdir(os.path.join(train_data_dir, entry))
)

def to_categorical(labels, label_to_index):
    """One-hot encode class names.

    Args:
        labels: iterable of class names.
        label_to_index: dict mapping class name -> integer class index.

    Returns:
        The array produced by tf.keras.utils.to_categorical, with one row per
        label and len(label_to_index) columns.

    Raises:
        ValueError: if a label is not a key of label_to_index (previously such
            labels silently became NaN before being cast to an index).
    """
    labels_series = pd.Series(labels)
    labels_int = labels_series.map(label_to_index)
    unknown = labels_series[labels_int.isna()].unique()
    if len(unknown) > 0:
        raise ValueError(f"Unknown labels: {sorted(map(str, unknown))}")
    return tf.keras.utils.to_categorical(labels_int.astype(int), num_classes=len(label_to_index))

# Forward (name -> index) and reverse (index -> name) class lookups.
label_to_index = {name: idx for idx, name in enumerate(classes)}
index_to_label = {idx: name for name, idx in label_to_index.items()}
classes, label_to_index

In [0]:
import os, glob

import pandas as pd
import numpy as np
import tensorflow as tf
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt

from keras.applications.inception_v3 import InceptionV3
from keras.applications.resnet50 import ResNet50
from keras_preprocessing.image import ImageDataGenerator
import keras
from keras.models import Sequential
from keras import optimizers, losses
from keras.layers import Conv2D, Flatten, MaxPooling2D, Dense, Dropout, GlobalAveragePooling2D, BatchNormalization
from keras.callbacks import ModelCheckpoint

In [0]:
# Image size used for loading images and for the model input shape.
img_height = 224
img_width = 224
target_size = (img_height, img_width)

# Generator batch size and the initial epoch count.
batch_size = 32
nb_epochs = 2

In [0]:
from keras.applications.resnet50 import preprocess_input

VALIDATION_SPLIT = 0.1  # fraction of the training folder held out for validation

# Training pipeline: ResNet50 preprocessing plus random geometric augmentation.
train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    validation_split=VALIDATION_SPLIT,
    #horizontal_flip=True,
    #vertical_flip=True,
    #shear_range=0.1,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
    rotation_range=30,
)

# Validation pipeline: same preprocessing and same split fraction, but no
# augmentation, so validation metrics are computed on unmodified images
# rather than on randomly zoomed/shifted/rotated ones.
valid_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    validation_split=VALIDATION_SPLIT,
)

train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    classes=classes,
    shuffle=True,
    subset='training'
)

validation_generator = valid_datagen.flow_from_directory(
    train_data_dir, # same directory as training data
    target_size=(img_height, img_width),
    batch_size=batch_size,
    classes=classes,
    shuffle=False,  # fixed order: validation results are reproducible
    subset='validation'
)

# Test images: preprocessing only, one image per batch, fixed order.
test_generator = ImageDataGenerator(
    preprocessing_function=preprocess_input,
).flow_from_directory(
    directory=test_data_dir,
    target_size=(img_height, img_width),
    shuffle=False,
    classes=classes,
    batch_size=1
)

In [0]:
# Visual check of the augmentation settings: draw nine randomly transformed
# images from a very small subset (validation_split=0.002) of the training folder.
# NOTE(review): origin='lower' puts array row 0 at the bottom of the axes, which
# shows images vertically flipped — confirm this is intended.
preview_datagen = ImageDataGenerator(
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
    rotation_range=30,
    validation_split=0.002,
)
it = preview_datagen.flow_from_directory(
    train_data_dir,
    target_size=(img_height, img_width),
    batch_size=1,
    classes=classes,
    subset='validation',
    shuffle=True,
    seed=1,
)
print(it.n, len(it.filenames), len(it))

# 3 x 3 grid, one freshly generated sample per panel
plt.figure(figsize=(10, 10))
for panel in range(1, 10):
  plt.subplot(3, 3, panel)
  images, _ = it.next()
  plt.imshow(images[0].astype('uint8'), origin='lower')
plt.show()


In [0]:
# Show the class -> index mapping of each generator side by side.
(
    train_generator.class_indices,
    validation_generator.class_indices,
    test_generator.class_indices,
)

In [0]:
# ImageNet-pretrained ResNet50 without its classifier, frozen so that only
# the new head below is trained.
base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(img_height, img_width, 3))
base_model.trainable = False

# Classification head: global average pooling followed by one hidden layer.
# (A Flatten -> Dense(200) -> Dense(100) model used to be built here and was
# overwritten by this one on the very next statement; that dead model is removed.)
model = Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dropout(0.15),
    Dense(1024, activation='relu'),
    Dropout(0.15),
    Dense(train_generator.num_classes, activation='softmax')
])
model.summary()

model.compile(
    # Refer to the optimizer by its class name rather than the lowercase alias.
    optimizer=optimizers.Adam(lr=1e-3),
    # The last layer applies softmax, so the loss receives probabilities, not logits.
    loss=losses.CategoricalCrossentropy(from_logits=False),
    metrics=['accuracy']
)

In [0]:
nb_epochs = 8
# Each epoch runs this many passes' worth of batches over the generators.
data_augmentation_factor = 4.0

# The checkpoint callback writes into models_dir; make sure it exists first.
os.makedirs(models_dir, exist_ok=True)
model_filepath=os.path.join(models_dir, "model_ver1_weights.best.hdf5")
# Keep only the weights of the epoch with the highest validation accuracy.
checkpoint = ModelCheckpoint(model_filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint]

# The factor is a float, so `factor * n // batch_size` is a float as well;
# step counts are converted to int and kept at a minimum of one batch.
steps_per_epoch = max(1, int(data_augmentation_factor * train_generator.n // batch_size))
validation_steps = max(1, int(data_augmentation_factor * validation_generator.n // batch_size))

model.fit_generator(
    train_generator,
    steps_per_epoch = steps_per_epoch,
    validation_data = validation_generator,
    validation_steps = validation_steps,
    epochs = nb_epochs,
    callbacks = callbacks_list
)
# Continue with the best checkpoint rather than the last epoch's weights.
model.load_weights(model_filepath)

In [0]:
# Load the checkpointed weights from model_filepath into the current model
# (the same call also ends the training cell).
model.load_weights(model_filepath)

In [0]:
# Write the current model to models_dir as a single .h5 file.
final_model_path = os.path.join(models_dir, "model_ver1.h5")
model.save(final_model_path)

In [0]:
# Reload the model saved as model_ver1.h5, replacing the in-memory `model`.
from keras.models import load_model
# model = tf.keras.models.load_model(os.path.join(models_dir, "far_model_20200422.h5"))
model = load_model(os.path.join(models_dir, "model_ver1.h5"))

In [0]:
# Evaluate on the test set. test_generator was created with batch_size=1,
# so steps=test_generator.n corresponds to one batch per test image.
model.evaluate_generator(test_generator, steps=test_generator.n, verbose=1)

In [0]:
# validation_generator yields batches of `batch_size` images, so a single pass
# over the validation set is len(validation_generator) steps. Using
# steps=validation_generator.n asked for one *batch* per image, i.e. roughly
# batch_size passes over the same data.
valid_pred = model.predict_generator(validation_generator, steps=len(validation_generator), verbose=1)
# Mean predicted probability per class.
np.mean(valid_pred, axis=0)

In [0]:
# Class probabilities for the test set, one step per image (batch_size=1).
# Later cells pair these rows with test_generator.filenames / .classes,
# which relies on the generator having been created with shuffle=False.
test_pred = model.predict_generator(test_generator, steps=test_generator.n, verbose=1)

In [0]:
# Mean predicted probability per class over the test predictions.
test_pred.mean(axis=0)

In [0]:
# One row per test image: file name, predicted class index, the probability
# of that prediction, and the true class index.
predicted_index = np.argmax(test_pred, axis=1)
predicted_probability = np.max(test_pred, axis=1)
test_df = pd.DataFrame({
    'name': test_generator.filenames,
    'pred': predicted_index,
    'pred_pr': predicted_probability,
    'target': test_generator.classes,
})
# Number of images assigned to each predicted class.
test_df.groupby('pred').size().sort_index()

In [0]:
# Confusion table: rows are true classes, columns are predicted classes,
# cells are image counts (0 where a pair never occurs).
pair_counts = test_df.groupby(['target', 'pred']).size()
pair_counts.unstack(level='pred', fill_value=0).astype(int)

In [0]:
import itertools
# Grid of example images: row = true class, column = predicted class. Each
# panel shows the test image of that (target, pred) pair with the highest
# predicted probability (ties broken by file name); empty pairs are blanked.
nc = len(classes)
fig, axs = plt.subplots(nrows=nc, ncols=nc, figsize=(nc*3, nc*3))
for i in range(nc):
  for j in range(nc):
    ax = axs[i][j]
    matches = test_df[(test_df.target==i) & (test_df.pred==j)]
    if matches.empty:
      ax.set_axis_off()
      continue
    best = matches.sort_values(by=['pred_pr', 'name'], ascending=[False, True]).iloc[0]
    filename = os.path.join(test_generator.directory, best['name'])
    img = tf.keras.preprocessing.image.load_img(filename, target_size=target_size)
    ax.imshow(img, origin='lower')
    ax.set_title(f'target: {classes[i]}\npred: {classes[j]}')
plt.tight_layout()
plt.show()

In [0]:
from sklearn.metrics import classification_report

In [0]:
# Pass `labels` explicitly so the report has one row per class and
# `target_names` always lines up with the class indices, even when some
# class does not occur in the test targets or predictions.
print(
    classification_report(
        test_df['target'],
        test_df['pred'],
        labels=list(range(len(classes))),
        target_names=classes
    )
)

In [0]:
def predict(filename):
  """Classify one image file and plot it with its true and predicted class.

  The true class is read from the name of the image's parent directory.

  Args:
    filename: path to an image laid out as <...>/<class name>/<file>.
  """
  raw_img = tf.keras.preprocessing.image.load_img(filename, target_size=target_size)
  img = tf.keras.preprocessing.image.img_to_array(raw_img)
  img = preprocess_input(img)
  # The model expects a batch, so wrap the single image and unwrap the result.
  pred = model.predict(np.stack([img]))[0]
  pred_class = classes[np.argmax(pred)]
  # Use os.path instead of splitting on "/" so the parent-directory lookup
  # also works with OS-specific path separators.
  target_class = os.path.basename(os.path.dirname(os.path.normpath(filename)))

  plt.figure(figsize=(8, 8))
  plt.imshow(raw_img, origin='lower')
  plt.title(f"target: {target_class} prediction: {pred_class}, pr: {np.max(pred):.2f}")
  plt.show()
# Run the helper on one test image.
sample_path = test_generator.directory + "/" + test_generator.filenames[60]
predict(sample_path)

In [0]:
import lime
from lime import lime_image
from skimage.segmentation import mark_boundaries
explainer = lime_image.LimeImageExplainer()

def explain(filename):
  """Plot a LIME explanation of the model's prediction for one image file.

  The explanation is drawn for the first entry of the explanation's
  top_labels, with up to 10 superpixels highlighted and their boundaries marked.

  Args:
    filename: path to the image to explain.
  """
  raw_img = tf.keras.preprocessing.image.load_img(filename, target_size=target_size)
  img = tf.keras.preprocessing.image.img_to_array(raw_img)

  def predict_fn(images):
    # LIME hands over un-preprocessed image batches; apply the model's
    # preprocessing here so the explainer itself works on the raw pixels.
    inp = preprocess_input(images)
    return model.predict(inp)

  explanation = explainer.explain_instance(img, predict_fn, top_labels=5, hide_color=0, num_samples=1000)

  temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=False, num_features=10, hide_rest=False)
  plt.figure(figsize=(8, 8))
  # `img` is passed to LIME as raw 0-255 pixel values, so bring `temp` to
  # [0, 1] before mark_boundaries: its boundary colour is expressed on that
  # scale. The previous `temp / 2 + 0.5` only suits images in [-1, 1].
  plt.imshow(tf.keras.preprocessing.image.array_to_img(mark_boundaries(temp / 255.0, mask)), origin='lower')
  plt.show()

# Explain the prediction for one test image.
explained_path = test_generator.directory + "/" + test_generator.filenames[6]
explain(explained_path)

# misc experiments

In [0]:
import cv2
def adv_preprocessing(image, height=224, width=224):
    """Run an OpenCV resize / blur / watershed-segmentation pipeline on one image file.

    Args:
        image: path of the image file to read.
        height: height in pixels to resize to (default 224).
        width: width in pixels to resize to (default 224).

    Returns:
        A list of four arrays:
          [0] the resized BGR image,
          [1] a Gaussian-blurred copy of [0],
          [2] a copy of [0] with the watershed boundaries painted [255, 0, 0],
          [3] a (height, width, 3) stack of the watershed markers, a zero
              channel and a random placeholder channel.

    Raises:
        FileNotFoundError: if `image` is not an existing file.
        ValueError: if OpenCV cannot decode the file.
    """
    if not os.path.isfile(image):
        raise FileNotFoundError(f"Image file not found: {image}")

    # Always decode to 3-channel 8-bit BGR (cvtColor and watershed below need
    # that); IMREAD_UNCHANGED could return grayscale or BGRA data. Keep the
    # stored orientation, as IMREAD_UNCHANGED did.
    img = cv2.imread(image, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising
        raise ValueError(f"Could not decode image: {image}")

    preimgs = []

    # Resize to the requested dimensions (cv2 expects (width, height))
    res = cv2.resize(img, (width, height), interpolation=cv2.INTER_LINEAR)
    preimgs.append(res)

    # Remove noise from the image - Gaussian blur
    blurred_img = cv2.GaussianBlur(res, (5, 5), 0)
    preimgs.append(blurred_img)

    # Segmentation: Otsu threshold on the grayscale image.
    # cv2.imread yields BGR channel order, so convert with BGR2GRAY.
    gray = cv2.cvtColor(res, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # More noise removal: morphological opening
    kernel = np.ones((3, 3), np.uint8)
    opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2)

    # Sure background area
    sure_bg = cv2.dilate(opening, kernel, iterations=3)

    # Sure foreground area: pixels far from the nearest zero pixel
    dist_transform = cv2.distanceTransform(opening, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)

    # Unknown region: background candidates that are not sure foreground
    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(sure_bg, sure_fg)

    # Label the connected foreground components to seed the watershed
    _, markers = cv2.connectedComponents(sure_fg)
    # Add one to all labels so that the sure background is 1, not 0
    markers = markers + 1

    # Mark the unknown region with 0
    markers[unknown == 255] = 0

    markers = cv2.watershed(res, markers)
    # Paint the boundaries on a copy: `res` is already stored as preimgs[0]
    # and must stay the untouched resized image.
    marked = res.copy()
    marked[markers == -1] = [255, 0, 0]

    # Stack the markers with a zero channel and a random placeholder channel
    placeholder = np.random.rand(height, width)
    markers = np.dstack([markers, np.zeros((height, width)), placeholder])

    preimgs.append(marked)
    preimgs.append(markers)

    return preimgs

In [0]:
# Show one test image as loaded by Keras, then the first three outputs of
# adv_preprocessing for the same file side by side.
filename = test_generator.directory + "/" + test_generator.filenames[60]
img = tf.keras.preprocessing.image.load_img(filename, target_size=target_size)
plt.imshow(img, origin='lower')
plt.show()

res = adv_preprocessing(filename)[:3]
fig, axs = plt.subplots(ncols=len(res), figsize=(12, 4))
for ax, stage_bgr in zip(axs, res):
  # Convert from OpenCV's BGR channel order to RGB before displaying
  stage_rgb = cv2.cvtColor(stage_bgr, cv2.COLOR_BGR2RGB)
  ax.imshow(tf.keras.preprocessing.image.array_to_img(stage_rgb), origin='lower')
plt.show()

