### Library imports

In [1]:
import os
import glob
import time
from imutils import build_montages
import matplotlib.pyplot as plt
import numpy as np
import cv2
import tensorflow as tf
import tensorflow.keras.backend as K
import keras
from keras.models import Model
from keras.layers import Dense
from keras.layers import Input
from keras.layers import Lambda
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image as kimage
from keras.applications.vgg16 import preprocess_input
import scipy.spatial.distance
%matplotlib inline

Using TensorFlow backend.


### Dataset import

In [2]:
def read_image(img_path):
    """Load the image stored at ``img_path`` and return it as an array.

    The file is opened with ``kimage.load_img`` without a ``target_size``
    and the result is converted with ``kimage.img_to_array``.
    """
    return kimage.img_to_array(kimage.load_img(img_path))
# Collect every frame matching the pattern below (sorted by path) and stack
# the decoded frames into a single uint8 array.
start = time.time()
consolidated_jpgdir = '/mnt/data/style_transfers/datasets/consolidated/jpg/'
filenames = sorted(glob.glob(consolidated_jpgdir + "*000.000..jpg"))

# The first frame fixes the height/width/depth used for the whole stack.
img = read_image(filenames[0])
H, W, D = img.shape

images = np.zeros((len(filenames), H, W, D), dtype=np.uint8)
for i, filename in enumerate(filenames):
    # read_image output is cast to uint8 when it is stored in `images`
    images[i] = read_image(filename)

print ('[INFO] {} images loaded in {:.1f} seconds.'.format(len(filenames), (time.time()-start)))

[INFO] 179 images loaded in 0.9 seconds.


#### Getting the horizontal offset from ground truth

In [3]:
# Read the horizontal offset stored in each ground-truth file into `gt`.
consolidated_gtdir = '/mnt/data/style_transfers/datasets/consolidated/gt/'
consolidated_gt = consolidated_gtdir + "A*_GT.txt"

# One slot per loaded image; slots that are never written below stay 0.
gt = np.zeros((len(filenames)), dtype=int)

# Do not reuse the name `gt` for the loop variable here: it would shadow the
# array above (and overwrite it on interpreters where comprehension
# variables leak into the enclosing scope).
gt_filenames = sorted(glob.glob(consolidated_gt))

# gt[i] is matched to images[i] purely by sorted position, so a count
# mismatch means offsets are missing or attached to the wrong image.
if len(gt_filenames) != len(filenames):
    print("[WARN] {} ground-truth files for {} images; offsets may be misaligned.".format(
        len(gt_filenames), len(filenames)))

for i, filename in enumerate(gt_filenames):
    with open(filename, "r") as f:
        lines = [line for line in f.read().split("\n") if line]
    if not lines:
        raise ValueError("Ground-truth file is empty: {}".format(filename))
    # The first non-empty line holds two integers; only the first is kept.
    (horizontal, _) = [int(k) for k in lines[0].split()]
    gt[i] = horizontal
print("[INFO] Ground truth loaded.")

[INFO] Ground truth loaded.


In [4]:
# Summary statistics of the offsets, keeping only values strictly inside
# the open interval (-500, 500).
gt_ = [offset for offset in gt if -500 < offset < 500]
gt_mean = np.mean(gt_)
gt_median = np.median(gt_)
gt_min = np.min(gt_)
gt_max = np.max(gt_)

print('MEAN: ', gt_mean, '\nMEDIAN:', gt_median, "\nMIN:", gt_min, "\nMAX:", gt_max)

MEAN:  -14.11731843575419 
MEDIAN: -14.0 
MIN: -60 
MAX: 41


### Sliding the image 

In [5]:
def cv2_imshow(a, title=None, mode=None, i=None, j=None, k=None, **kwargs):
    """Display a BGR/BGRA array with matplotlib.

    Values are clipped to [0, 255] and cast to uint8. A 3-D array is
    converted with ``cv2.cvtColor`` (BGRA->RGBA when it has four channels,
    BGR->RGB otherwise); arrays of other rank are shown unchanged.

    With ``mode == 'subplot'`` the image is drawn into ``plt.subplot(i, j, k)``;
    otherwise a new 12x8 figure titled ``title`` is created. Extra keyword
    arguments are forwarded to ``plt.imshow``, whose result is returned.
    """
    frame = a.clip(0, 255).astype('uint8')
    if frame.ndim == 3:
        conversion = cv2.COLOR_BGRA2RGBA if frame.shape[2] == 4 else cv2.COLOR_BGR2RGB
        frame = cv2.cvtColor(frame, conversion)

    if mode != 'subplot':
        plt.figure(figsize=(12,8))
        plt.title(title)
    else:
        plt.subplot(i, j, k)
    return plt.imshow(frame, **kwargs)
def imshow(a, title=None, mode=None, i=None, j=None, k=None, **kwargs):
    """Display an array with matplotlib without any colour conversion.

    Values are clipped to [0, 255] and cast to uint8. With
    ``mode == 'subplot'`` the image is drawn into ``plt.subplot(i, j, k)``;
    otherwise a new 12x8 figure titled ``title`` is created. Extra keyword
    arguments are forwarded to ``plt.imshow``, whose result is returned.
    """
    pixels = a.clip(0, 255).astype('uint8')
    if mode != 'subplot':
        plt.figure(figsize=(12,8))
        plt.title(title)
    else:
        plt.subplot(i, j, k)
    return plt.imshow(pixels, **kwargs)
def process_image(img):
    """Resize an image to 224x224 and turn it into a one-image VGG16 batch.

    Args:
        img: image array of shape (height, width, 3).

    Returns:
        The result of ``preprocess_input`` applied to an array of shape
        (1, 224, 224, 3).
    """
    # np.resize does not resample: it only truncates or repeats the flattened
    # pixel buffer, which scrambles the picture. cv2.resize interpolates
    # spatially. Its size argument is (width, height).
    # ascontiguousarray: callers pass column slices, which are strided views.
    resized = cv2.resize(np.ascontiguousarray(img), (224, 224))
    batch = np.expand_dims(resized, axis=0)
    return preprocess_input(batch)
def get_slices(img, offset_size, slides_count):
    """Cut ``slides_count`` horizontally shifted crops out of ``img``.

    Crop ``i`` starts at column ``offset_size * i`` and is as wide as the
    image is tall. Each crop is passed through ``process_image``.

    Args:
        img: image array of shape (height, width, 3).
        offset_size: horizontal step, in pixels, between consecutive crops.
        slides_count: number of crops to produce.

    Returns:
        Array of shape (slides_count, 224, 224, 3) holding the processed crops.
    """
    # Take the crop width from the image itself rather than from the
    # notebook-level global H, so the function does not rely on hidden state.
    crop_width = img.shape[0]
    a = np.zeros((slides_count, 224, 224, 3))
    for i in range(slides_count):
        left = offset_size * i
        a[i,:] = process_image(img[:, left:left + crop_width])
    return a

## 1. Generating pairs

In [6]:
def make_pairs(images, base_slices, gt, offset_size, threshold):
    """Pair each base strip with every sliding crop of the other images.

    For every image except index 0, and only when ``abs(gt[idx]) <= 300``,
    the image is cut with ``get_slices``. Each crop ``j`` is paired with the
    LEFT, MIDDLE and RIGHT base strips (``base_slices[0..2, 0]``). A pair is
    labelled ``[1]`` when ``abs(shift + gt[idx] - j * offset_size) < threshold``
    with ``shift`` equal to 0, 250 and 500 for the three strips, else ``[0]``.

    Every tenth image index is printed as a progress marker. The number of
    crops per image is ``(W - H) // offset_size``, read from the module-level
    ``W`` and ``H``.

    Returns:
        ``(np.array(pairImages), np.array(pairLabels))``.
    """
    # (row in base_slices, pixel shift added to the ground truth)
    strip_shifts = ((0, 0), (1, 250), (2, 500))  # LEFT, MIDDLE, RIGHT
    slides_count = (W - H) // offset_size
    pairImages, pairLabels = [], []

    for traversal_idx, target_img in enumerate(images):
        if traversal_idx % 10 == 0:
            print(traversal_idx)
        # index 0 is skipped; presumably it is the base image - confirm
        if traversal_idx == 0:
            continue
        target_gt = gt[traversal_idx]
        if abs(target_gt) > 300:
            continue

        slides = get_slices(target_img, offset_size, slides_count)
        for j, target_slide in enumerate(slides):
            slide_offset = j * offset_size
            for strip_row, strip_shift in strip_shifts:
                pairImages.append([base_slices[strip_row, 0], target_slide])
                is_match = abs(strip_shift + target_gt - slide_offset) < threshold
                pairLabels.append([1] if is_match else [0])

    return (np.array(pairImages), np.array(pairLabels))

## 2. Building the Siamese NN

### Pre-trained VGG16 'sister' network architecture:

In [7]:
def buildVGG16_model(input_shape):
    """Return a model mapping the VGG16 input to its ``fc2`` layer output.

    VGG16 is created with ImageNet weights and its top included, then
    wrapped in a ``Model`` whose output is taken from the layer named 'fc2'.
    """
    backbone = keras.applications.VGG16(
        weights='imagenet',
        include_top=True,
        pooling='max',
        input_shape=input_shape,
    )
    fc2_output = backbone.get_layer('fc2').output
    return Model(inputs=backbone.input, outputs=fc2_output)

In [8]:
def get_feature_vector(basemodel, img):
    """Return ``basemodel.predict(img)``."""
    return basemodel.predict(img)
def euclidean_distance(vectors):
    """Euclidean distance between two batches of feature vectors.

    ``vectors`` is unpacked into two tensors. Their squared difference is
    summed over axis 1 (keeping that axis), floored at ``K.epsilon()`` and
    square-rooted.
    """
    featsA, featsB = vectors
    squared_gap = K.square(featsA - featsB)
    total = K.sum(squared_gap, axis=1, keepdims=True)
    # floor at epsilon before the square root so sqrt never receives 0
    floored = K.maximum(total, K.epsilon())
    return K.sqrt(floored)

## 3. Training the network

In [9]:
def plot_training(H, plotPath):
    """Plot the recorded training curves and save the figure to ``plotPath``.

    Args:
        H: object exposing a ``history`` dict of per-epoch metric lists
            (such as the value returned by ``model.fit``).
        plotPath: destination passed to ``plt.savefig``.

    Only metrics present in ``H.history`` are drawn. A run without
    validation data has no ``val_*`` entries, and some Keras versions record
    accuracy under ``acc``/``val_acc`` instead of
    ``accuracy``/``val_accuracy``.
    """
    # (candidate history keys in order of preference, legend label)
    curves = (
        (("loss",), "train_loss"),
        (("val_loss",), "val_loss"),
        (("accuracy", "acc"), "train_acc"),
        (("val_accuracy", "val_acc"), "val_acc"),
    )

    plt.style.use("ggplot")
    plt.figure()
    for candidate_keys, label in curves:
        key = next((name for name in candidate_keys if name in H.history), None)
        if key is None:
            # metric was not recorded for this run; skip instead of KeyError
            continue
        plt.plot(H.history[key], label=label)
    plt.title("Training Loss and Accuracy")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/Accuracy")
    plt.legend(loc="lower left")
    plt.savefig(plotPath)

#### Setting the parameters:

In [10]:
# Pair generation
offset_size = 2    # horizontal step, in pixels, between consecutive crops
threshold = 1      # a pair is positive when its offset error is below this

# Training
batch_size = 100
epochs = 64
IMG_SHAPE = (224, 224, 3)

# Output locations
BASE_OUTPUT = "output"
MODEL_PATH = os.path.join(BASE_OUTPUT, "siamese_model")
PLOT_PATH = os.path.join(BASE_OUTPUT, "plot.png")

In [11]:
# Split the first image into LEFT / MIDDLE / RIGHT vertical strips and
# run each strip through process_image.
base_img = images[0]
third = W // 3
# NOTE(review): the right strip starts 2 px after the middle strip ends -
# confirm the "+ 2" gap is intentional.
base_strips = (
    base_img[:, :third],
    base_img[:, third:third * 2],
    base_img[:, third * 2 + 2:],
)
base_slices = np.array([process_image(strip) for strip in base_strips])

In [12]:
# Build every (base strip, target crop) pair and report how long it took.
start = time.time()
pairImages, pairLabels = make_pairs(images, base_slices, gt, offset_size, threshold)
elapsed = time.time() - start
print ('[INFO] {} pairs of image paths with offsets were generated in {:.1f} seconds.'.format(len(pairImages), elapsed))

0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
[INFO] 72624 pairs of image paths with offsets were generated in 124.3 seconds.


### Training the Siamese network

In [13]:
# Create the two image inputs and run both through one shared extractor.
print("[INFO] building siamese network...")
# Same value as the IMG_SHAPE set in the parameters cell; repeated here.
IMG_SHAPE = (224,224,3)
# One input tensor per element of an image pair.
imgA = Input(shape=IMG_SHAPE)
imgB = Input(shape=IMG_SHAPE)
VGGfeatureExtractor = buildVGG16_model(IMG_SHAPE) # built once and applied to both inputs so the two branches share weights
# Feature tensors produced by the same extractor instance for each input.
featsA = VGGfeatureExtractor(imgA)
featsB = VGGfeatureExtractor(imgB)

[INFO] building siamese network...


In [14]:
# Finally, construct the siamese network: the distance between the two
# feature vectors is fed to a single sigmoid unit.
distance = Lambda(euclidean_distance)([featsA, featsB])
outputs = Dense(1, activation="sigmoid")(distance)
model = Model(inputs=[imgA, imgB], outputs=outputs)
# summary() prints the table itself and returns None, so wrapping it in
# print() only appends a stray "None" to the output.
model.summary()

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 224, 224, 3)  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 224, 224, 3)  0                                            
__________________________________________________________________________________________________
model_1 (Model)                 (None, 4096)         134260544   input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 1)            0           model_1[1][0]              

In [None]:
print("[INFO] compiling model...")
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Train the model. Column 0 of pairImages holds the base strips and
# column 1 the target crops.
# NOTE(review): no validation data is passed, so the returned history holds
# training metrics only. TODO: create a held-out pair set and pass it via
# validation_data.
print("[INFO] training model...")
history = model.fit([pairImages[:, 0], pairImages[:, 1]], pairLabels,
    batch_size=batch_size,
    verbose=1,
    epochs=epochs)

# Serialize the model to disk. The output directory is created first
# because nothing else in the notebook creates it.
print("[INFO] saving siamese model...")
os.makedirs(BASE_OUTPUT, exist_ok=True)
model.save(MODEL_PATH)

# Plot the training history with the notebook's own plot_training helper.
# There is no `utils` module imported here.
print("[INFO] plotting training history...")
plot_training(history, PLOT_PATH)

[INFO] compiling model...
[INFO] training model...
Epoch 1/64


In [None]:
# Save the model to MODEL_PATH again; the training cell already calls
# model.save with this same path.
model.save(MODEL_PATH)

### Evaluating the trained model on the training pairs

In [None]:
# Evaluate on the pairs the model was trained on. `pairTrain`/`labelTrain`
# are never defined in this notebook; the pair arrays produced by make_pairs
# are `pairImages`/`pairLabels`. These are training-set figures, not a
# held-out test score, so the printed labels say "Train".
score = model.evaluate([pairImages[:, 0], pairImages[:, 1]], pairLabels, verbose=0)
print('Train loss:', score[0])
print('Train accuracy:', score[1])

## 4. Testing the network

In [None]:
### Testing
# NOTE(review): this cell uses dataset_testing, model_from_json, np_utils,
# num_classes and shuffle, none of which are defined or imported in the
# cells shown in this notebook. It cannot run as written. It looks like it
# was carried over from another experiment - confirm before relying on it.
features = dataset_testing()

# Every sample is given the label 1.
num_of_samples = features.shape[0]
labels = np.ones((num_of_samples,),dtype='int64')

# Load an architecture from 'model.json' and weights from 'model6.h5'.
# This rebinds `model`, replacing the siamese network built above.
json_file = open('model.json', 'r')
loaded_model_json = json_file.read()
json_file.close()
model = model_from_json(loaded_model_json)
model.load_weights("model6.h5")
# `names` is not used again in this cell.
names = ['Change', 'No change']

# One-hot encode the labels and shuffle features and labels together.
Y = np_utils.to_categorical(labels, num_classes)
x_test,y_test = shuffle(features,Y, random_state=2)

# Report the loss and accuracy of the loaded model on the shuffled data.
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])