In [0]:
import numpy as np
import keras
import os
import time
import tensorflow.keras.backend as K
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint

#File path
#Please set the path to your own path
current = '/content/drive/My Drive/'

# Create CNN model for FashionMNIST datasets
def create_model():
    
    model = Sequential([
    Conv2D(32,(5,5),padding='same',input_shape=(32,32,3),activation='relu'),
    MaxPooling2D(pool_size=(2,2)),
    Conv2D(32,(5,5),activation='relu'),
    AveragePooling2D(pool_size=(2,2)),
    Dropout(0.25),
    
    Conv2D(64,(5,5),padding='same',activation='relu'),
    Conv2D(64,(5,5),activation='relu'),
    Dropout(0.25),
    
    Flatten(),
    Dense(512,activation='relu'),
    Dropout(0.5),
    Dense(3,activation='softmax')])
    print(model.summary())
    
    return model



#Set label to one-hot catogory
def one_hot_embedding(Str, Yts):
    Str = to_categorical(Str, num_classes=3)
    Yts = to_categorical(Yts, num_classes=3)
    return Str, Yts


 # Calculate transition matrix
def calculate_T(y_pred):
    
    T = np.zeros((3, 3))
    index = int(y_pred.shape[0] *0.9)
    
    x_max_0 = y_pred[y_pred[:, 0].argsort()][index]
    x_max_1 = y_pred[y_pred[:, 1].argsort()][index]
    x_max_2 = y_pred[y_pred[:, 2].argsort()][index]

    T[0][0] = 1.0 - x_max_0[1]-x_max_0[2]
    T[0][1] = x_max_0[1]
    T[0][2] = x_max_0[2]

    T[1][0] = x_max_1[0]
    T[1][1] = 1.0 - x_max_1[0] - x_max_1[2]
    T[1][2] = x_max_1[2]

    T[2][0] = x_max_2[0]
    T[2][1] = x_max_2[1]
    T[2][2] = 1.0 - x_max_2[0]-x_max_2[1]
    return T

# Add the transition matrix to our model
def Addtran(T):

        T = K.constant(T)

        def loss(y_true, y_pred):
            y_pred /= K.sum(y_pred, axis=-1, keepdims=True)
            y_pred = K.clip(y_pred, K.epsilon(), 1.0 - K.epsilon())
            return -K.sum(y_true * K.log(K.dot(y_pred, T)), axis=-1)

        return loss


#Standardization, in order to increase the accuracy
def standard(Xtr, Xts):
    

    std = np.std(Xtr, keepdims=True)
    mean = np.mean(Xtr, keepdims=True)
    Xtr = (Xtr - mean) / std
    Xts = (Xts - mean) / std
    return Xtr, Xts


# load the dataset and calculate the transition matrix
def Compute_T():
    # load data
    fpath1 = os.path.join(current,'CIFAR.npz')
    dataset = np.load(fpath1)
    
    Xtr_val = dataset['Xtr']
    Str_val = dataset['Str']
    Xts = dataset['Xts']
    Yts = dataset['Yts']

    #Standardization
    Xtr_val, Xts = standard(Xtr_val, Xts)

    Str_val, Yts = one_hot_embedding(Str_val, Yts)
    
    # Random sample
    a = np.argsort(np.random.random(Str_val.shape[0]))
    Xtr_val = Xtr_val[a]
    Str_val = Str_val[a]

    Total = len(Xtr_val)
    S = int(Total*0.8)

    # Choose 80% of data for trainning, the other 20% for valuating.
    Xtr_train = Xtr_val[:S]
    Str_train = Str_val[:S]
    Xtr_valid = Xtr_val[S:]
    Str_valid = Str_val[S:]

    Xtr_train = Xtr_train.reshape(Xtr_train.shape[0], 32, 32,3)
    Xtr_valid = Xtr_valid.reshape(Xtr_valid.shape[0], 32, 32,3)

    Xts = Xts.reshape(Xts.shape[0], 32, 32,3)

    model = create_model()
    
    fpath = os.path.join(current,'CNNmodel.hdf5')
    model.compile(loss='categorical_crossentropy',
                  optimizer = keras.optimizers.rmsprop(lr=1e-4),
                  metrics=['accuracy'])
    
    # Train the model and compare model in every epoch, save the best model to file
    checkpointer = ModelCheckpoint(filepath=fpath, mode = 'max', monitor='val_acc',verbose = 1, save_best_only=True)

    model.fit(Xtr_train,
              Str_train,
              epochs=50,
              batch_size=1024,
              validation_data=(Xts,Yts),
              callbacks=[checkpointer],
              )
    model.load_weights(fpath)
    time_1 = time.time()
    print('The validation accuracy is :')
    
    # Get accuracy
    accuracy = model.evaluate(Xtr_valid,Str_valid)[1]
    print(accuracy)
    print('The test accuracy is :')
    accuracy1 = model.evaluate(Xts,Yts)[1]
    print(accuracy1)
    time_2 = time.time()
    
    # Use the predict result to compute transition
    result = model.predict(Xtr_train)
    T = calculate_T(result)
    print(T)
    return T,accuracy,accuracy1

In [3]:
# If you want to calculate your own transition matrix, set 'Yes'
# If you want to test transition matrix provided by lecturer, set 'No' and skip
COMPUTE = 'Yes'

if(COMPUTE == 'Yes'):

    Trans = np.zeros((10,3,3))

    valid_accuracy = np.zeros((10))
    test_accuracy = np.zeros((10))
    
    #Run for 10 times, and get mean value
    for i in range(10):
        
        T,accu,accu1 = Compute_T()
        Trans[i] = T
        valid_accuracy[i] = accu
        test_accuracy[i] = accu1
        print('The valid accuracy is :')
        print(valid_accuracy[i])
        
    #Get mean validation accuracy
    print('The valid accuracy is :')
    print(np.mean(valid_accuracy))
    # Get the standard deviation of validation accuracy
    print('The valid std is :')
    print(valid_accuracy.std())
    # Get mean test accuracy
    print('The test accuracy is :')
    print(np.mean(test_accuracy))
    # Get the standard deviation of test accuracy
    print('The test std is :')
    print(test_accuracy.std())
    # Get mean transition matrix
    print(np.mean(Trans,axis=0))
    # Get the standard deviation of test accuracy
    print(np.std(Trans,axis=0))


Epoch 00010: val_acc did not improve from 0.58200
Epoch 11/50

Epoch 00011: val_acc did not improve from 0.58200
Epoch 12/50

Epoch 00012: val_acc improved from 0.58200 to 0.61800, saving model to /content/drive/My Drive/CNNmodel.hdf5
Epoch 13/50

Epoch 00013: val_acc did not improve from 0.61800
Epoch 14/50

KeyboardInterrupt: ignored

In [8]:
valid_accuracy = np.zeros((10))
test_accuracy = np.zeros((10))
print(np.mean(Trans,axis=0))
print(np.std(Trans,axis=0))
T = np.mean(Trans,axis=0)
print(T)

# Run for 10 times, and get mean value
for i in range(10):  

    dataset = np.load(current+'CIFAR.npz')
    Xtr_val = dataset['Xtr']
    Str_val = dataset['Str']
    Xts = dataset['Xts']
    Yts = dataset['Yts']

    Xtr_val, Xts = standard(Xtr_val, Xts)
    Str_val, Yts = one_hot_embedding(Str_val, Yts)
    a = np.argsort(np.random.random(Str_val.shape[0]))
    Xtr_val = Xtr_val[a]
    Str_val = Str_val[a]

    Total = len(Xtr_val)
    S = int(Total*0.8)

    Xtr_train = Xtr_val[:S]
    Str_train = Str_val[:S]
    Xtr_valid = Xtr_val[S:]
    Str_valid = Str_val[S:]
    Xtr_train = Xtr_train.reshape(Xtr_train.shape[0], 32, 32, 3)
    Xtr_valid = Xtr_valid.reshape(Xtr_valid.shape[0], 32, 32, 3)
    Xts = Xts.reshape(Xts.shape[0], 32, 32, 3)
    model = create_model()
    fpath = os.path.join(current,'CNNmodel.hdf5')


    # Train the model and compare model in every epoch, save the best model to file
    T1 = np.array([[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]])
    T2 = np.array([[0.4,0.3,0.3],[0.3,0.4,0.3],[0.3,0.3,0.4]])

    # Add transition matrix to our model
    # If you want to choose transition matrix provided above, change 'T' to 'T1' or 'T2'
    model.compile(loss=Addtran(T),
                  optimizer = keras.optimizers.rmsprop(lr=1e-4),
                  metrics=['accuracy'])   

    checkpointer = ModelCheckpoint(filepath=fpath, mode = 'max', monitor='val_acc',verbose = 1, save_best_only=True)

    model.fit(Xtr_train,
              Str_train,
              epochs=50,
              batch_size=1024,
              validation_data=(Xts,Yts),
              callbacks=[checkpointer],
              )
    time_1 = time.time()

    model.load_weights("CNNmodel.hdf5")
    print('The validation accuracy is :')
    accuracy = model.evaluate(Xtr_valid,Str_valid)[1]
    print(accuracy)
    print('The test accuracy is :')
    accuracy1 = model.evaluate(Xts,Yts)[1]
    print(accuracy1)
    time_2 = time.time()
    valid_accuracy[i] = accuracy
    test_accuracy[i] = accuracy1

#Get mean validation accuracy
print('The valid accuracy is :')
print(np.mean(valid_accuracy))
# Get the standard deviation of validation accuracy
print('The valid std is :')
print(valid_accuracy.std())

#Get mean validation accuracy
print('The test accuracy is :')
print(np.mean(test_accuracy))
# Get the standard deviation of test accuracy
print('The test std is :')
print(test_accuracy.std())


[[0.04019046 0.04061811 0.01919143]
 [0.02954744 0.03966265 0.03078991]
 [0.02309527 0.03614412 0.04076061]]
[[0.12057139 0.12185432 0.05757429]
 [0.08864231 0.11898795 0.09236974]
 [0.0692858  0.10843237 0.12228184]]
[[0.04019046 0.04061811 0.01919143]
 [0.02954744 0.03966265 0.03078991]
 [0.02309527 0.03614412 0.04076061]]
Model: "sequential_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_17 (Conv2D)           (None, 32, 32, 32)        2432      
_________________________________________________________________
max_pooling2d_5 (MaxPooling2 (None, 16, 16, 32)        0         
_________________________________________________________________
conv2d_18 (Conv2D)           (None, 12, 12, 32)        25632     
_________________________________________________________________
average_pooling2d_5 (Average (None, 6, 6, 32)          0         
_________________________________________________________

KeyboardInterrupt: ignored