In [1]:
from tensorflow import keras
from utils import convolve2D


import numpy as np
import os
from ipywidgets import IntProgress
from scipy.signal import convolve
import matplotlib.pyplot as plt

In [2]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()

In [3]:
cond=np.any([y_train_full==1,y_train_full==3],0)
X_train_full=X_train_full[cond]
y_train_full=y_train_full[cond]

X_test=X_test[np.any([y_test==1,y_test==3],0)]
y_test=y_test[np.any([y_test==1,y_test==3],0)]

y_train_full=(y_train_full==3).astype(int)
y_test=(y_test==3).astype(int)

In [4]:
X_train, X_valid = X_train_full[:-1000], X_train_full[-1000:]
y_train, y_valid = y_train_full[:-1000], y_train_full[-1000:]
print(f"X train shape: {X_train.shape}, {X_valid.shape}")
print(f"y train shape: {y_train.shape}, {y_valid.shape}")

X train shape: (11000, 28, 28), (1000, 28, 28)
y train shape: (11000,), (1000,)


In [5]:
X_mean = X_train.mean(axis=0, keepdims=True)
X_std = X_train.std(axis=0, keepdims=True) + 1e-7
X_train = (X_train - X_mean) / X_std
X_valid = (X_valid - X_mean) / X_std
X_test = (X_test - X_mean) / X_std

In [6]:
from numba import jit

@jit
def convolve2D(image, kernel, padding=0, strides=1, flip=True):
    # Cross Correlation. Reversing matrix.
    if flip:
        kernel = np.flipud(np.fliplr(kernel))

    # Gather Shapes of Kernel + Image + Padding
    xKernShape = kernel.shape[0]
    yKernShape = kernel.shape[1]
    xImgShape = image.shape[0]
    yImgShape = image.shape[1]

    # Shape of Output Convolution
    xOutput = int(((xImgShape - xKernShape + 2 * padding) / strides) + 1)
    yOutput = int(((yImgShape - yKernShape + 2 * padding) / strides) + 1)
    output = np.zeros((xOutput, yOutput))

    # Apply Equal Padding to All Sides
    if padding != 0:
        imagePadded = np.zeros((image.shape[0] + padding*2, image.shape[1] + padding*2))
        imagePadded[int(padding):int(-1 * padding), int(padding):int(-1 * padding)] = image
    else:
        imagePadded = image

    # Iterate through image
    for y in range(imagePadded.shape[1]):
        # Exit Convolution
        if y > imagePadded.shape[1] - yKernShape:
            break
        # Only Convolve if y has gone down by the specified Strides
        if y % strides == 0:
            for x in range(imagePadded.shape[0]):
                # Go to next row once kernel is out of bounds
                if x > imagePadded.shape[0] - xKernShape:
                    break

                # Only Convolve if x has moved by the specified Strides
                if x % strides == 0:
                    output[x, y] = (kernel * imagePadded[x: x + xKernShape, y: y + yKernShape]).sum()


    return output

In [7]:
class CNN():
    def __init__(self):
        pass
    
    def cost_function():
        pass
    
    def forward_propagation(self, W1, W2, X, y):
        l0=X
        
        # Embed the image in a bigger image. It would be useful in computing corrections to the convolution
        # filter
        lt0=np.zeros((l0.shape[0]+K-1,l0.shape[1]+K-1))
        lt0[K//2:-K//2+1,K//2:-K//2+1]=l0
        
        # convolve with the filter
        l0_conv=convolve(l0,W1[::-1,::-1],'same','direct')
        
        # Layer one is Relu applied on the convolution 
        l1=relu(l0_conv)
        
        # Also compute derivative of layer 1
        f1p=relu_prime(l0_conv)

        # Compute layer 2
        l2=sigmoid(np.dot(l1.reshape(-1,),W2))
        l2=l2.clip(10**-16,1-10**-16)
        
        # Loss and Accuracy
        loss=-(y*np.log(l2)+(1-y)*np.log(1-l2))
        accuracy=int(y==np.where(l2>0.5,1,0))
        
        return accuracy, loss, lt0
        
    def backward_propagation(self, X, Y, cache):
        pass
    
    def fit(self, X_vert, Y_vert):
        K=3
        image_size=X_train.shape[1]
        image_size_embedding_size=image_size+K-1

        print("image_size: ", image_size)
        print("image_size_embedding_size (after convolution): ", image_size_embedding_size)

        np.random.seed(42)
        W1=np.random.normal(0,2/np.sqrt(K*K),size=(K,K))
        W2=np.random.normal(0,1/np.sqrt(image_size*image_size),size=(image_size*image_size))

        W1_original=W1.copy()
        W2_original=W2.copy()

        print(W1.shape)
        print(W2.shape)
        
        pass
    
    def predict(self, X_vert):
        pass

    def relu(self, x):
        return np.where(x>0,x,0)

    def relu_prime(self, x):
        return np.where(x>0,1,0)

    def sigmoid(self, x):
        return 1./(1.+np.exp(-x))

In [8]:
from functools import wraps
import numpy as np
from scipy.special import erf


def relu(x):
    return np.where(x>0,x,0)
    
def relu_prime(x):
    return np.where(x>0,1,0)

def sigmoid(x):
    return 1./(1.+np.exp(-x))

def forward_pass(W1,W2,X,y):
    l0=X
    l0_conv=convolve(l0,W1[::-1,::-1],'same','direct')    

    l1=relu(l0_conv)    
    l2=sigmoid(np.dot(l1.reshape(-1,),W2))
    l2=l2.clip(10**-16,1-10**-16)    
    loss=-(y*np.log(l2)+(1-y)*np.log(1-l2))
    accuracy=int(y==np.where(l2>0.5,1,0))    
    return accuracy,loss

def coroutine(func):
    @wraps(func)
    def inner(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen

    return inner

@coroutine
def averager():
    total = 0
    count = 0
    average = None
    cont = True
    while cont:
        val = yield average
        if val is None:
            cont = False
            continue
        else:
            total += val
            count += 1.
            average = total / count
    return average


def extract_averager_value(averager):
    try:
        averager.send(None)
    except StopIteration as e:
        return e.value

In [9]:
# filter size
K=3
image_size=X_train.shape[1]
image_size_embedding_size=image_size+K-1

print("image_size: ", image_size)
print("image_size_embedding_size (after convolution): ", image_size_embedding_size)

image_size:  28
image_size_embedding_size (after convolution):  30


In [10]:
np.random.seed(42)
W1=np.random.normal(0,2/np.sqrt(K*K),size=(K,K))
W2=np.random.normal(0,1/np.sqrt(image_size*image_size),size=(image_size*image_size))

W1_original = W1.copy()
W2_original = W2.copy()

print(W1.shape)
print(W2.shape)

(3, 3)
(784,)


In [29]:
def forward_propagation(X, y, W1, W2):
    # First layer is just the input
    l0=X

    # Embed the image in a bigger image. It would be useful in computing corrections to the convolution
    # filter
    lt0=np.zeros((l0.shape[0]+K-1,l0.shape[1]+K-1))
    lt0[K//2:-K//2+1,K//2:-K//2+1]=l0

    #### FORWARD
    # convolve with the filter
    #l0_conv=convconvolveolve2D(l0,W1[::-1,::-1],'same','direct')
    l0_conv=convolve2D(l0, W1, padding=1, strides=1, flip=False)

    # Layer one is Relu applied on the convolution 
    l1=relu(l0_conv)

    # Also compute derivative of layer 1
    f1p=relu_prime(l0_conv)

    # Compute layer 2
    l2=sigmoid(np.dot(l1.reshape(-1,),W2))
    l2=l2.clip(10**-16,1-10**-16)

    
    loss = -(y*np.log(l2)+(1-y)*np.log(1-l2))
    accuracy = int(y==np.where(l2>0.5,1,0))
    
    return accuracy, loss, l1, l2, lt0, f1p

def back_propagation(y, l1, l2, lt0, f1p):
    # Derivative of loss wrt the dense layer
    dW2=(((1-y)*l2-y*(1-l2))*l1).reshape(-1,)

    # Derivative of loss wrt the output of the first layer
    dl1=(((1-y)*l2-y*(1-l2))*W2).reshape(28,28)

    # Derivative of the loss wrt the convolution filter
    dl1_f1p=dl1*f1p
    dW1=np.array([
        [(lt0[alpha:+alpha+image_size,beta:beta+image_size]*dl1_f1p).sum() for beta in range(K)] \
                      for alpha in range(K)
    ])

    return dW1, dW2

In [30]:
from ipywidgets import IntProgress
w=IntProgress(max=len(y_train))
display(w)
eta=.001

for epoch in range(2):
    train_loss=averager()
    train_accuracy=averager()
    
    for i in range(len(y_train)):
        
        # Take a random sample
        k=np.random.randint(len(y_train))
        X=X_train[k]
        y=y_train[k]
        if (i+1) % 100 ==0:
            w.value=i+1
        
        accuracy, loss, l1, l2, lt0, f1p = forward_propagation(X, y, W1, W2)
        
        dW1, dW2 = back_propagation(y, l1, l2, lt0, f1p)

        # Save the loss and accuracy to a running averager
        train_loss.send(loss)
        train_accuracy.send(accuracy)

        W2+=-eta*dW2
        W1+=-eta*dW1

    
    loss_averager_valid=averager()
    accuracy_averager_valid=averager()   
    
    for X,y in zip(X_valid,y_valid):
        accuracy,loss=forward_pass(W1,W2,X,y)
        loss_averager_valid.send(loss)
        accuracy_averager_valid.send(accuracy)
    
    train_loss, train_accuracy, valid_loss, valid_accuracy = map(extract_averager_value,[
                                                                train_loss,
                                                                train_accuracy,
                                                                loss_averager_valid,
                                                                accuracy_averager_valid]
                                                               )
    msg='Epoch {}: train loss {:.2f}, train acc {:.2f}, valid loss {:.2f}, valid acc {:.2f}'.format(epoch+1,
                                                                                                      train_loss,
                                                                                                      train_accuracy,
                                                                                                      valid_loss,
                                                                                                      valid_accuracy
                                                                                                     )
    print(msg)

    
    

IntProgress(value=0, max=11000)

Epoch 1: train loss 0.06, train acc 0.98, valid loss 0.09, valid acc 0.97
Epoch 2: train loss 0.05, train acc 0.98, valid loss 0.09, valid acc 0.97


In [28]:
# def forward_pass_for_plot(W1,W2,X,y):
#     l0=X
#     l0_conv=convolve(l0,W1[::-1,::-1],'same','direct')

#     l1=relu(l0_conv)

#     l2=sigmoid(np.dot(l1.reshape(-1,),W2))
#     l2=l2.clip(10**-16,1-10**-16)


#     loss=-(y*np.log(l2)+(1-y)*np.log(1-l2))
#     accuracy=int(y==np.where(l2>0.5,1,0))

#     return l1,l2

In [25]:
# def plot_figs(i):
#     _,axs=plt.subplots(1,3,figsize=(10,3))
#     axs[0].imshow(X_train[i],cmap='gray')
#     l1_original,l2_original=forward_pass_for_plot(W1_original,W2_original,X_train[i],y_train[i])
#     axs[1].imshow(l1_original,cmap='gray')
#     l1,l2=forward_pass_for_plot(W1,W2,X_train[i],y_train[i])
#     axs[2].imshow(l1,cmap='gray')


In [15]:
# plt.imshow(W1,cmap='gray')

In [16]:
# plt.imshow(W2.reshape(28,28),cmap='gray')

In [17]:
# for i in range(10):
#     plot_figs(i)