In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
import os
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    # Currently, memory growth needs to be the same across GPUs
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Memory growth must be set before GPUs have been initialized
    print(e)


In [None]:
learning_rate = 0.0001
meta_step_size = 0.5 #0.25

inner_batch_size = 8
eval_batch_size = 8

meta_iters = 1000 # 2000
eval_iters = 5 # repetitions
inner_iters = 2 # repetitions

eval_interval = 1 #meta_iter % eval_interval == 0이면 eval. 1이면 매번 한다는 뜻.
train_shots = 431
f_shots = 48 # eval
classes = 4
split = 5

In [None]:
import cv2
class Dataset:
    # This class will facilitate the creation of a few-shot dataset
    # from the Omniglot dataset that can be sampled from quickly while also
    # allowing to create new labels at the same time.
    def __init__(self, mode):
        # Download the tfrecord files containing the omniglot data and convert to a
        # dataset.
        self.mode = mode
        self.labels = range(11)

                
    def read_img(self, real_label, shots):
        ds = []
        path = os.path.join(os.getcwd(), 'image8', self.mode, str(real_label))
        files = os.listdir(path)
        sp = random.choices(files, k=shots)
        for s in sp:
            image = cv2.imread(path + "/" + s, cv2.IMREAD_COLOR)  # uint8 image
            #norm_image = cv2.normalize(image, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
            image = np.asarray(image).astype('float32')
            image /= 255.0
            ds.append(image)
        return ds
    
    def get_mini_dataset(
        self, batch_size, repetitions, shots, num_classes, split=False
    ):
        temp_labels = np.zeros(shape=(num_classes * shots))
        temp_images = np.zeros(shape=(num_classes * shots, 299, 299, 3))
        if split:
            test_labels = np.zeros(shape=(num_classes * split))
            test_images = np.zeros(shape=(num_classes * split, 299, 299, 3))

        # Get a random subset of labels from the entire label set.
        label_subset = random.sample(self.labels, k=num_classes)
        for class_idx, class_obj in enumerate(label_subset):
            # Use enumerated index value as a temporary label for mini-batch in
            # few shot learning.
            temp_labels[class_idx * shots : (class_idx + 1) * shots] = class_idx
            # If creating a split dataset for testing, select an extra sample from each
            # label to create the test dataset.
            if split:
                test_labels[class_idx * split : (class_idx + 1) * split] = class_idx
                test_images[
                    class_idx * split : (class_idx + 1) * split
                ] = self.read_img(label_subset[class_idx], split)
                temp_images[
                    class_idx * shots : (class_idx + 1) * shots
                ] = self.read_img(label_subset[class_idx], shots)
            else:
                # For each index in the randomly selected label_subset, sample the
                # necessary number of images.
                temp_images[
                    class_idx * shots : (class_idx + 1) * shots
                ] = self.read_img(label_subset[class_idx], shots)

        dataset = tf.data.Dataset.from_tensor_slices(
            (temp_images.astype(np.float32), temp_labels.astype(np.int32))
        )
        dataset = dataset.shuffle(shots).batch(batch_size).repeat(repetitions)
        if split:
            return dataset, test_images.astype(np.float32), test_labels.astype(np.int32)
        return dataset


import urllib3

urllib3.disable_warnings()  # Disable SSL warnings that may happen during download.
train_dataset = Dataset(mode='train')
test_dataset = Dataset(mode='val')

In [None]:
img_rows, img_cols, img_channel = 331, 331, 3

base_model = tf.keras.applications.NASNetLarge(weights='imagenet', include_top=False, input_shape=(img_rows, img_cols, img_channel))

In [None]:
add_model = tf.keras.Sequential()
add_model.add(tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:]))
add_model.add(tf.keras.layers.Dense(units=classes, activation=tf.nn.softmax))

model = tf.keras.Model(inputs=base_model.input, outputs=add_model(base_model.output))
optimizer = tf.keras.optimizers.Adam(lr=learning_rate)
model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
#model.summary()

In [None]:
training = []
testing = []
for meta_iter in range(meta_iters):
    frac_done = meta_iter / meta_iters
    cur_meta_step_size = (1 - frac_done) * meta_step_size
    # Temporarily save the weights from the model.
    old_vars = model.get_weights()
    # Get a sample from the full dataset.
    mini_dataset = train_dataset.get_mini_dataset(
        inner_batch_size, inner_iters, train_shots, classes
    )
    for images, labels in mini_dataset:
        with tf.GradientTape() as tape:
            preds = model(images)
            loss = keras.losses.sparse_categorical_crossentropy(labels, preds)
        grads = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
    new_vars = model.get_weights()
    # Perform SGD for the meta step.
    for var in range(len(new_vars)):
        new_vars[var] = old_vars[var] + (
            (new_vars[var] - old_vars[var]) * cur_meta_step_size
        )
    # After the meta-learning step, reload the newly-trained weights into the model.
    model.set_weights(new_vars)
    # Evaluation loop
    if meta_iter % eval_interval == 0:
        accuracies = []
        for dataset in (train_dataset, test_dataset):
            # Sample a mini dataset from the full dataset.
            train_set, test_images, test_labels = dataset.get_mini_dataset(
                eval_batch_size, eval_iters, f_shots, classes, split=split
            )
            old_vars = model.get_weights()
            # Train on the samples and get the resulting accuracies.
            for images, labels in train_set:
                with tf.GradientTape() as tape:
                    preds = model(images)
                    loss = keras.losses.sparse_categorical_crossentropy(labels, preds)
                grads = tape.gradient(loss, model.trainable_weights)
                optimizer.apply_gradients(zip(grads, model.trainable_weights))
            test_preds = model.predict(test_images)
            print(test_preds)
            test_preds = tf.argmax(test_preds, axis=1).numpy()
            num_correct = (test_preds == test_labels).sum()
            # Reset the weights after getting the evaluation accuracies.
            model.set_weights(old_vars)
            accuracies.append(num_correct / (classes*split))
        training.append(accuracies[0])
        testing.append(accuracies[1])
        #if meta_iter % 100 == 0:
        print(
            "batch %d: train=%f test=%f" % (meta_iter, accuracies[0], accuracies[1])
        )

In [None]:
# First, some preprocessing to smooth the training and testing arrays for display.
window_length = 100
train_s = np.r_[
    training[window_length - 1 : 0 : -1], training, training[-1:-window_length:-1]
]
test_s = np.r_[
    testing[window_length - 1 : 0 : -1], testing, testing[-1:-window_length:-1]
]
w = np.hamming(window_length)
train_y = np.convolve(w / w.sum(), train_s, mode="valid")
test_y = np.convolve(w / w.sum(), test_s, mode="valid")

# Display the training accuracies.
x = np.arange(0, len(test_y), 1)
plt.plot(x, test_y, x, train_y)
plt.legend(["test", "train"])
plt.grid()
plt.show()

In [None]:
train_set, test_images, test_labels = dataset.get_mini_dataset(
    eval_batch_size, eval_iters, train_shots, eval_shots, classes, split=True
)
for images, labels in train_set:
    with tf.GradientTape() as tape:
        preds = model(images)
        loss = keras.losses.sparse_categorical_crossentropy(labels, preds)
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
test_preds = model.predict(test_images)
test_preds = tf.argmax(test_preds).numpy()
num_correct = (test_preds == test_labels).sum()
print(num_correct / classes)