### Mount Drive

In [1]:
# Mount Google Drive inside the Colab VM; this prompts for an authorization code.
from google.colab import drive
drive.mount('/content/drive')

# Switch the working directory to the task's data folder: later cells open the
# triplet files by bare file name, so they are resolved relative to this folder.
%cd /content/drive/My Drive/ColabData/IML/task4/data/

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive
/content/drive/My Drive/ColabData/IML/task4/data


In [2]:
%%capture
# Extract the image archive into the filesystem root; %%capture (which has to stay
# on the first line of the cell) swallows unzip's output.
# NOTE(review): the loader cell reads '/food/*.jpg', so the archive presumably
# contains a top-level 'food/' folder - confirm.
!unzip "/content/drive/My Drive/ColabData/IML/task4/data/food.zip" -d /

### Import Libraries

In [3]:
import os
import glob
import csv 

import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook as tqdm

from sklearn.model_selection import train_test_split
from skimage import io, transform

from numpy.linalg import norm 
import matplotlib.pyplot as plt 

import tensorflow as tf

from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input

In [4]:
# The triplet files have no header row and are space separated. Every value is read
# as a string, and the three columns are named while parsing.
train = pd.read_csv(
    'train_triplets.txt', sep=' ', header=None, names=['img', 'img1', 'img2'], dtype='str'
)
test = pd.read_csv(
    'test_triplets.txt', sep=' ', header=None, names=['img', 'img1', 'img2'], dtype='str'
)

# Quick sanity check of the number of triplets in each file.
print(train.shape, test.shape)

train.head()

(59515, 3) (59544, 3)


Unnamed: 0,img,img1,img2
0,2461,3450,2678
1,2299,2499,4987
2,4663,1056,3029
3,4532,1186,1297
4,3454,3809,2204


### Load Images and Train-Validation Split

In [5]:
# True  -> the triplets themselves are split at random, so one image can occur in both
#          training and validation triplets.
# False -> image ids are partitioned first and only triplets whose three images fall
#          into the same partition are kept.
allow_val_overlap = True

def get_data(df, ids):
    """Return the rows of ``df`` whose three images all belong to ``ids``.

    Parameters
    ----------
    df : pandas.DataFrame
        Triplet table containing the columns ``img``, ``img1`` and ``img2``.
    ids : iterable
        Image ids that are allowed to appear in a selected triplet.

    Returns
    -------
    pandas.DataFrame
        The rows of ``df`` (original index and columns kept) for which ``img``,
        ``img1`` and ``img2`` are all contained in ``ids``. An empty ``df``
        yields an empty result.
    """
    # Vectorised membership test: a single hashed lookup pass over the three id
    # columns instead of scanning ``ids`` in Python once per row and per column.
    keep = df[['img', 'img1', 'img2']].isin(list(ids)).all(axis=1)
    return df[keep]
    
if not allow_val_overlap:
    # Image-level split: draw 1500 positions (with replacement, so some may repeat)
    # from the sorted unique image ids; those ids are the validation images and all
    # remaining ids are the training images.
    temp = np.unique(train[['img', 'img1', 'img2']].values)
    np.random.seed(643)
    ids = np.random.randint(0, len(temp), size=1500)
    vids = temp[ids]
    tids = np.delete(temp, ids)

    # Keep only the triplets whose three images lie entirely on one side of the split.
    x_train = trn = get_data(train, tids)
    x_val = val = get_data(train, vids)
else:
    # Triplet-level split: 10% of the triplets are held out at random.
    x_train, x_val = train_test_split(train, test_size=0.1, random_state=2019, shuffle=True)

# Load every JPEG once, resize it to 224x224, apply the ResNet50 input preprocessing
# and cache the result in memory, keyed by the file name without its '.jpg' suffix.
img_dir = '/food/'
imgs = {}
for file in tqdm(glob.glob(img_dir+'*.jpg')):
    image_id = os.path.basename(file)[:-4]
    pixels = tf.keras.preprocessing.image.img_to_array(
        tf.keras.preprocessing.image.load_img(file, target_size=(224, 224))
    )
    # preprocess_input is applied to a batch of one image; the batch axis is
    # dropped again before the array is stored.
    batch = preprocess_input(np.expand_dims(pixels, axis=0))
    imgs[image_id] = np.squeeze(batch)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))




In [6]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of image triplets.

    Every entry of ``list_IDs`` is indexed as ``entry[0]``, ``entry[1]`` and
    ``entry[2]``; the three values are keys into the module-level ``imgs``
    dictionary of preprocessed images.

    Parameters
    ----------
    list_IDs : sequence
        Triplets of image ids (for example ``DataFrame.values``).
    batch_size : int
        Maximum number of triplets per batch; the last batch may be smaller.
    inp_dim : tuple
        Stored on the instance; not used by the generator itself.
    n_classes : int
        Stored on the instance; not used by the generator itself.
    shuffle : bool
        If true, the triplet order is reshuffled at construction time and after
        every epoch.
    """

    def __init__(self, list_IDs, batch_size=32, inp_dim=(224, 224, 3), n_classes=2, shuffle=True):
        # Let the Keras base class initialise its own state first.
        super().__init__()
        self.inp_dim = inp_dim
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_classes = n_classes

        self.shuffle = shuffle
        # Build the initial index order (shuffled if requested).
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch, counting a final partial batch."""
        return int(np.ceil(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``([X1, X2, X3], y)``."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        X1, X2, X3, y = self.__data_generation(list_IDs_temp)
        return [X1, X2, X3], y

    def on_epoch_end(self):
        """Reset the sample order, reshuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Stack the images of the given triplets and build their labels.

        Returns three arrays (first, second and third image of each triplet)
        and a label vector of ones with exactly one entry per triplet.
        """
        X1 = np.array([imgs[record[0]] for record in list_IDs_temp])
        X2 = np.array([imgs[record[1]] for record in list_IDs_temp])
        X3 = np.array([imgs[record[2]] for record in list_IDs_temp])
        # Size the labels by the actual number of triplets: the final batch can be
        # shorter than batch_size, and a batch_size-long buffer would then be
        # longer than the inputs and carry uninitialised values in its tail.
        y = np.ones(len(list_IDs_temp), dtype=int)
        return X1, X2, X3, y

# Training and validation generators use identical settings (reshuffled every epoch);
# the training generator is constructed first, then the validation one.
training_generator, validation_generator = (
    DataGenerator(split.values, batch_size=64, n_classes=2, shuffle=True)
    for split in (x_train, x_val)
)
# Test triplets are served in file order so predictions line up with the rows of `test`.
testing_generator = DataGenerator(test.values, batch_size=64, n_classes=2, shuffle=False)

### Losses

In [7]:
# Distance selectors. `loss_p` is the default `p` of triplet_loss, `metric_p` the
# default `p` of accuracy and emb2labels. Because they are bound as default argument
# values, changing them only takes effect after the functions are defined again.
loss_p = 2 # 1, 2 or cosine
metric_p = 2 # 1, 2 or cosine

def triplet_loss(y_true, y_pred, alpha=1, p=loss_p):
    """Hinge-style triplet loss on concatenated embeddings.

    Parameters
    ----------
    y_true : tensor
        Ignored; present only because Keras passes it to every loss.
    y_pred : tensor of shape (batch, 3 * d)
        Anchor, positive and negative embeddings concatenated along the last axis.
    alpha : float
        Margin added to ``pos_dist - neg_dist`` before clipping at zero.
    p : {1, 2, 'cosine'}
        1 uses the mean absolute difference, 2 the mean squared difference and
        'cosine' the negative cosine similarity as the distance.

    Returns
    -------
    tensor of shape (batch,)
        ``max(pos_dist - neg_dist + alpha, 0)`` evaluated separately for every triplet.

    Raises
    ------
    AssertionError
        If ``p`` is not one of 1, 2 or 'cosine'.
    """
    assert p in (1, 2, 'cosine'), 'p can be 1, 2 or cosine'

    # Split the concatenated vector into its three equally sized parts.
    third = y_pred.shape.as_list()[-1] // 3
    anc = y_pred[:, 0:third]
    pos = y_pred[:, third:2 * third]
    neg = y_pred[:, 2 * third:3 * third]

    if p == 'cosine':
        # tf.keras.losses.cosine_similarity returns the *negative* similarity,
        # so a smaller value means "closer", like a distance.
        pos_dist = tf.keras.losses.cosine_similarity(anc, pos)
        neg_dist = tf.keras.losses.cosine_similarity(anc, neg)
    elif p == 1:
        # axis=-1 keeps one distance per triplet. Averaging over the whole batch
        # would apply the hinge to a single batch-level number, so triplets that
        # already satisfy the margin could cancel out violating ones.
        pos_dist = tf.keras.backend.mean(tf.keras.backend.abs(anc - pos), axis=-1)
        neg_dist = tf.keras.backend.mean(tf.keras.backend.abs(anc - neg), axis=-1)
    else:
        pos_dist = tf.keras.backend.mean(tf.keras.backend.square(anc - pos), axis=-1)
        neg_dist = tf.keras.backend.mean(tf.keras.backend.square(anc - neg), axis=-1)

    return tf.keras.backend.maximum(pos_dist - neg_dist + alpha, 0.0)

def accuracy(y_true, y_pred, p=metric_p):
    """Triplet accuracy metric on concatenated embeddings.

    A triplet is predicted as 1 when the anchor is closer to the second
    embedding ("positive") than to the third ("negative"); the prediction is
    compared with ``y_true`` through ``tf.keras.metrics.binary_accuracy``.

    Parameters
    ----------
    y_true : tensor
        Labels supplied by the data generator.
    y_pred : tensor of shape (batch, 3 * d)
        Anchor, positive and negative embeddings concatenated along the last axis.
    p : {1, 2} or anything else
        1 or 2 selects the corresponding vector norm of the difference; any
        other value selects the negative cosine similarity.
    """
    total_length = y_pred.shape.as_list()[-1]
    anchor = y_pred[:, 0:int(total_length*1/3)]
    positive = y_pred[:, int(total_length*1/3):int(total_length*2/3)]
    negative = y_pred[:, int(total_length*2/3):int(total_length*3/3)]
    if p == 1 or p == 2:
        pos_dist = tf.linalg.norm((anchor - positive), ord=p, axis=1)
        neg_dist = tf.linalg.norm((anchor - negative), ord=p, axis=1)
    else:
        normalize_a = tf.math.l2_normalize(anchor, axis=1)
        normalize_b = tf.math.l2_normalize(positive, axis=1)
        normalize_c = tf.math.l2_normalize(negative, axis=1)
        # Row-wise dot products give one similarity per triplet. A matmul with
        # transpose_b=True would instead produce a (batch, batch) matrix that
        # compares every anchor with every other sample in the batch.
        pos_dist = -tf.reduce_sum(normalize_a * normalize_b, axis=1)
        neg_dist = -tf.reduce_sum(normalize_a * normalize_c, axis=1)
    labels = tf.cast(pos_dist < neg_dist, tf.float32)
    acc = tf.keras.metrics.binary_accuracy(y_true, labels)
    return acc

def seed(seed=643):
    """Seed TensorFlow's and NumPy's global random generators with ``seed``."""
    tf.random.set_seed(seed)
    np.random.seed(seed)

### Train Final Layer

In [8]:
# Length of the final embedding produced for each image.
encoding_size = 20
# The hidden Dense layer of the head gets input_dim // 2 units.
input_dim = 2048

# `learning_rate` is the supported argument name; `lr` is only a deprecated alias.
opt = tf.keras.optimizers.Adam(learning_rate=0.00005, beta_1=0.9, beta_2=0.999)

# Fix the NumPy / TensorFlow seeds before any layer weights are created.
seed()

class L2Normalize(tf.keras.layers.Layer):
    """Layer that rescales its input to unit L2 norm along ``axis``.

    Parameters
    ----------
    axis : int
        Axis along which the input is normalised.
    **kwargs
        Standard Keras layer arguments (``name``, ``dtype``, ...), forwarded to
        the base ``Layer`` constructor.
    """

    def __init__(self, axis=1, **kwargs):
        super(L2Normalize, self).__init__(**kwargs)
        self.axis = axis

    def call(self, inputs):
        return tf.keras.backend.l2_normalize(inputs, axis=self.axis)

    def get_config(self):
        # Expose `axis` so a model containing this layer can be serialised and
        # rebuilt from its config.
        config = super(L2Normalize, self).get_config()
        config.update({'axis': self.axis})
        return config

# Pretrained ResNet50 with ImageNet weights; only the graph up to the 'avg_pool'
# layer is reused as a feature extractor.
base_model = ResNet50(weights='imagenet')
last_layer = base_model.get_layer('avg_pool')
x = last_layer.output
# Embedding head: dropout -> Dense(input_dim//2, relu) -> dropout -> Dense(encoding_size),
# followed by L2 normalisation so every embedding has unit length.
x = tf.keras.layers.Dropout(0.2)(x)
x = tf.keras.layers.Dense(input_dim//2, activation='relu')(x)
x = tf.keras.layers.Dropout(0.25)(x)
out = tf.keras.layers.Dense(encoding_size)(x)
out = L2Normalize(axis=1)(out)

# Freeze every layer that belongs to the pretrained base. The head layers created
# above are not part of base_model.layers, so they remain trainable.
for layer in base_model.layers:
    layer.trainable = False
# Single-image embedding network: image in, normalised embedding out.
Net = tf.keras.Model(base_model.input, out)    

# The same Net instance (shared weights) embeds all three images of a triplet.
anchor_input = tf.keras.layers.Input((224, 224, 3), name='anchor_input')
positive_input = tf.keras.layers.Input((224, 224, 3), name='positive_input')
negative_input = tf.keras.layers.Input((224, 224, 3), name='negative_input')
encoded_anchor = Net(anchor_input)
encoded_positive = Net(positive_input)
encoded_negative = Net(negative_input)
# The three embeddings are concatenated along the last axis in the order
# anchor | positive | negative; triplet_loss and accuracy slice the output back into thirds.
merged_vector = tf.keras.layers.concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=-1, name='merged_layer')

model = tf.keras.Model(inputs=[anchor_input, positive_input, negative_input], outputs=merged_vector)
model.compile(loss=triplet_loss, optimizer=opt, metrics=[accuracy])
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels.h5
Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_input (InputLayer)       [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
positive_input (InputLayer)     [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
negative_input (InputLayer)     [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
model (Model)                   (None, 20)           2

In [None]:
# Train only the embedding head (the ResNet50 base is frozen). The generators already
# yield batches of 64 triplets, so `batch_size` must not be passed to fit() for a
# keras.utils.Sequence input.
history = model.fit(training_generator, validation_data=validation_generator, epochs=7)

Epoch 1/5
Epoch 2/5

### Fine-tune the Base Model

In [None]:
# Optional second stage: also train the last part of the ResNet50 base.
fine_tune = False

if fine_tune:
    # Walk the base layers in order; every layer that comes after
    # 'conv4_block6_out' becomes trainable, earlier layers stay frozen.
    unfreeze = False
    for layer in base_model.layers:
        if unfreeze:
            layer.trainable = True
        if layer.name == 'conv4_block6_out':
            print ('Unfreezing last layers')
            unfreeze = True

    # Recompile so the changed trainable flags take effect, using a smaller step size.
    # `learning_rate` replaces the deprecated `lr` alias.
    opt = tf.keras.optimizers.Adam(learning_rate=0.00001)
    model.compile(loss=triplet_loss, optimizer=opt, metrics=[accuracy])
    # The generators define the batch size themselves, so `batch_size` is not passed.
    history2 = model.fit(training_generator, validation_data=validation_generator, epochs=1)

### Predict on Test Images

In [None]:
# Embed every test triplet. The generator is unshuffled and already batches the
# data, so `batch_size` is not passed to predict().
test_emb = model.predict(testing_generator, verbose=1)



In [None]:
def emb2labels(y_pred, p=metric_p):
    """Turn concatenated triplet embeddings into 0/1 labels.

    Parameters
    ----------
    y_pred : array-like of shape (n, 3 * d)
        Anchor, positive and negative embeddings concatenated along the last axis.
    p : {1, 2} or anything else
        1 or 2 selects the corresponding vector norm of the difference; any
        other value selects the negative cosine similarity.

    Returns
    -------
    numpy.ndarray of int32, shape (n,)
        1 where the anchor is closer to the second embedding than to the third,
        0 otherwise.
    """
    y_pred = np.asarray(y_pred)
    total_length = y_pred.shape[-1]
    anchor = y_pred[:, 0:int(total_length*1/3)]
    positive = y_pred[:, int(total_length*1/3):int(total_length*2/3)]
    negative = y_pred[:, int(total_length*2/3):int(total_length*3/3)]
    if p == 1 or p == 2:
        pos_dist = np.linalg.norm((anchor - positive), ord=p, axis=1)
        neg_dist = np.linalg.norm((anchor - negative), ord=p, axis=1)
    else:
        def _unit_rows(v):
            # Same normalisation as tf.math.l2_normalize: x / sqrt(max(sum(x^2), 1e-12)).
            return v / np.sqrt(np.maximum(np.sum(v * v, axis=1, keepdims=True), 1e-12))

        normalize_a = _unit_rows(anchor)
        normalize_b = _unit_rows(positive)
        normalize_c = _unit_rows(negative)
        # Row-wise dot products: one similarity per triplet. An all-pairs matmul
        # would allocate an (n, n) matrix and compare unrelated samples; staying
        # in NumPy also keeps the `.astype` call below valid.
        pos_dist = -np.sum(normalize_a * normalize_b, axis=1)
        neg_dist = -np.sum(normalize_a * normalize_c, axis=1)
    labels = (pos_dist < neg_dist).astype('int32')
    return labels

# One 0/1 label per predicted test triplet, using the default metric (`metric_p`).
test_pred = emb2labels(test_emb)

In [None]:
# Wrap the labels in a one-column frame and make sure there is exactly one
# prediction per test triplet before anything is written.
sub = pd.DataFrame(test_pred)
assert sub.shape[0] == test.shape[0]

# Submission file: one label per line, no header and no index column.
sub.to_csv('sub_task4_overlap_e5.txt', header=False, index=False, sep=' ')
sub.head()

Unnamed: 0,0
0,0
1,0
2,0
3,0
4,1


In [None]:
# Sanity check: the number of predictions should equal the number of test triplets.
test.shape, len(test_pred)

((59544, 3), 59520)

### Plots

In [None]:
# Two side-by-side panels. (A full-figure `subplot(111)` followed by `subplot(122)`
# makes the panels overlap, so the axes are created together here.)
fig, (ax_train, ax_val) = plt.subplots(1, 2, figsize=(16, 5))

ax_train.plot(history.history['accuracy'], label='Train accuracy')
ax_train.set_title('Train Accuracy')
ax_train.set_xlabel('Epoch')
ax_train.set_ylabel('Accuracy')
ax_train.grid()

ax_val.plot(history.history['val_accuracy'], label='Validation accuracy')
ax_val.set_title('Validation Accuracy')
ax_val.set_xlabel('Epoch')
ax_val.set_ylabel('Accuracy')
ax_val.grid()

plt.show()

In [None]:
from mpl_toolkits.axes_grid1 import ImageGrid

# Show ten test triplets, one per row: the query image followed by its two
# candidates, each candidate captioned with the model's verdict.
fig = plt.figure(figsize=(16, 10*5))
grid = ImageGrid(fig, 111, nrows_ncols=(10, 3), axes_pad=0.4)

panels = []
for i in [1, 10, 2000, 6000, 10000, 15000, 20000, 25000, 40000, 50000]:
    # Use the label array computed by emb2labels (`test_pred`); the name
    # `test_preds` is never defined in this notebook.
    lab = int(test_pred[i])
    if lab == 1:
        # Label 1: the first candidate (img1) is the one closer to the query image.
        text1, text2 = 'Similar', 'Not similar'
    else:
        text1, text2 = 'Not similar', 'Similar'
    row = test.iloc[i]
    panels.append((io.imread(img_dir+row['img']+'.jpg'), 'Food'))
    panels.append((io.imread(img_dir+row['img1']+'.jpg'), text1))
    panels.append((io.imread(img_dir+row['img2']+'.jpg'), text2))

# Separate loop variable names, so the list being iterated is not rebound mid-loop.
for ax, (picture, caption) in zip(grid, panels):
    ax.imshow(picture)
    ax.set_title(caption)
plt.show()