# Using CNN to extract features. This way we use train_y during the training process thus it is a semi-supervised method.

In [1]:
import tensorflow as tf
import numpy as np
import math
import scipy.io as sio

In [2]:
data  = sio.loadmat('../distorted_MNIST.mat')

In [3]:
train_X = data['train_X']
train_y = data['train_y']
test_X = data['test_X']
test_data = sio.loadmat('../testset_label.mat')
test_y = test_data['test_y']
train_y = train_y.flatten()
test_y = test_y.flatten()

In [4]:
test_y.shape

(40000,)

In [5]:
train_X = train_X.reshape(-1,28,28,1)
test_X = test_X.reshape(-1, 28, 28, 1)

In [6]:
train_X.shape

(10000, 28, 28, 1)

In [7]:
NUM_HIDDEN = 512
input_shape = [None, 28, 28, 1]
x = tf.placeholder(tf.float32, input_shape, name='x')
y_ = tf.placeholder(tf.int64, shape=[None], name='y')

#helper function
def leaky_relu(x, alpha=0.1, dtype=tf.float32):
    x = tf.cast(x, dtype=dtype)
    bool_mask = (x > 0)
    mask = tf.cast(bool_mask, dtype=dtype)
    return 1.0 * mask * x + alpha * (1 - mask) * x

def add_3x3_conv_layer(inputs, num_filters, stride=[1,1,1,1], name='conv'):
    # input should be a 4D tensor.
    input_fm = inputs.get_shape().as_list()[3]
    W = tf.Variable(
            tf.truncated_normal([3, 3, input_fm, num_filters],
            stddev=1.0 / math.sqrt(input_fm)), name=name+'/W')
    b=tf.Variable(tf.zeros([num_filters])+0.1, name=name+'/b')
    output = leaky_relu(tf.add(tf.nn.conv2d(inputs, W, stride, padding='SAME'), b))
    return output

        

In [8]:
# define the network
conv1_1 = add_3x3_conv_layer(x,16,name='conv1_1')
conv1_2 = add_3x3_conv_layer(conv1_1, 16, name='conv1_2')
conv2_1 = add_3x3_conv_layer(conv1_2, 32, [1,2,2,1], 'conv2_1')
conv2_2 = add_3x3_conv_layer(conv2_1, 32, name='conv2_2')
conv3_1 = add_3x3_conv_layer(conv2_2, 64, [1,2,2,1], 'conv3_1')
conv3_2 = add_3x3_conv_layer(conv3_1, 64, name='conv3_2')
conv3_2_flat = tf.reshape(conv3_2, [-1, 7*7*64])
W_fc1 = tf.Variable(
                tf.truncated_normal([7*7*64, NUM_HIDDEN],
                stddev=1.0 / math.sqrt(7*7*64)), name='W_fc1')
b_fc1 = tf.Variable(tf.zeros([NUM_HIDDEN])+0.1, name='b_fc1')
h_fc1 = leaky_relu(tf.matmul(conv3_2_flat, W_fc1) + b_fc1)
keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

W_fc2 = tf.Variable(
                tf.truncated_normal([NUM_HIDDEN, 10],
                stddev=1.0 / math.sqrt(NUM_HIDDEN)), name='W_fc2')
b_fc2 = tf.Variable(tf.zeros([10]), name='b_fc2')

y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

# cross entropy loss
cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(y_conv, y_))


In [9]:
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y_conv,1), y_)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

In [11]:
batch_size = 200
n_epochs = 100
for epoch_i in range(n_epochs):
    for batch_i in range(10000 // batch_size):
        batch_x = train_X[batch_i*batch_size:(batch_i+1)*batch_size,:,:,:]
        batch_y = train_y[batch_i*batch_size:(batch_i+1)*batch_size]
        train_step.run(feed_dict={x: batch_x, y_:batch_y, keep_prob:0.5})
    train_accuracy = accuracy.eval(feed_dict={
            x:batch_x, y_: batch_y, keep_prob: 1.0})
    print("epoch %d, training accuracy %g"%(epoch_i, train_accuracy))

epoch 0, training accuracy 1
epoch 1, training accuracy 1
epoch 2, training accuracy 1
epoch 3, training accuracy 1
epoch 4, training accuracy 1
epoch 5, training accuracy 1
epoch 6, training accuracy 1
epoch 7, training accuracy 1
epoch 8, training accuracy 1
epoch 9, training accuracy 1
epoch 10, training accuracy 1
epoch 11, training accuracy 1
epoch 12, training accuracy 1
epoch 13, training accuracy 1
epoch 14, training accuracy 1
epoch 15, training accuracy 1
epoch 16, training accuracy 1
epoch 17, training accuracy 1
epoch 18, training accuracy 1
epoch 19, training accuracy 1
epoch 20, training accuracy 1
epoch 21, training accuracy 1
epoch 22, training accuracy 1
epoch 23, training accuracy 1
epoch 24, training accuracy 1
epoch 25, training accuracy 1
epoch 26, training accuracy 1
epoch 27, training accuracy 1
epoch 28, training accuracy 1
epoch 29, training accuracy 1
epoch 30, training accuracy 1
epoch 31, training accuracy 1
epoch 32, training accuracy 1
epoch 33, training a

In [12]:
X = np.concatenate((train_X, test_X),0)

In [13]:
X.shape

(50000, 28, 28, 1)

In [14]:
X_encode = np.zeros((50000,NUM_HIDDEN), dtype=np.float32)
for batch_i in range(50000 // batch_size):
    batch_xs = X[batch_i*batch_size:(batch_i+1)*batch_size,:,:,:]
    encoded_feature = sess.run(h_fc1_drop, feed_dict={x: batch_xs, keep_prob:1})
    X_encode[batch_i*batch_size:(batch_i+1)*batch_size,:] = encoded_feature

In [15]:
X_encode.shape

(50000, 512)

In [16]:
import sklearn.neighbors
from sklearn.decomposition import PCA
KNN_C = sklearn.neighbors.KNeighborsClassifier()

def train_and_eval( model, train_x, train_y, test_x, test_y):
    model.fit( train_x, train_y )
    p = model.predict( test_x )
    OA = sum(test_y==p)/len(test_y)
    return OA

# test accuracy

In [17]:
train_and_eval(KNN_C, X_encode[:10000,:], train_y, X_encode[10000:,:], test_y)

0.85809999999999997

In [18]:
sio.savemat('X_encode_CNN_512.mat', {'X_encode': X_encode})

# use principle components of the latent space improves accuracy

In [21]:
train_index = np.arange(10000)
test_index = np.arange(10000,50000)
pca = PCA(n_components=50)
pcomp = pca.fit_transform(X_encode)
X_pcomp_train,X_pcomp_test = [pcomp[train_index],pcomp[test_index]]
train_and_eval(KNN_C, X_pcomp_train, train_y, X_pcomp_test, test_y)

0.872475