In [1]:
import numpy as np
import pickle 
import sys
from time import *
from cnn_model.model_loss import *
from cnn_model.model_layers import *

class CNN_NET:
    def __init__(self):
        # Lenet
        # input: 100x
        # conv1: (5x5x6)@s1p2 -> 28x28x6 {(28-5+2x2)/1+1}
        # maxpool2: (2x2)@s2 -> 14x14x6 {(28-2)/2+1}
        # conv3: (5x5x16)@s1p0 -> 10x10x16 {(14-5)/1+1}
        # maxpool4: (2x2)@s2 -> 5x5x16 {(10-2)/2+1}
        # conv5: (5x5x120)@s1p0 -> 1x1x120 {(5-5)/1+1}
        # fc6: 120 -> 84
        # fc7: 84 -> 10
        # softmax: 10 -> 10
        lr = 0.01
        self.layers = []
        #0
        self.layers.append(Convolution2D(inputs_channel=3, num_filters=6, kernel_size=5, padding=2, stride=1, learning_rate=lr, name='conv1'))
        #1
        self.layers.append(ReLu())
        #2
        self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool2'))
        #3
        self.layers.append(Convolution2D(inputs_channel=6, num_filters=16, kernel_size=5, padding=2, stride=1, learning_rate=lr, name='conv3'))
        #4
        self.layers.append(ReLu())
        #5
        self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool4'))
        #6
        self.layers.append(Convolution2D(inputs_channel=16, num_filters=72, kernel_size=5, padding=2, stride=1, learning_rate=lr, name='conv5'))
        #7
        self.layers.append(ReLu())
        #8
        self.layers.append(Flatten())
        #9
        self.layers.append(FullyConnected(num_inputs=45000, num_outputs=36, learning_rate=lr, name='fc6'))
        #10
        self.layers.append(ReLu())
        #11
        self.layers.append(FullyConnected(num_inputs=36, num_outputs=6, learning_rate=lr, name='fc7'))
        #12
        self.layers.append(Softmax())
        self.lay_num = len(self.layers)

    def train(self, training_data, training_label, batch_size, epoch, weights_file):
        total_acc = 0
        for e in range(epoch):
            for batch_index in range(0, training_data.shape[0], batch_size):
                # batch input
                if batch_index + batch_size < training_data.shape[0]:
                    data = training_data[batch_index:batch_index+batch_size]
                    label = training_label[batch_index:batch_index + batch_size]
                else:
                    data = training_data[batch_index:training_data.shape[0]]
                    label = training_label[batch_index:training_label.shape[0]]
                loss = 0
                acc = 0
                start_time = time()
                for b in range(batch_size):
                    x = data[b]
                    y = label[b]
                    # forward pass
                    for l in range(self.lay_num):
                        # print("Working on forward pass for layer no.", l)
                        output = self.layers[l].forward(x)
                        # print("the shape output sfter iteration is :",l, output.shape)
                        x = output
#                     print("output shape:", output.shape)
                    loss += cross_entropy(output, y)
                    if np.argmax(output) == np.argmax(y):
                        acc += 1
                        total_acc += 1
                    # print("output is:", output, output.shape, np.argmax(output), np.argmax(y) )
                    # backward pass
                    # print("The Loss and accuracy is", loss, total_acc)
                    dy = output
                    for l in range(self.lay_num-1, -1, -1):
                        # print("Working on backward pass for layer no.", l)
                        dout = self.layers[l].backward(dy)
                        dy = dout
                # time
                end_time = time()
                batch_time = end_time-start_time
                remain_time = (training_data.shape[0]*epoch-batch_index-training_data.shape[0]*e)/batch_size*batch_time
                hrs = int(remain_time)/3600
                mins = int((remain_time/60-hrs*60))
                secs = int(remain_time-mins*60-hrs*3600)
                # result
                loss /= batch_size
                batch_acc = float(acc)/float(batch_size)
                training_acc = float(total_acc)/float((batch_index+batch_size)*(e+1))
                print('=== Epoch: {0:d}/{1:d} === Iter:{2:d} === Loss: {3:.2f} === BAcc: {4:.2f} === TAcc: {5:.2f} === Remain: {6:d} Hrs {7:d} Mins {8:d} Secs ==='.format(e,epoch,batch_index+batch_size,loss,batch_acc,training_acc,int(hrs),int(mins),int(secs)))
        # dump weights and bias
            obj = []
            for i in range(self.lay_num):
                cache = self.layers[i].extract()
                obj.append(cache)
            with open(weights_file, 'ab') as handle:
                pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)

    def test(self, data, label, test_size):
        # toolbar_width = 40
        # sys.stdout.write("[%s]" % (" " * (toolbar_width-1)))
        # sys.stdout.flush()
        # sys.stdout.write("\b" * (toolbar_width))
        # step = float(test_size)/float(toolbar_width)
        # st = 1
        total_acc = 0
        for i in range(test_size):
            x = data[i]
            y = label[i]
            # if i == round(step):
            #     step += float(test_size)/float(toolbar_width)
            #     st += 1
            #     sys.stdout.write(".")
                #sys.stdout.write("%s]a"%(" "*(toolbar_width-st)))
                #sys.stdout.write("\b" * (toolbar_width-st+2))
                # sys.stdout.flush()
            
            for l in range(self.lay_num):
                output = self.layers[l].forward(x)
                x = output
            if np.argmax(output) == np.argmax(y):
                total_acc += 1
        sys.stdout.write("\n")
        print('=== Test Size:{0:d} === Test Acc:{1:.2f} ==='.format(test_size, float(total_acc)/float(test_size)))

    def test_with_pretrained_weights(self, data, label, test_size, weights_file):
        with open(weights_file, 'rb') as handle:
            b = pickle.load(handle)
        self.layers[0].feed(b[0]['conv1.weights'], b[0]['conv1.bias'])
        self.layers[3].feed(b[3]['conv3.weights'], b[3]['conv3.bias'])
        self.layers[6].feed(b[6]['conv5.weights'], b[6]['conv5.bias'])
        self.layers[9].feed(b[9]['fc6.weights'], b[9]['fc6.bias'])
        self.layers[11].feed(b[11]['fc7.weights'], b[11]['fc7.bias'])
        toolbar_width = 40
        sys.stdout.write("[%s]" % (" " * (toolbar_width-1)))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width))
        step = float(test_size)/float(toolbar_width)
        st = 1
        total_acc = 0
        for i in range(test_size):
            if i == round(step):
                step += float(test_size)/float(toolbar_width)
                st += 1
                sys.stdout.write(".")
                #sys.stdout.write("%s]a"%(" "*(toolbar_width-st)))
                #sys.stdout.write("\b" * (toolbar_width-st+2))
                sys.stdout.flush()
            x = data[i]
            y = label[i]
            for l in range(self.lay_num):
                output = self.layers[l].forward(x)
                x = output
            if np.argmax(output) == np.argmax(y):
                total_acc += 1
        sys.stdout.write("\n")
        print('=== Test Size:{0:d} === Test Acc:{1:.2f} ==='.format(test_size, float(total_acc)/float(test_size)))
            
    def predict_with_pretrained_weights(self, inputs, weights_file):
        with open(weights_file, 'rb') as handle:
            b = pickle.load(handle)
        self.layers[0].feed(b[0]['conv1.weights'], b[0]['conv1.bias'])
        self.layers[3].feed(b[3]['conv3.weights'], b[3]['conv3.bias'])
        self.layers[6].feed(b[6]['conv5.weights'], b[6]['conv5.bias'])
        self.layers[9].feed(b[9]['fc6.weights'], b[9]['fc6.bias'])
        self.layers[11].feed(b[11]['fc7.weights'], b[11]['fc7.bias'])
        for l in range(self.lay_num):
            output = self.layers[l].forward(inputs)
            inputs = output
        digit = np.argmax(output)
        probability = output[0, digit]
        return digit, probability



In [2]:
import glob 
import os
import matplotlib.image as mpimg
import matplotlib.pyplot as mp
import glob
from cnn_model.model_network import CNN_NET
from skimage import io
import numpy as np
from skimage.transform import rescale, resize
print('Loadind and Preparing Testing Image data......')

# Loading the Testing Images data
testing_img_dir = r"test_imgs"
test_img = []
for img in glob.glob( testing_img_dir + '/*' + '/*.jpg'):
#     cv_img.append(resize(io.imread(img, as_grey = True)/255, (100, 100)))
    test_img.append(resize(mpimg.imread(img)/255, (100, 100, 3)))
    test_imgs = np.array(test_img)
test_imgs -= int(np.mean(test_imgs))


print('Preparing Testing Image data Labels......')

# Creating the Testing Images Class Labels with One hot Coding
Class_num = 6
class_lab = []
classes = glob.glob( testing_img_dir + '/*')
count = 0
for clas in classes:
    c_len = len(glob.glob( clas + '/*.jpg'))
    class_lab.extend([count] * c_len)
    count += 1
class_labels = np.array(class_lab) 
testing_labels = np.eye(Class_num)[class_labels]

Loadind and Preparing Testing Image data......
Preparing Testing Image data Labels......


# Class name : Class Label
building   :   0 
forest     :   1
glacier    :   2
mountain   :   3
sea        :   4
street     :   5

In [None]:
test_size = 30
CNN = CNN_NET()
print('Testing CNN......')
CNN.testing_trained_weights(test_imgs, testing_labels, test_size, 'cnn_model_weights.pkl')

Testing CNN......


In [4]:
test_size = 30
CNN = CNN_NET()
CNN.predict_img_trained_weights(test_imgs[15], 'cnn_model_weights.pkl')

(5, 0.16666799259951431)