In [None]:
import numpy as np 
import matplotlib.pyplot as plt
from keras.datasets import mnist
# Used for Confusion Matrix
from sklearn import metrics
# Used for Loading MNIST
from struct import unpack
# Used for calculating result and accuracy
from sklearn.metrics import f1_score
from sklearn.metrics import plot_confusion_matrix
# Used for scaling image data
from sklearn import preprocessing
 
from sklearn.model_selection import train_test_split
 
import seaborn as sns
from sklearn.metrics import confusion_matrix
 
%tensorflow_version 2.x
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
 
%matplotlib inline

In [None]:
def get_data(input_shape):
    num_classes = 10
    (train_img, train_lbl), (test_img, test_lbl) = mnist.load_data()
    #p=np.random.permutation(train_img.shape[0])
    #train_img = train_img[p]
    #train_lbl = train_lbl[p]
    #p=np.random.permutation(test_img.shape[0])
    #test_img = test_img[p]
    #test_lbl = test_lbl[p]
    train_img = train_img.astype("float32") / 255
    test_img = test_img.astype("float32") / 255
    # Make sure images have shape (28, 28, 1)
    train_img = np.expand_dims(train_img, -1)
    test_img = np.expand_dims(test_img, -1)
    return train_img, train_lbl, test_img, test_lbl

In [None]:
def define_cnn_model(input_shape):
        model = keras.Sequential(
            [
                keras.Input(shape=input_shape),
                layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Flatten(),
                layers.Dropout(0.5),
                layers.Dense(10, activation="softmax"),
            ]
        )
 
        model.compile(
            optimizer=keras.optimizers.Adam(0.001),
            loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=[keras.metrics.SparseCategoricalAccuracy()]
        )
        return model

In [None]:
def plot_result(y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch):
    fig, (ax1, ax2, ax3) = plt.subplots(nrows=3, ncols=1, figsize=(25,25))
    ax1.plot(range(epoch), test_f1s)
    ax1.set_ylabel('f1 Score')
    ax1.set_xlabel('# Iterations')
    ax1.legend(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'], loc="lower right")
    ax2.plot(range(epoch), test_f1s_avg, )
    ax2.set_ylabel('Average f1 Score')
    ax2.set_xlabel('# Iterations')
    ax2.legend("Average", loc="lower right")
    ax3.bar(x=range(epoch), height=pseudo_labels)
    ax3.set_ylabel('Pseudo-Labels Created')
    ax3.set_xlabel('# Iterations')
 
    # View confusion matrix after self-training
    cf_matrix = confusion_matrix(y_test, y_test_pred, normalize='true')
    fig, ax = plt.subplots(figsize=(10, 10))
    sns.heatmap(cf_matrix, annot=True, 
                fmt='.2f', cmap='Blues')

In [None]:
import sys
import time


class Stopwatch(object):
    """
    A simple cross-platform
    context-manager stopwatch.
    Examples
    --------
    >>> import time
    >>> with Stopwatch(verbose=True) as s:
    ...     time.sleep(0.1) # doctest: +ELLIPSIS
    Elapsed time: 0.10... sec
    >>> with Stopwatch(verbose=False) as s:
    ...     time.sleep(0.1)
    >>> import math
    >>> math.fabs(s.elapsed() - 0.1) < 0.05
    True
    """
    def __init__(self, verbose=False):
        self.verbose = verbose
        if sys.platform == 'win32':
            # on Windows, the best timer is time.clock()
            self._timer_func = time.clock
        else:
            # on most other platforms, the best timer is time.time()
            self._timer_func = time.time
        self.reset()

    def __enter__(self, verbose=False):
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        return self.elapsed()

    def start(self):
        if not self._is_running:
            self._start = self._timer_func()
            self._is_running = True
        return self

    def stop(self):
        if self._is_running:
            self._total += (self._timer_func() - self._start)
            self._is_running = False
        return self

    def elapsed(self):
        if self._is_running:
            now = self._timer_func()
            self._total += (now - self._start)
            self._start = now
        if self.verbose:
            print("Elapsed time: {0:.3f} sec".format(self._total))
        return self._total

    def reset(self):
        self._start = 0.
        self._total = 0.
        self._is_running = False
        return self

In [None]:
def self_train_with_pretrain(model_type: str, batchsize: int, epochs: int, num_pseudo: int):
    input_shape=(28, 28, 1)
    X_train, y_train, X_test, y_test = get_data(input_shape=input_shape)
    train_f1s = []
    test_f1s = []
    train_f1s_avg = []
    test_f1s_avg = []
    pseudo_labels = []
    high_prob_counter = 1
    total_pseudo_labelled = 0
    total_prelabelled_added = 0
    
    #X_train_s = np.empty((0,28,28,1))
    #y_train_s = np.empty(0)
 
    X_train_s = []
    y_train_s = []
    count = 0
    for i in range(10):
        for j in range(60000):
            if y_train[j]==i:
                count+=1
                y_train_s.append(y_train[j])
                X_train_s.append(X_train[j])
                if count > ((num_pseudo/10)-1):
                    count = 0
                    break
    X_train_s = np.array(X_train_s)
    y_train_s = np.array(y_train_s)
 
    X_plabelled = np.empty((0,28,28,1))
    y_plabelled = np.empty(0)
    unlabelled_missed = np.empty((0,28,28,1))
 
    i = 0
    #pretrain 600 labelled
    if model_type == 'dense':
        model_pretrained = define_dense_model(input_shape)
    elif model_type =='CNN':
        model_pretrained = define_cnn_model(input_shape)
 
    with Stopwatch(verbose=True) as s:
            early_stopping = keras.callbacks.EarlyStopping(monitor='loss', patience=30, verbose=2)
            reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=0.2, verbose=2, patience=15, min_lr=1e-5)
            callbacks = [early_stopping, reduce_lr]
            try:
                model_pretrained.fit(X_train_s, y_train_s,
                        epochs=epochs,
                        batch_size=batch_size,
                        shuffle=False,
                        callbacks=callbacks)
            except KeyboardInterrupt:
                pass

    print(f"{num_pseudo} pre-labelled data is training (pre-trained model is ready)...")
 
    X_train_s = np.empty((0,28,28,1))
    y_train_s = np.empty(0)
 
    total_prelabelled_added += num_pseudo
    i += 1
    epoch = 0
    num_new_labeled = 0
 
    while high_prob_counter > 0:
        high_prob_counter = 0

        epoch += 1
 
        print(f"Iteration: {epoch}")
 
        model = model_pretrained
 
        if X_plabelled.shape[0] > num_pseudo:
 
              X_train_s = np.append(X_train_s, X_plabelled[:num_pseudo], axis=0)
              y_train_s = np.append(y_train_s, y_plabelled[:num_pseudo])
 
              X_plabelled = X_plabelled[num_pseudo:]
              y_plabelled = y_plabelled[num_pseudo:]
 
              num_new_labeled += 1
              print(f"{num_pseudo*num_new_labeled} new labelled data is training...")

              with Stopwatch(verbose=True) as s:
                      early_stopping = keras.callbacks.EarlyStopping(monitor='loss', patience=30, verbose=2)
                      reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=0.2, verbose=2, patience=15, min_lr=1e-5)
                      callbacks = [early_stopping, reduce_lr]
                      try:
                          model.fit(X_train_s, y_train_s,
                                  epochs=epochs,
                                  batch_size=batch_size,
                                  shuffle=False,
                                  callbacks=callbacks)
                      except KeyboardInterrupt:
                          pass
 
        y_test_pred = np.argmax(model.predict(X_test),axis=1)
        y_train_pred = np.argmax(model.predict(X_train),axis=1)
 
        train_f1 = f1_score(y_train, y_train_pred, average=None)
        test_f1 = f1_score(y_test, y_test_pred, average=None)
 
        print(f"Train f1 Score: {train_f1}")
        print(f"Test f1 Score: {test_f1}")
 
        train_f1s.append(train_f1)
        test_f1s.append(test_f1)
 
        
        train_f1_avg = f1_score(y_train, y_train_pred, average='macro')
        test_f1_avg = f1_score(y_test, y_test_pred, average='macro')
 
        print(f"Train f1 Score: {train_f1_avg}")
        print(f"Test f1 Score: {test_f1_avg}")
 
        train_f1s_avg.append(train_f1_avg)
        test_f1s_avg.append(test_f1_avg)

        num_unlabaled = num_pseudo * 1
        if num_unlabaled*(i+1) < 60000:
            unlabelled_splitted = X_train[(num_unlabaled*(i)):(num_unlabaled*(i+1))]
            i += 1
        elif unlabelled_missed.shape[0] > num_unlabaled:
            unlabelled_splitted = unlabelled_missed[:num_unlabaled]
            unlabelled_missed = unlabelled_missed[num_unlabaled:]
        else:
            unlabelled_splitted = unlabelled_missed
            unlabelled_missed = np.empty((0,28,28,1))

        if unlabelled_splitted.shape[0] == 0:
            print("Whole dataset is observed. Nothing remained.")
            break
        unlabeled_pred_prob = model.predict(unlabelled_splitted)
 
        print(f"Now predicting labels for unlabeled data...")
 
        high_prob_idx = []
 
        for j in range(unlabeled_pred_prob.shape[0]):
            max = 0
            pseudo_label = 0
            pred_prob = unlabeled_pred_prob[j]
            for k in range(pred_prob.shape[0]):
                if pred_prob[k] > max:
                    max = pred_prob[k]
                    pseudo_label = k
            if max >= 0.95:
                high_prob_counter += 1
                high_prob_idx.append(j)
                X_plabelled = np.append(X_plabelled, [unlabelled_splitted[j]], axis=0)
                y_plabelled = np.append(y_plabelled, pseudo_label)
            else:
                unlabelled_missed = np.append(unlabelled_missed, [unlabelled_splitted[j]], axis=0)
 
        print(f"{high_prob_counter} high-probability unlabelled predictions is labelled and added to next train dataset.")
 
        print(f"{unlabelled_missed.shape[0]} unlabelled instances remained to be predict in next iteration")
 
        print(f"{X_plabelled.shape[0]} labelled instances are going to be added to train dataset in next iteration.")
 
        
        total_pseudo_labelled += high_prob_counter
        pseudo_labels.append(high_prob_counter)
        
    
    print("*** Model's Learning is finished. ***")
    print(f"In the end, {total_prelabelled_added} pre-labelled images and {total_pseudo_labelled} pseudo-labelled images added to train dataset")
    return y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch

In [None]:
#HyperParams
batch_size = 128
epochs = 100 
num_pseudo = 600
y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch = self_train_with_pretrain('CNN', batch_size, epochs, num_pseudo)

In [None]:
#call plot_result() here

In [None]:
#HyperParams
batch_size = 64
epochs = 100 
num_pseudo = 100
y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch = self_train_with_pretrain('CNN', batch_size, epochs, num_pseudo)

In [None]:
#call plot_result() here

In [None]:
#HyperParams
batch_size = 16
epochs = 100 
num_pseudo = 50
y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch = self_train_with_pretrain('CNN', batch_size, epochs, num_pseudo)

In [None]:
#call plot_result() here

In [None]:
#HyperParams
batch_size = 16
epochs = 100 
num_pseudo = 30
y_test, y_test_pred, test_f1s, test_f1s_avg, pseudo_labels, epoch = self_train_with_pretrain('CNN', batch_size, epochs, num_pseudo)

In [None]:
#call plot_result() here