# autoencoder

In [4]:
import tensorflow as tf
import numpy as np
import sklearn.preprocessing as prep
import matplotlib.pyplot as plt
%matplotlib inline

## Setting

In [3]:
# Parameters
transfer_function = tf.nn.sigmoid # tf.nn.relu, tf.nn.softplus, tf.nn.sigmoid, tf.nn.tanh
dropout_probability = 0.9
training_epochs = 100
batch_size = 128
display_step = 1

## data set

In [32]:
# Check out https://www.tensorflow.org/get_started/mnist/beginners for
# more information about the mnist dataset

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
X_train, X_test = mnist.train.images, mnist.test.images
n_samples = int(mnist.train.num_examples)

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz


## Help functions

In [33]:
def standard_scale(X_train, X_test):
    preprocessor = prep.StandardScaler().fit(X_train)
    X_train = preprocessor.transform(X_train)
    X_test = preprocessor.transform(X_test)
    
    return X_train, X_test

In [34]:
def inverse_transform(X_train_original, X_processed):
    preprocessor = prep.StandardScaler().fit(X_train_original)
    X = preprocessor.inverse_transform(X_processed)
    
    return X

In [35]:
def get_random_block_from_data(data, batch_size):
    start_index = np.random.randint(0, len(data) - batch_size)
    
    return data[start_index:(start_index + batch_size)]

## autoencoder model

In [37]:
class AdditiveGaussianNoiseAutoencoder(object):
    def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus, optimizer=tf.train.AdamOptimizer(),
                 scale=0.1):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.training_scale = scale

        network_weights = self._initialize_weights()
        self.weights = network_weights

        # model
        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        self.corrupted = self.x + scale * tf.random_normal((n_input,))
        self.hidden = self.transfer(tf.add(tf.matmul(self.corrupted, self.weights['w1']), self.weights['b1']))
        self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])

        # cost
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    def _initialize_weights(self):
        all_weights = dict()
        all_weights['w1'] = tf.get_variable("w1", shape=[self.n_input, self.n_hidden],
            initializer=tf.contrib.layers.xavier_initializer())
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype = tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype = tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype = tf.float32))
        return all_weights

    def partial_fit(self, X):
        cost, opt = self.sess.run((self.cost, self.optimizer), feed_dict = {self.x: X,
                                                                            self.scale: self.training_scale
                                                                            })
        return cost

    def calc_total_cost(self, X):
        return self.sess.run(self.cost, feed_dict = {self.x: X,
                                                     self.scale: self.training_scale
                                                     })

    def transform(self, X):
        return self.sess.run(self.hidden, feed_dict = {self.x: X,
                                                       self.scale: self.training_scale
                                                       })

    def generate(self, hidden=None):
        if hidden is None:
            hidden = self.sess.run(tf.random_normal([1, self.n_hidden]))
        return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})

    def reconstruct(self, X):
        return self.sess.run(self.reconstruction, feed_dict = {self.x: X,
                                                               self.scale: self.training_scale
                                                               })
    def corrupt(self, X):
        return self.sess.run(self.corrupted, fedd_dict={self.x:X, self.scale: self.training_scale})
    
    def getWeights(self):
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        return self.sess.run(self.weights['b1'])


class MaskingNoiseAutoencoder(object):
    def __init__(self, n_input, n_hidden, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(),
                 dropout_probability = 0.95, tied_weights=False):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.dropout_probability = dropout_probability
        self.keep_prob = tf.placeholder(tf.float32)
        self.tied_weights = tied_weights

        network_weights = self._initialize_weights(tied=self.tied_weights)
        self.weights = network_weights

        # model
        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        self.corrupted = tf.nn.dropout(self.x, self.keep_prob)
        self.hidden = self.transfer(tf.add(tf.matmul(self.corrupted, self.weights['w1']), self.weights['b1']))
        self.reconstruction = self.transfer(tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2']))

        # cost
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    # def _initialize_weights(self, tied):
    #     all_weights = dict()
    #     all_weights['w1'] = tf.get_variable("w1", shape=[self.n_input, self.n_hidden],
    #         initializer=tf.contrib.layers.xavier_initializer())
    #     all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype = tf.float32))
    #     if tied == True:
    #         all_weights['w2'] = tf.transpose(all_weights['w1'])
    #     else:
    #         all_weights['w2'] = = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype = tf.float32))
    #     all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype = tf.float32))
    #     return all_weights

    def _initialize_weights(self, tied):
        all_weights = dict()
        all_weights['w1'] = tf.get_variable('w1', shape=[self.n_input, self.n_hidden], initializer=tf.contrib.layers.xavier_initializer())
        all_weights['b1'] = tf.get_variable('b1', shape=[self.n_hidden], initializer=tf.constant_initializer(0.0))
        if tied == True:
            all_weights['w2'] = tf.transpose(all_weights['w1'], name='w2')
        else:
            all_weights['w2'] = tf.get_variable('w2', shape=[self.n_hidden, self.n_input], initializer=tf.contrib.layers.xavier_initializer())
        all_weights['b2'] = tf.get_variable('b2', shape=[self.n_input], initializer=tf.constant_initializer(0.0))
        return all_weights

    def partial_fit(self, X):
        cost, opt = self.sess.run((self.cost, self.optimizer),
                                  feed_dict = {self.x: X, self.keep_prob: self.dropout_probability})
        return cost

    def calc_total_cost(self, X):
        return self.sess.run(self.cost, feed_dict = {self.x: X, self.keep_prob: 1.0})

    def transform(self, X):
        return self.sess.run(self.hidden, feed_dict = {self.x: X, self.keep_prob: 1.0})

    def generate(self, hidden=None):
        if hidden is None:
            hidden = self.sess.run(tf.random_normal([1, self.n_hidden]))
        return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})

    def reconstruct(self, X):
        return self.sess.run(self.reconstruction, feed_dict = {self.x: X, self.keep_prob: 1.0})

    def corrupt(self, X):
        return self.sess.run(self.corrupted, feed_dict={self.x: X, self.keep_prob: self.dropout_probability})

    def getWeights(self):
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        return self.sess.run(self.weights['b1'])

In [38]:
autoencoder = MaskingNoiseAutoencoder(n_input=784,
                                      n_hidden=200,
                                      transfer_function=transfer_function,
                                      optimizer=tf.train.AdamOptimizer(learning_rate = 0.001),
                                      dropout_probability=dropout_probability,
                                      tied_weights=True)

## model graphs
### tf.nn.conv2d(input, filter, strides, padding)
- input tensor shape: [batch, in_height, in_width, in_channels]
- filter tensor shape: [filter_height, filter_width, in_channels, out_channels]
- strid tensor: input의 각 dimension에 대한 sliding window의 stride
- padding: "SAME" or "VALID"

## tf.nn.max_pool(value, k_size, strides, padding)
- value: A 4-D Tensor with shape [batch, height, width, channels] and type tf.float32
- k_size: A list of integers that has length >=4. The size of the window for each dimension of the input tensor
- strides: A list of integers that has lenght >=4. The stride of the sliding window for dimension of the input tensor.
- padding: "SAME", "VALID"

## tf.nn.relu(features)
- max(features, 0)

## tf.nn.dropout(x, keep_prob)
- keep_prob: dropout probability

In [12]:
def model(X, w, w2, w3, w4, w_o, p_keep_conv, p_keep_hidden):
    l1a = tf.nn.relu(tf.nn.conv2d(X, w,
                                  strides=[1, 1, 1, 1], padding='SAME'))
       # l1a output shape=(?, input_height, input_width, number_of_channels_layer1)
    l1 = tf.nn.max_pool(l1a, ksize=[1, 2, 2, 1],              
                        strides=[1, 2, 2, 1], padding='SAME')
    # l1 output shape=(?, input_height/2, input_width/2, number_of_channels_layer1)
    l1 = tf.nn.dropout(l1, p_keep_conv)
                                  
    l2a = tf.nn.relu(tf.nn.conv2d(l1, w2,                     
                        strides=[1, 1, 1, 1], padding='SAME'))
    # l2a output shape=(?, input_height/2, input_width/2, number_of_channels_layer2)
    l2 = tf.nn.max_pool(l2a, ksize=[1, 2, 2, 1],              
                        strides=[1, 2, 2, 1], padding='SAME')
    # l2 shape=(?, input_height/4, input_width/4, number_of_channels_layer2)
    l2 = tf.nn.dropout(l2, p_keep_conv)

    l3a = tf.nn.relu(tf.nn.conv2d(l2, w3,                     
                        strides=[1, 1, 1, 1], padding='SAME'))
    # l3a shape=(?, input_height/4, input_width/4, number_of_channels_layer3)
    l3 = tf.nn.max_pool(l3a, ksize=[1, 2, 2, 1],              
                        strides=[1, 2, 2, 1], padding='SAME')
    # l3 shape=(?, input_height/8, input_width/8, number_of_channels_layer3)
    l3 = tf.reshape(l3, [-1, w4.get_shape().as_list()[0]])
    # flatten to (?, input_height/8 * input_width/8 * number_of_channels_layer3)
    l3 = tf.nn.dropout(l3, p_keep_conv)

    l4 = tf.nn.relu(tf.matmul(l3, w4))
    #fully connected_layer
    l4 = tf.nn.dropout(l4, p_keep_hidden)
    
    pyx = tf.matmul(l4, w_o)
    return pyx

## weight variable & Model
### convlution + max pooling layer
* 1st convolution + max pooling layer: 3 X 3  Convolution, 2 * 2 max pooling, channel_in: 1, channel_out: 32
* 2nd convolution + max pooling layer: 3 X 3  Convolution, 2 * 2 max pooling, channel_in: 32, channel_out: 64
* 3th convolution + max pooling layer: 3 X 3  Convolution, 2 * 2 max pooling, channel_in: 64, channel_out: 128

### fully connected layer
* flatten input layer to the fully connected layer
* hidden layer unit: 625
* output layer: number of classes (10)

### dropout
* convolution layer dropout
* fully connected layer dropout

### X, Y

In [13]:
X = tf.placeholder(tf.float32, [None, 28, 28, 1])
Y = tf.placeholder(tf.int32, [None])

### Weight 

In [14]:
w = init_weights([3, 3, 1, 32])
w2 = init_weights([3, 3, 32, 64])
w3 = init_weights([3, 3, 64, 128])
w4 = init_weights([128 * 4 * 4, 625])
w_o = init_weights([625, 10])

In [20]:
p_keep_conv = tf.placeholder(tf.float32)
p_keep_hidden = tf.placeholder(tf.float32)
py_x = model(X, w, w2, w3, w4, w_o, p_keep_conv, p_keep_hidden)

### loss function

In [21]:
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=py_x, labels=Y))

### optimizer

In [22]:
train_op = tf.train.AdagradOptimizer(learning_rate=0.1).minimize(loss)

### Accuracy

In [23]:
correct = tf.nn.in_top_k(py_x, Y, 1)
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

## Run

In [25]:
trn_loss_list = []
test_loss_list = []
test_accuracy_list = []
batch_size = 128

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    
    for i in range(50):
        trn_loss = 0
        
        for start, end in zip(range(0, len(X_trn), batch_size), range(batch_size, len(X_trn)+1, batch_size)):
            tmp_loss, _ = sess.run([loss, train_op], 
                                   feed_dict={X: X_trn[start:end], 
                                              Y: Y_trn[start:end], 
                                              p_keep_conv: 1.0, 
                                              p_keep_hidden: 0.8})
            
        trn_loss_list.append(trn_loss / batch_size)
        test_loss, test_acc = sess.run([loss, accuracy], 
                                       feed_dict={X: X_test, 
                                                  Y: Y_test, 
                                                  p_keep_conv: 1.0, 
                                                  p_keep_hidden: 1.0})
        test_loss_list.append(test_loss)
        test_accuracy_list.append(test_acc)
        print("epoch: {} test accuracy {:0.4f}".format(i, test_acc))

epoch: 0 test accuracy 0.2551
epoch: 1 test accuracy 0.1693
epoch: 2 test accuracy 0.1032
epoch: 3 test accuracy 0.1032
epoch: 4 test accuracy 0.9389
epoch: 5 test accuracy 0.9590


KeyboardInterrupt: 