In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [3]:
from sklearn.utils import shuffle

In [4]:
from scipy.io import loadmat
from scipy.signal import convolve2d
import time

In [7]:
def y2indicator(Y):
    N = len(Y)
    res = np.zeros((N,10))
    for i in range(N):
        res[i,Y[i]] = 1
    return res

In [8]:
def error_rate(p,y):
    return np.mean(p!=y)

In [9]:
def my_convpool(X,W,b):
    conv_out = tf.nn.conv2d(X,W,strides=[1,1,1,1],padding='SAME')
    conv_out = tf.nn.bias_add(conv_out,b)
    pool_out = tf.nn.max_pool(conv_out,ksize=[1,2,2,1],strides=[1,2,2,1],padding='SAME')
    relu_out = tf.nn.relu(pool_out)
    return relu_out

In [22]:
def init_filter(shape,poolsz):
    w = np.random.rand(*shape)/np.sqrt(np.prod(shape[:-1])+shape[-1]*np.prod(shape[-2]/np.prod(poolsz)))
    return w.astype(np.float32)

In [14]:
def rearrange(X):
    #the dimension order of input and tf used is different
    N = X.shape[-1]
    res = np.zeros((N,32,32,3),dtype=np.float32)
    for i in range(N):
        for j in range(3):
            res[i,:,:,j] = X[:,:,j,i]
    return res/255 #remember to resize the X

In [41]:
def cnn_tf():
    train_mat = loadmat('C:/Users/Wei Guo/Desktop/data/train_32x32.mat')
    test_mat = loadmat('C:/Users/Wei Guo/Desktop/data/test_32x32.mat')
    X_train = rearrange(train_mat['X'])
    Y_train = train_mat['y'].flatten() - 1
    del train_mat
    Y_train_indi = y2indicator(Y_train)
    
    X_test = rearrange(test_mat['X'])
    Y_test = test_mat['y'].flatten() - 1
    del test_mat
    Y_test_indi = y2indicator(Y_test)
    
    max_iter = 20
    print_period = 10
    N = X_train.shape[0]
    batch_size = 500
    n_batch = N // batch_size
    
    X_train = X_train[:73000,]
    Y_train = Y_train[:73000]
    X_test = X_test[:26000,]
    Y_test = Y_test[:26000]
    Y_test_indi = Y_test_indi[:26000,]
    
    M = 500
    K = 10
    poolsize = (2,2)
    
    W1_shape = (5,5,3,20)
    W1_init = init_filter(W1_shape,poolsize)
    b1_init = np.zeros(20,dtype=np.float32)
    
    W2_shape = (5,5,20,50)
    W2_init = init_filter(W2_shape,poolsize)
    b2_init = np.zeros(50,dtype=np.float32)
    
    W3_init = np.random.rand(50*8*8,M)/np.sqrt(50*8*8+M)
    b3_init = np.zeros(M,dtype=np.float32)
    W4_init = np.random.rand(M,K)/np.sqrt(M+K)
    b4_init = np.zeros(K, dtype=np.float32)
    
    X = tf.placeholder(tf.float32,shape=(batch_size,32,32,3),name='X')
    T = tf.placeholder(tf.float32,shape=(batch_size,K),name='T')
    
    W1 = tf.Variable(W1_init.astype(np.float32))
    b1 = tf.Variable(b1_init.astype(np.float32))
    W2 = tf.Variable(W2_init.astype(np.float32))
    b2 = tf.Variable(b2_init.astype(np.float32))
    W3 = tf.Variable(W3_init.astype(np.float32))
    b3 = tf.Variable(b3_init.astype(np.float32))
    W4 = tf.Variable(W4_init.astype(np.float32))
    b4 = tf.Variable(b4_init.astype(np.float32))
    
    Z1 = my_convpool(X=X,W=W1,b=b1)
    Z2 = my_convpool(X=Z1,W=W2,b=b2)
    Z2_shape = Z2.get_shape().as_list()
    Z2r = tf.reshape(Z2,[Z2.shape[0],np.prod(Z2.shape[1:])])
    Z3 = tf.nn.relu(tf.matmul(Z2r,W3)+b3)
    Z4 = tf.matmul(Z3,W4)+b4
    
    cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(logits=Z4,labels=T))
    train_op = tf.train.RMSPropOptimizer(0.0001,decay=0.99,momentum=0.9).minimize(cost)

    predict_op = tf.argmax(Z4,axis=1)
    
    t0 = time.time()
    init = tf.global_variables_initializer()
    
    with tf.Session() as s:
        s.run(init)
        
        for i in range(max_iter):
            for j in range(batch_size):
                Xbatch = X_train[j*batch_size:(j+1)*batch_size,]
                Ybatch = Y_train_indi[j*batch_size:(j+1)*batch_size,]
                
                if len(Xbatch) == batch_size:
                    s.run(train_op,feed_dict={X:Xbatch,T:Ybatch})
                    if j % print_period == 0:
                        test_cost = 0
                        prediction = np.zeros(len(X_test))
                        for k in range(int(len(X_test)//batch_size)):
                            Xtestbatch = X_test[k*batch_size:(k+1)*batch_size,]
                            Ytestbatch = Y_test_indi[k*batch_size:(k+1)*batch_size,]
                            test_cost = s.run(cost,feed_dict={X:Xtestbatch,T:Ytestbatch})
                            prediction[k*batch_size:(k+1)*batch_size] = s.run(predict_op,feed_dict={X:Xtestbatch})
                        err = error_rate(Y_test,prediction)
                        print("the error rate in "+str(i)+"  "+str(j)+" is : "+str(err))
        t = time.time() - t0
        print("the time is "+str(t)+" seconds")
        
        
    
    
    
    

In [42]:
cnn_tf()

the error rate in 0  0 is : 0.8405769230769231
the error rate in 0  10 is : 0.9330384615384615
the error rate in 0  20 is : 0.9330384615384615


KeyboardInterrupt: 