<a href="https://colab.research.google.com/github/luca-rossi/zero-shot-learning/blob/main/Zero_Shot_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

### Imports and setup

In [None]:
# Mount Google Drive so that files stored there are reachable from the notebook.
from google.colab import drive
drive.mount('/content/gdrive')
# Switch to the Drive folder from which data.zip is unpacked below.
%cd gdrive/My\ Drive/zsl-data//         # TODO change it to your path!!!
# Unpack the archive into /content (the config cell reads from /content/data).
!unzip "data.zip" -d "/content"
# Pin the TensorFlow release this notebook is run with.
!pip install tensorflow==2.2

In [None]:
from tensorflow.python.framework.ops import disable_eager_execution
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn import preprocessing
from sklearn.neighbors import KDTree
import numpy as np
import scipy.io

disable_eager_execution()

### Datasets settings

In [None]:
# Dataset identifiers; they are the keys of PARAMS.
DATASET_CUB = 'cub'
DATASET_AWA = 'awa'
DATASET_SUN = 'sun'

# Hyper-parameters whose value is the same for every dataset.
_SHARED_PARAMS = {
    'input_shape': 2048,
    'n_epochs_cls': 25,
    'batch_size': 64,
    'batch_size_cls': 64,
    'learning_rate_cls': 0.001,
    'beta': 0.5,
    'gradient_penalty_weight': 10,
    'cls_loss_weight': 0.01,
    'n_critic': 5,
}

# Per-dataset settings: the shared values above plus the dataset-specific ones.
PARAMS = {
    DATASET_CUB: dict(
        _SHARED_PARAMS,
        file_separator=' ',
        n_classes=200,
        n_unseen_classes=50,
        n_attributes=312,
        n_epochs=70,
        learning_rate=0.0001,
        syn_num=300,
        latent_dim=312,
    ),
    DATASET_AWA: dict(
        _SHARED_PARAMS,
        file_separator='\t',
        n_classes=50,
        n_unseen_classes=10,
        n_attributes=85,
        n_epochs=30,
        learning_rate=0.00001,
        syn_num=1800,
        latent_dim=85,
    ),
    DATASET_SUN: dict(
        _SHARED_PARAMS,
        file_separator='\t',
        n_classes=717,
        n_unseen_classes=72,
        n_attributes=102,
        n_epochs=40,
        learning_rate=0.0002,
        syn_num=400,
        latent_dim=102,
    ),
}

### Config

In [None]:
# Run switches.
GAN = True
LOAD = True   # default for Gan.train(load=...): load saved generator weights instead of training
GZSL = True   # evaluate on all classes; otherwise on the unseen classes only
DATASET = DATASET_AWA

# All remaining settings are looked up in the entry of the selected dataset.
_selected = PARAMS[DATASET]

# Data dimensions.
INPUT_SHAPE = _selected['input_shape']
N_CLASSES = _selected['n_classes']
N_UNSEEN_CLASSES = _selected['n_unseen_classes']
N_ATTRIBUTES = _selected['n_attributes']

# Training schedule and optimiser settings.
LATENT_DIM = _selected['latent_dim']
N_CRITIC = _selected['n_critic']
N_EPOCHS_GAN = _selected['n_epochs']
N_EPOCHS_CLS = _selected['n_epochs_cls']
N_BATCH_GAN = _selected['batch_size']
N_BATCH_CLS = _selected['batch_size_cls']
LEARNING_RATE = _selected['learning_rate']
LEARNING_RATE_CLS = _selected['learning_rate_cls']
BETA = _selected['beta']

# Loss weights and size of the synthetic dataset (syn_num samples per unseen class on average).
GRADIENT_PENALTY_WEIGHT = _selected['gradient_penalty_weight']
CLS_LOSS_WEIGHT = _selected['cls_loss_weight']
N_SAMPLES_GAN = N_UNSEEN_CLASSES * _selected['syn_num']

# File locations.
PATH = '/content/data'
FEATURES_PATH = '/'.join((PATH, DATASET, 'res101.mat'))
ATTRIBUTES_PATH = '/'.join((PATH, DATASET, 'att_splits.mat'))
FILE_SEPARATOR = _selected['file_separator']
MODEL_SAVE_PATH = 'cgan_generator.h5'
SAVE_MODEL_EVERY = 10

### Data loading functions

Define constants

In [None]:
# Keys of the att_splits .mat dictionary that hold the sample locations of each
# split; the loaders subtract 1 from these values before using them as row indices.
TRAINVAL_COL = 'trainval_loc'
TRAIN_COL = 'train_loc'
VAL_COL = 'val_loc'
TEST_COL = 'test_unseen_loc'
TEST_SEEN_COL = 'test_seen_loc'

Load the two `.mat` files: the ResNet-101 features and the attribute splits.
- `res101` contains the features and the corresponding labels.
- `att_splits` contains the sample locations of the trainval, train, val and test splits, together with the class attributes.

In [None]:
def get_mat():
    """Read the feature file and the attribute-split file.

    Returns:
        A ``(res101, att_splits)`` pair: the dictionaries produced by
        ``scipy.io.loadmat`` for FEATURES_PATH and ATTRIBUTES_PATH.
    """
    return scipy.io.loadmat(FEATURES_PATH), scipy.io.loadmat(ATTRIBUTES_PATH)

For every split (train, val, trainval, test-unseen and test-seen) we need the ground-truth class label of each example, selected with the split locations provided in `att_splits`.

In [None]:
def map_labels(source, dest):
    """Rewrite, in place, every entry of ``dest`` that matches a value of ``source``.

    NOTE(review): each matching entry is overwritten with the value it already
    equals, so the labels keep their original (global) class ids.

    Args:
        source: iterable of class labels.
        dest: NumPy array of labels; modified in place.

    Returns:
        The same ``dest`` object.
    """
    for class_id in source:
        matches = dest == class_id
        dest[matches] = class_id
    return dest


def get_labels(res101, att_splits):
    """Collect the labels of every split and the classes that occur in each.

    Args:
        res101: mapping with a ``'labels'`` array; 1 is subtracted from it.
        att_splits: mapping with the split-location arrays (``*_loc`` keys);
            1 is subtracted from them before they index the labels.

    Returns:
        dict with the per-sample labels under ``'all'``, ``'train'``,
        ``'val'``, ``'trainval'``, ``'test'`` and ``'testval'``, and the
        sorted unique classes of those splits under ``'train_seen'``,
        ``'val_unseen'``, ``'trainval_seen'``, ``'test_unseen'`` and
        ``'test_seen'``.
    """
    # (split key, column of att_splits holding its sample locations)
    split_columns = (
        ('train', TRAIN_COL),
        ('val', VAL_COL),
        ('trainval', TRAINVAL_COL),
        ('test', TEST_COL),
        ('testval', TEST_SEEN_COL),
    )
    # (key for the unique classes, split they are computed from)
    class_sets = (
        ('train_seen', 'train'),
        ('val_unseen', 'val'),
        ('trainval_seen', 'trainval'),
        ('test_unseen', 'test'),
        ('test_seen', 'testval'),
    )

    all_labels = res101['labels'] - 1
    labels = {'all': all_labels}

    for split, column in split_columns:
        rows = np.squeeze(att_splits[column] - 1)
        labels[split] = all_labels[rows]

    for classes_key, split in class_sets:
        labels[classes_key] = np.unique(labels[split])

    for classes_key, split in class_sets:
        labels[split] = map_labels(labels[classes_key], labels[split])

    return labels

Let $X \in \mathbb{R}^{d \times m}$ denote the features available at the training stage,
where $d$ is the dimensionality of the data and $m$ is the number of instances.

In [None]:
def get_features(res101, att_splits):
    """Slice the feature matrix into the splits and min-max scale every slice.

    The scaler is fitted on the trainval split only; the other splits and the
    full matrix are transformed with that fitted scaler.

    Args:
        res101: mapping with a ``'features'`` array (transposed before use).
        att_splits: mapping with the split-location arrays (``*_loc`` keys).

    Returns:
        dict of scaled feature matrices keyed by ``'all'``, ``'trainval'``,
        ``'train'``, ``'val'``, ``'test'`` and ``'testval'``.
    """
    scaler = preprocessing.MinMaxScaler()
    raw = res101['features'].transpose()

    def rows_of(column):
        # Locations are shifted by one before they are used as row indices.
        return raw[np.squeeze(att_splits[column] - 1), :]

    features = {'all': raw}
    features['trainval'] = scaler.fit_transform(rows_of(TRAINVAL_COL))
    for split, column in (('train', TRAIN_COL),
                          ('val', VAL_COL),
                          ('test', TEST_COL),
                          ('testval', TEST_SEEN_COL)):
        features[split] = scaler.transform(rows_of(column))
    # The full matrix is scaled last, after every split has been cut from the raw data.
    features['all'] = scaler.transform(raw)
    return features

Each class in the dataset has an attribute description: a vector of length $a$, the number of attributes.
Stacking these vectors gives the *signature matrix* $S \in [0, 1]^{a \times z}$.
There are $z$ classes at the training stage and $z'$ classes at the test stage, for which $S \in [0, 1]^{a \times z'}$.
Each entry gives the occurrence of an attribute in the corresponding class.
For instance, if the classes are `horse` and `zebra` and the attributes are
[Domestic_animal, 4_legged, carnivore]:
```
 Horse      Zebra
[0.00354613 0.        ] Domestic_animal
[0.13829921 0.20209503] 4_legged
[0.06560347 0.04155225] carnivore
```
att_splits keys: 'allclasses_names', 'att', 'original_att', 'test_seen_loc', 'test_unseen_loc', 'train_loc', 'trainval_loc', 'val_loc'

In [None]:
def get_signatures(att_splits):
    """Build the per-class signature matrix and the matching class ids.

    Args:
        att_splits: mapping whose ``'att'`` entry is a 2-D array with one
            column per class.

    Returns:
        ``(signatures, classnames)`` where ``signatures`` is a float array with
        one row per class (the transpose of ``att_splits['att']``) and
        ``classnames`` is the list ``[0, 1, ..., n_classes - 1]``, i.e. entry
        ``i`` names the class described by ``signatures[i]``.
    """
    attrs = att_splits['att'].transpose()
    # The builtin ``float`` replaces the ``np.float`` alias, which no longer
    # exists in current NumPy releases; both mean float64.
    signatures = np.array(attrs, dtype=float, order='C')
    classnames = list(range(len(attrs)))
    return signatures, classnames


def get_attributes(labels, signatures):
    """Look up the signature of every sample's label, split by split.

    Args:
        labels: mapping with label arrays under ``'all'``, ``'train'``,
            ``'val'``, ``'trainval'``, ``'test'`` and ``'testval'``.
        signatures: array indexed by label.

    Returns:
        dict with the same six keys; each value is the array of the looked-up
        signatures with singleton dimensions squeezed out.
    """
    def signatures_of(split):
        rows = [signatures[label] for label in labels[split]]
        return np.array(rows).squeeze()

    splits = ('all', 'train', 'val', 'trainval', 'test', 'testval')
    return {split: signatures_of(split) for split in splits}


def load_data():
    """Run the whole loading pipeline.

    Returns:
        ``(features, attributes, labels, signatures, classnames)`` as produced
        by get_features, get_attributes, get_labels and get_signatures.
    """
    res101, att_splits = get_mat()
    labels = get_labels(res101, att_splits)
    features = get_features(res101, att_splits)
    signatures, classnames = get_signatures(att_splits)
    return (
        features,
        get_attributes(labels, signatures),
        labels,
        signatures,
        classnames,
    )

### Data loading

In [None]:
features, attributes, labels, signatures, classnames = load_data()

# Training data: the trainval split.
train_seenX = features['trainval']
train_seenA = attributes['trainval']
train_seenY = labels['trainval']

# Test data taken from the 'test_unseen_loc' split.
test_unseenX = features['test']
test_unseenA = attributes['test']
test_unseenY = labels['test']

# Test data taken from the 'test_seen_loc' split.
test_seenX = features['testval']
test_seenA = attributes['testval']
test_seenY = labels['testval']

# Embedded method

### Embedded classifier definition

In [None]:
# Class signature matrix read by custom_kernel_init. It starts out as an empty
# list and is overwritten by EmbeddedClassifier before its model is built.
CLASS_VECTORS = list()


def custom_kernel_init(shape, dtype=None):
    """Kernel initializer that returns the transposed class signatures.

    The requested ``shape`` and ``dtype`` are ignored. CLASS_VECTORS must
    already hold an array when this is called: the initial empty list has no
    ``.T`` attribute.
    """
    return CLASS_VECTORS.T


class EmbeddedClassifier:
    """Classifier that projects features into attribute space.

    The network has a trainable dense layer mapping a feature vector to an
    attribute vector, followed by a frozen softmax layer whose kernel is the
    transposed signature matrix. Evaluation discards the frozen layer and
    classifies by nearest signature in attribute space.
    """

    def __init__(self, signatures, input_shape=INPUT_SHAPE, learning_rate=LEARNING_RATE_CLS, beta=BETA):
        self.model = self.__build_model(signatures, input_shape, learning_rate, beta)

    def __build_model(self, signatures, input_shape, learning_rate, beta):
        """Build and compile the two-layer model for ``signatures`` (one row per class)."""
        # custom_kernel_init reads the signatures from this module-level global.
        global CLASS_VECTORS
        CLASS_VECTORS = signatures
        model = Sequential()
        model.add(Dense(signatures.shape[1], input_shape=(input_shape,), activation='relu', kernel_initializer=RandomNormal(stddev=0.02)))
        model.add(Dense(signatures.shape[0], activation='softmax', trainable=False, kernel_initializer=custom_kernel_init))
        print('-----------------------')
        print('Classifier')
        model.summary()
        adam = Adam(learning_rate=learning_rate, beta_1=beta)
        # The last layer already applies softmax, so the loss must be told that
        # it receives probabilities; from_logits=True would apply softmax twice.
        model.compile(loss=SparseCategoricalCrossentropy(from_logits=False), optimizer=adam, metrics=['accuracy'])
        return model

    def get_model(self):
        """Return the compiled Keras model."""
        return self.model

    def train(self, trainX, trainY, n_batch=N_BATCH_CLS, n_epochs=N_EPOCHS_CLS):
        """Fit the model on ``trainX`` / ``trainY`` with shuffling."""
        self.model.fit(trainX, trainY, verbose=2, epochs=n_epochs, batch_size=n_batch, shuffle=True)

    def eval(self, x, y, classnames, signatures):
        """Print top-5/top-3/top-1 nearest-signature accuracy.

        Args:
            x: feature matrix to classify.
            y: true labels, one per row of ``x``.
            classnames: ``classnames[j]`` must be the label of the class
                described by ``signatures[j]``; the two must be aligned.
            signatures: candidate class signatures, one row per class.
        """
        print()
        # Output of the trainable layer: the predicted attribute vectors.
        embedder = Model(self.model.input, self.model.layers[-2].output)
        predY = embedder.predict(x)
        tree = KDTree(signatures)
        # Never ask for more neighbours than there are candidate classes.
        k = min(5, len(signatures))
        # One batched query instead of one query per sample.
        _, neighbours = tree.query(predY, k=k)
        pred_labels = np.asarray(classnames)[neighbours]
        true_labels = np.asarray(y).reshape(-1, 1)
        hits = pred_labels == true_labels
        top5 = int(hits.any(axis=1).sum())
        top3 = int(hits[:, :3].any(axis=1).sum())
        top1 = int(hits[:, 0].sum())
        print("ZERO SHOT LEARNING SCORE")
        print("-> Top-5 Accuracy: %.2f" % (top5 / float(len(x))))
        print("-> Top-3 Accuracy: %.2f" % (top3 / float(len(x))))
        print("-> Top-1 Accuracy: %.2f" % (top1 / float(len(x))))

### Embedded method

In [None]:
cls = EmbeddedClassifier(signatures)
cls.train(train_seenX, train_seenY)
if GZSL:
    # Every class is a candidate.
    signatures_eval, classnames_eval = signatures, classnames
else:
    # Only the unseen classes are candidates. The class ids must be subset
    # together with the signature rows, because eval maps the row index of the
    # nearest signature back to a label through classnames.
    unseen_classes = labels['test_unseen']
    signatures_eval = signatures[unseen_classes, :]
    classnames_eval = [classnames[c] for c in unseen_classes]
cls.eval(test_unseenX, test_unseenY, classnames_eval, signatures_eval)
cls.eval(test_seenX, test_seenY, classnames_eval, signatures_eval)

# Generative method

### Classifier definition

In [None]:
class Classifier:
    """Single-layer softmax classifier over N_CLASSES classes."""

    def __init__(self, input_shape=INPUT_SHAPE, learning_rate=LEARNING_RATE_CLS, beta=BETA):
        self.model = self.__build_model(input_shape, learning_rate, beta)

    def __build_model(self, input_shape, learning_rate, beta):
        """Build and compile the dense softmax model for ``input_shape`` features."""
        model = Sequential()
        model.add(Dense(N_CLASSES, input_shape=(input_shape,), activation='softmax', kernel_initializer=RandomNormal(stddev=0.02)))
        print('-----------------------')
        print('Classifier')
        model.summary()
        feature = Input(shape=(input_shape,))
        classes = model(feature)
        model = Model(feature, classes)
        opt = Adam(learning_rate=learning_rate, beta_1=beta, beta_2=0.999)
        # The model outputs softmax probabilities, not logits; from_logits=True
        # would make the loss apply softmax a second time.
        loss = SparseCategoricalCrossentropy(from_logits=False)
        model.compile(loss=loss, optimizer=opt, metrics=['accuracy'])
        return model

    def get_model(self):
        """Return the compiled Keras model."""
        return self.model

    def train(self, x, y, n_batch=N_BATCH_CLS, n_epochs=N_EPOCHS_CLS):
        """Fit the model for ``n_epochs`` epochs with shuffling."""
        self.model.fit(x, y, batch_size=n_batch, epochs=n_epochs, shuffle=True, verbose=1)

    def train_epoch(self, x, y, n_batch=N_BATCH_CLS):
        """Run one pass over a random permutation of the data.

        Every sample is used: the last batch may be smaller than ``n_batch``.
        """
        idx = np.random.permutation(len(x))
        x, y = x[idx], y[idx]
        for start in range(0, len(x), n_batch):
            stop = start + n_batch
            self.model.train_on_batch(x[start:stop], y[start:stop])

    def eval(self, x, y):
        """Print the loss and accuracy reported by ``model.evaluate``."""
        score = self.model.evaluate(x, y, verbose=0)
        print('Test loss:', score[0])
        print('Test accuracy:', score[1])

    def get_per_class_accuracy(self, test_X, test_Y, target_classes):
        """Return the accuracy averaged over classes rather than over samples.

        Args:
            test_X: feature matrix.
            test_Y: true labels, one per row of ``test_X``.
            target_classes: the classes to average over.

        Returns:
            Mean of the per-class accuracies of the target classes that have at
            least one sample in ``test_Y``; 0.0 if none of them has any.
        """
        test_Y = np.asarray(test_Y).reshape(-1)
        predicted_label = self.model.predict(test_X).argmax(axis=-1)
        per_class = []
        for target in target_classes:
            idx = (test_Y == target)
            n_samples = np.sum(idx)
            if n_samples == 0:
                # A class without samples has no defined accuracy; dividing by
                # zero here would turn the whole average into NaN.
                continue
            per_class.append(np.sum(predicted_label[idx] == target) / n_samples)
        if not per_class:
            return 0.0
        return np.mean(per_class)

### GAN definition

In [None]:
import tensorflow as tf
from tensorflow.keras import backend
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, ReLU, LeakyReLU, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from functools import partial
import numpy as np
from losses import wasserstein_loss, gradient_penalty_loss
from config import *


class RandomWeightedAverage(tf.keras.layers.Layer):
    """Layer that blends two tensors with random per-row weights.

    ``inputs`` is ``[first, second, n_rows]``: one uniform random weight is
    drawn for each of the ``n_rows`` rows and the result is
    ``weight * first + (1 - weight) * second``.
    """

    def call(self, inputs, **kwargs):
        n_rows = inputs[2]
        weights = backend.random_uniform((n_rows, 1))
        first, second = inputs[0], inputs[1]
        return (weights * first) + ((1 - weights) * second)


class Gan:
    """Conditional Wasserstein GAN with gradient penalty and a classification loss.

    The generator maps ``[noise, class attributes]`` to a feature vector. The
    critic scores ``[features, class attributes]`` pairs. A given classifier
    adds a classification term to the generator loss.

    Attributes:
        critic, generator: the bare Keras models.
        classifier: the classifier model passed to the constructor.
        compiled_critic: model that trains the critic
            (inputs ``[real features, attributes, noise]``).
        compiled_generator: model that trains the generator
            (inputs ``[noise, attributes]``).
    """

    def __init__(self, classifier, n_attributes=N_ATTRIBUTES, input_shape=INPUT_SHAPE, n_batch=N_BATCH_GAN, latent_dim=LATENT_DIM):
        # Keep the dimensions so that every sub-model is built from the
        # constructor arguments instead of the module-level defaults.
        self.n_attributes = n_attributes
        self.input_shape = input_shape
        self.latent_dim = latent_dim
        self.n_batch = n_batch
        self.critic = self.__build_critic(n_attributes, input_shape)
        self.generator = self.__build_generator(n_attributes, latent_dim, input_shape)
        self.classifier = classifier
        # Each compile step sets the trainable flags it needs before compiling
        # its model, so the critic model is compiled before the generator model.
        self.compiled_critic = self.__compile_critic(n_batch)
        self.compiled_generator = self.__compile_generator()

    def __build_critic(self, n_attributes, input_shape):
        """Build the critic: ``[features, attributes]`` -> one unbounded score."""
        model = Sequential()
        model.add(Dense(4096, input_dim=(n_attributes + input_shape), kernel_initializer=RandomNormal(stddev=0.02)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(1, kernel_initializer=RandomNormal(stddev=0.02)))
        print('-----------------------')
        print('Critic')
        model.summary()
        feature = Input(shape=(input_shape,))
        label = Input(shape=(n_attributes,))
        model_input = Concatenate()([feature, label])
        validity = model(model_input)
        return Model([feature, label], validity)

    def __build_generator(self, n_attributes, latent_dim, input_shape):
        """Build the generator: ``[noise, attributes]`` -> non-negative features."""
        model = Sequential()
        model.add(Dense(4096, input_dim=(n_attributes + latent_dim), kernel_initializer=RandomNormal(stddev=0.02)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(input_shape, kernel_initializer=RandomNormal(stddev=0.02)))
        model.add(ReLU())
        print('-----------------------')
        print('Generator')
        model.summary()
        noise = Input(shape=(latent_dim,))
        label = Input(shape=(n_attributes,))
        model_input = Concatenate()([noise, label])
        img = model(model_input)
        return Model([noise, label], img)

    def __compile_critic(self, n_batch):
        """Compile the model that updates the critic.

        ``n_batch`` is baked into the interpolation layer, so every training
        batch must contain exactly ``n_batch`` samples.
        """
        # Freeze the generator's layers while training the critic.
        self.critic.trainable = True
        self.generator.trainable = False
        # Real sample, noise and conditioning attributes.
        real_features = Input(shape=(self.input_shape,))
        z_disc = Input(shape=(self.latent_dim,))
        label = Input(shape=(self.n_attributes,))
        # Fake sample generated from the noise and the attributes.
        fake_features = self.generator([z_disc, label])
        # Critic scores of the fake and of the real sample.
        fake = self.critic([fake_features, label])
        valid = self.critic([real_features, label])
        # Random weighted average of the real and fake samples, and its score.
        interpolated_features = RandomWeightedAverage()(inputs=[real_features, fake_features, n_batch])
        validity_interpolated = self.critic([interpolated_features, label])
        # partial() supplies the extra 'averaged_samples' argument of the
        # gradient penalty; Keras requires loss functions to have a name.
        partial_gp_loss = partial(gradient_penalty_loss,
                                  averaged_samples=interpolated_features)
        partial_gp_loss.__name__ = 'gradient_penalty'

        c_model = Model(inputs=[real_features, label, z_disc], outputs=[valid, fake, validity_interpolated])
        opt = Adam(learning_rate=LEARNING_RATE, beta_1=BETA)
        c_model.compile(loss=[wasserstein_loss, wasserstein_loss, partial_gp_loss],
                        optimizer=opt, loss_weights=[1, 1, GRADIENT_PENALTY_WEIGHT],
                        experimental_run_tf_function=False)
        return c_model

    def __compile_generator(self):
        """Compile the model that updates the generator."""
        # Freeze the critic and the classifier while training the generator.
        self.critic.trainable = False
        self.classifier.trainable = False
        self.generator.trainable = True

        z_gen = Input(shape=(self.latent_dim,))
        label = Input(shape=(self.n_attributes,))
        features = self.generator([z_gen, label])
        # Critic score and classifier prediction for the generated features.
        valid = self.critic([features, label])
        classx = self.classifier(features)

        g_model = Model([z_gen, label], [valid, classx])
        opt = Adam(learning_rate=LEARNING_RATE, beta_1=BETA)
        # The Classifier defined in this file ends in a softmax layer, so the
        # classification loss receives probabilities, not logits.
        g_model.compile(loss=[wasserstein_loss, SparseCategoricalCrossentropy(from_logits=False)],
                        optimizer=opt, loss_weights=[1, CLS_LOSS_WEIGHT],
                        experimental_run_tf_function=False)
        return g_model

    def train(self, trainX, trainA, trainY, n_batch=N_BATCH_GAN, n_epochs=N_EPOCHS_GAN, load=LOAD,
              n_critic=N_CRITIC, latent_dim=LATENT_DIM, save_every=SAVE_MODEL_EVERY, save_path=MODEL_SAVE_PATH):
        """Train the GAN, or load previously saved generator weights.

        Args:
            trainX, trainA, trainY: features, attribute vectors and labels of
                the training samples (row-aligned).
            n_batch: batch size; must equal the ``n_batch`` given to the
                constructor, which is fixed inside the critic model.
            n_epochs: number of epochs.
            load: when true, load the generator weights from ``save_path`` and
                return without training.
            n_critic: critic updates per generator update (at least 1).
            latent_dim: noise dimension; must match the constructor's.
            save_every: save the generator weights every this many epochs.
            save_path: file the generator weights are loaded from / saved to.

        Raises:
            ValueError: if training is requested with ``n_critic < 1``.
        """
        if load:
            self.generator.load_weights(save_path)
            return
        if n_critic < 1:
            # The generator step reuses the last critic batch, so at least one
            # critic step is required.
            raise ValueError('n_critic must be at least 1')
        # Targets for the Wasserstein losses: -1 for real, +1 for fake.
        valid = -np.ones((n_batch, 1))
        fake = np.ones((n_batch, 1))
        dummy = np.zeros((n_batch, 1))  # Dummy target for the gradient penalty
        for epoch in range(n_epochs):
            for _ in range(0, trainX.shape[0], n_batch):
                for _ in range(n_critic):
                    # Random batch of real samples and fresh noise for the critic.
                    idx = np.random.permutation(trainX.shape[0])[0:n_batch]
                    features, labels, attr = trainX[idx], trainY[idx], trainA[idx]
                    noise = np.random.normal(0, 1, (n_batch, latent_dim))
                    d_loss = self.compiled_critic.train_on_batch([features, attr, noise], [valid, fake, dummy])
                # Generator step on the attributes/labels of the last critic batch.
                noise = np.random.normal(0, 1, (n_batch, latent_dim))
                g_loss = self.compiled_generator.train_on_batch([noise, attr], [valid, labels])
            wass = -np.mean(d_loss[1] + d_loss[2])
            print("%d [D loss: %f] [G loss: %f] [wass: %f] [real: %f] [fake: %f]" % (
            epoch, np.mean(d_loss[0]), np.mean(g_loss[0]), wass, np.mean(d_loss[1]), np.mean(d_loss[2])))
            # Also save after the final epoch so the finished model is never lost.
            if epoch % save_every == 0 or epoch == n_epochs - 1:
                self.generator.save_weights(save_path)

    def generate_synth_dataset(self, signatures, labels, n_samples=N_SAMPLES_GAN, latent_dim=LATENT_DIM):
        """Generate a labelled synthetic feature set.

        Args:
            signatures: array of class signatures indexed by class label.
            labels: array of the class labels to sample from.
            n_samples: number of samples to generate.
            latent_dim: noise dimension; must match the constructor's.

        Returns:
            ``(fakeX, fakeA, fakeY)``: generated features, the attribute
            vectors they were conditioned on, and their labels. Labels are
            drawn uniformly at random from ``labels``, so the number of
            samples per class is not fixed.
        """
        z_input = np.random.normal(0, 1, (n_samples, latent_dim))
        fakeY = np.random.randint(0, len(labels), n_samples)
        fakeY = labels[fakeY]
        fakeA = signatures[fakeY]
        fakeX = self.generator.predict([z_input, fakeA])
        return fakeX, fakeA, fakeY

### Generative method

In [None]:
# Classifier trained on the real training features; the GAN uses it for its
# classification loss. Its scores on both test sets are printed for reference.
precls = Classifier()
precls.train(train_seenX, train_seenY)
precls.eval(test_unseenX, test_unseenY)
precls.eval(test_seenX, test_seenY)

# Build and train the GAN, then sample synthetic data for the unseen classes.
gan = Gan(precls.get_model())
gan.train(train_seenX, train_seenA, train_seenY)
(fake_trainX,
 fake_trainA,
 fake_trainY) = gan.generate_synth_dataset(signatures, labels['test_unseen'])
if GZSL:
    # Append the real training data to the synthetic data.
    fake_trainX = np.concatenate([fake_trainX, train_seenX], axis=0)
    fake_trainA = np.concatenate([fake_trainA, train_seenA], axis=0)
    fake_trainY = np.concatenate([fake_trainY, train_seenY.squeeze()], axis=0)

# Create, train and test the final classifier on the synthetic dataset.
# Previously recorded result: acc_seen=0.5014, acc_unseen=0.3985, h=0.4441, epoch=10
postcls = Classifier()
best_acc_seen, best_acc_unseen, best_H, best_epoch = 0, 0, 0, 0
best_score = 0
for epoch in range(N_EPOCHS_CLS):
    postcls.train_epoch(fake_trainX, fake_trainY)
    acc_seen = postcls.get_per_class_accuracy(test_seenX, test_seenY, labels['test_seen'])
    acc_unseen = postcls.get_per_class_accuracy(test_unseenX, test_unseenY, labels['test_unseen'])
    # Harmonic mean of the two accuracies; defined as 0 when both are 0.
    acc_sum = acc_seen + acc_unseen
    H = 2 * acc_seen * acc_unseen / acc_sum if acc_sum > 0 else 0.0
    # Without GZSL the classifier is trained on unseen classes only, so the
    # best epoch is chosen by unseen accuracy instead of the harmonic mean.
    score = H if GZSL else acc_unseen
    if score > best_score:
        best_score = score
        best_acc_seen, best_acc_unseen, best_H, best_epoch = acc_seen, acc_unseen, H, epoch
print('acc_seen=%.4f, acc_unseen=%.4f, h=%.4f, epoch=%d' % (best_acc_seen, best_acc_unseen, best_H, best_epoch))