# ML HW1 - AutoEncoder

Mount google drive to colab.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Import neccessary libraties and set parameters.

In [None]:
import os
import struct
import numpy as np
import matplotlib.pyplot as plt

mnist_path = "http://yann.lecun.com/exdb/mnist/"

zip_list = ["train-images-idx3-ubyte.gz", 
            "train-labels-idx1-ubyte.gz", 
            "t10k-images-idx3-ubyte.gz", 
            "t10k-labels-idx1-ubyte.gz" ]

ext_list = ["train-images.idx3-ubyte", 
            "train-labels.idx1-ubyte", 
            "t10k-images.idx3-ubyte", 
            "t10k-labels.idx1-ubyte" ]

def downloadMNIST(path = ".", unzip=True):
    import urllib.request
    if not os.path.exists(path):
        os.makedirs(path)

    for i in range(len(zip_list)):
        zip_name = zip_list[i]
        fname1 = os.path.join(path, zip_name)
        print("Download ", zip_name, "...")
        if not os.path.exists(fname1):
            urllib.request.urlretrieve( mnist_path + zip_name, fname1)
        else:
            print("pass")

        if unzip:
            import gzip
            import shutil
            ext_name = ext_list[i]
            fname2 = os.path.join(path, ext_name)
            print("Extract", fname1, "...")
            if not os.path.exists(fname2):
                with gzip.open(fname1, 'rb') as f_in:
                    with open(fname2, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                print("pass")
            

def loadMNIST(dataset = "training", path = "."):
    if dataset == "training":
        fname_img = os.path.join(path, 'train-images.idx3-ubyte')
        fname_lbl = os.path.join(path, 'train-labels.idx1-ubyte')
    elif dataset == "testing":
        fname_img = os.path.join(path, 't10k-images.idx3-ubyte')
        fname_lbl = os.path.join(path, 't10k-labels.idx1-ubyte')
    else:
        raise ValueError("dataset must be 'testing' or 'training'")

    flbl = open(fname_lbl, 'rb')
    magic_nr, size = struct.unpack(">II", flbl.read(8))
    lbl = np.fromfile(flbl, dtype=np.int8)
    flbl.close()

    fimg = open(fname_img, 'rb')
    magic_nr, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = np.fromfile(fimg, dtype=np.uint8).reshape(len(lbl), rows*cols)
    fimg.close()

    return img, lbl

In [None]:
class AE(): 
    def __init__(self, input_size, hidden_size, output_size, activation):   
        self.W1 = np.random.randn(input_size, hidden_size) / np.sqrt(input_size)
        self.b1 = np.zeros([1, hidden_size])
        self.W2 = np.random.randn(hidden_size, output_size) / np.sqrt(input_size)
        self.b2 = np.zeros([1, output_size])
        # Activation Function & Placeholders
        self.activation = activation # “linear”/“softmax”/”sigmoid”
        self.placeholder = {"x":None, "y":None}

    
    # Feed Placeholder
    def feed(self, feed_dict):
        for key in feed_dict:
            self.placeholder[key] = feed_dict[key].copy()
    
    # Forward Propagation
    def relu(self, x):
        return np.maximum(x, 0)

    def forward(self):

        n = self.placeholder["x"].shape[0]
        self.x = self.placeholder["x"]
        self.B1 = np.ones((n, 1)).dot(self.b1)
        self.xw1 = self.x.dot(self.W1)
        self.a1 = self.xw1 + self.B1
        self.h1 = np.maximum(self.a1,0)
        self.B2 = np.ones((n, 1)).dot(self.b2)
        self.xw2 = self.h1.dot(self.W2)
        self.a2 = self.xw2 + self.B2

        if self.activation == "linear":
            self.y = self.a2.copy()
        # Softmax Activation
        elif self.activation == "softmax":
            self.y_logit = np.exp(self.a2 - np.max(self.a2, 1, keepdims=True))
            self.y = self.y_logit / np.sum(self.y_logit, 1, keepdims=True)
        # Sigmoid Activation
        elif self.activation == "sigmoid":
            self.y = 1.0 / (1.0 + np.exp(-self.a2))
        
        return self.y
    
    # Backward Propagation
    def backward(self):
        n = self.placeholder["y"].shape[0]
        self.grad_a2 = (self.y - self.placeholder["y"]) / n
        self.grad_b2 = np.ones((n, 1)).T.dot(self.grad_a2)
        self.grad_W2 = self.h1.T.dot(self.grad_a2)
        self.grad_h1 = self.grad_a2.dot(self.W2.T)
        self.grad_a1 = self.grad_h1.copy()
        self.grad_a1[self.a1<0] = 0
        self.grad_b1 = np.ones((n, 1)).T.dot(self.grad_a1)
        self.grad_W1 = self.x.T.dot(self.grad_a1)
    
    # Update Weights
    def update(self, learning_rate):
        self.W1 = self.W1 - learning_rate * self.grad_W1
        self.b1 = self.b1 - learning_rate * self.grad_b1
        self.W2 = self.W2 - learning_rate * self.grad_W2
        self.b2 = self.b2 - learning_rate * self.grad_b2
        return self.W1
    
    # Loss Functions
    def computeLoss(self):
        loss = 0.0
        # Mean Square Error
        if self.activation == "linear":
            loss = 0.5 * np.square(self.y - self.placeholder["y"]).mean()
        # Softmax Cross Entropy
        elif self.activation == "softmax":
            loss = -self.placeholder["y"] * np.log(self.y + 1e-6)
            loss = np.sum(loss, 1).mean()
        # Sigmoid Cross Entropy
        elif self.activation == "sigmoid":
            loss = -self.placeholder["y"] * np.log(self.y + 1e-6) - \
            (1-self.placeholder["y"]) * np.log(1-self.y + 1e-6)
            loss = np.mean(loss)
            
            
        return loss

In [None]:

if __name__ == "__main__":
    # Dataset
    downloadMNIST(path='MNIST_data', unzip=True)
    x_train, y_train = loadMNIST(dataset="training", path="MNIST_data")
    x_test, y_test = loadMNIST(dataset="testing", path="MNIST_data")

    # Show Data and Label
    plt.imshow(x_train[0].reshape((28,28)), cmap='gray')
    plt.show()

    # todo (Data Processing)
    x_train = x_train.astype(np.float32) / 255.
    x_test = x_test.astype(np.float32) / 255.

    # todo (Create AE MODEL)
    ae = AE(784, 128, 784, "sigmoid")

    # Training the Model
    loss_rec = []
    batch_size = 64
    data_size = 60000

    for i in range(10001):
        
        # todo (Sample Data Batch)
        batch_id = np.random.choice(data_size, batch_size)
        x_batch = x_train[batch_id]

        # todo (Forward & Backward & Update)
        ae.feed({"x":x_batch, "y":x_batch})
        ae.forward()
        ae.backward()
        filter = ae.update(0.05)

        # todo (Loss)
        loss = ae.computeLoss()
        loss_rec.append(loss)

        # todo (Evaluation)
        batch_id = np.random.choice(x_test.shape[0], batch_size)
        x_test_batch = x_test[batch_id]

        ae.feed({"x":x_test_batch, "y":x_test_batch})
        y_test_out = ae.forward()
        
        if i%100 == 0:
            print("\r[Iteration {:5d}] Loss={:.4f}".format(i, loss))

    batch_size = 10
    batch_id = np.random.choice(x_test.shape[0], batch_size)
    x_test = x_test[batch_id]
    ae.feed({"x":x_test, "y":x_test})
    y_prob = ae.forward()

    print("\n","Origin image")
    for i in range(len(x_test)):
        plt.subplot(1, 10, i + 1)
        plt.imshow(x_test[i].reshape((28,28)), cmap='gray')
    plt.show()

    print("Reconstruction results")
    for i in range(len(x_test)):
        plt.subplot(1, 10, i + 1)
        plt.imshow(y_prob[i].reshape((28,28)), cmap='gray')
    plt.show()
    print("16 filters")
    for i in range(16):
        plt.subplot(4, 4, i + 1)
        plt.imshow(filter.T[i].reshape((28,28)), cmap='gray')
    plt.show()

    plt.plot(loss_rec)
    plt.show()
