In [1]:
import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

### 1、工具类

In [2]:
#参数初始化器
def xavier_init(fan_in, fan_out, constant = 1):
    low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
    high = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out),
                             minval = low, maxval = high,
                             dtype = tf.float32)

### 2、编码模型类*

In [3]:
class AdditiveGaussianNoiseAutoencoder(object):
    def __init__(self, n_input, n_hidden, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(),
                 scale = 0.1):
        self.n_input = n_input    #输入节点数
        self.n_hidden = n_hidden  #隐藏层节点数
        self.transfer = transfer_function  #激活函数
        self.scale = tf.placeholder(tf.float32)   
        self.training_scale = scale   #高斯噪声系数
        network_weights = self._initialize_weights()   #参数初始化（使用到之前的工具类）
        self.weights = network_weights

        # model
        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        #计算隐藏层
        self.hidden = self.transfer(tf.add(tf.matmul(self.x + scale * tf.random_normal((n_input,)),
                self.weights['w1']),
                self.weights['b1']))
        #计算输出层，因为自编码器要求输出与输入一样，所以不是output而是reconstruction
        self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])

        # cost，输入与输出的均方误差
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        #创建类的时候就初始化参数了
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    #初始化参数的方法
    def _initialize_weights(self):
        all_weights = dict()
        all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype = tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype = tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype = tf.float32))
        return all_weights
    
    #实际开始运行优化的函数
    def partial_fit(self, X):
        cost, opt = self.sess.run((self.cost, self.optimizer), feed_dict = {self.x: X,
                                                                            self.scale: self.training_scale
                                                                            })
        return cost

    #不优化，只输出cost值
    def calc_total_cost(self, X):
        return self.sess.run(self.cost, feed_dict = {self.x: X,
                                                     self.scale: self.training_scale
                                                     })

    #运行子图，获取隐藏层
    def transform(self, X):
        return self.sess.run(self.hidden, feed_dict = {self.x: X,
                                                       self.scale: self.training_scale
                                                       })

    #运行子图，输入隐藏层获取到的输出层（因为输出层只需要hidden）
    def generate(self, hidden = None):
        if hidden is None:
            hidden = np.random.normal(size = self.weights["b1"])
        return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})

    #完整运行graph，获取输出值，=transform+generate
    def reconstruct(self, X):
        return self.sess.run(self.reconstruction, feed_dict = {self.x: X,
                                                               self.scale: self.training_scale
                                                               })
    #获取隐藏层的w参数
    def getWeights(self):
        return self.sess.run(self.weights['w1'])
    
    #获取隐藏层的b参数
    def getBiases(self):
        return self.sess.run(self.weights['b1'])
        

### 3、读取mnist数据

In [4]:
#换成你自己的路径
mnist = input_data.read_data_sets('../datasets/MNIST_data', one_hot = True)

Extracting ../datasets/MNIST_data/train-images-idx3-ubyte.gz
Extracting ../datasets/MNIST_data/train-labels-idx1-ubyte.gz
Extracting ../datasets/MNIST_data/t10k-images-idx3-ubyte.gz
Extracting ../datasets/MNIST_data/t10k-labels-idx1-ubyte.gz


### 4、又有俩工具类

In [6]:
def standard_scale(X_train, X_test):
    #使用sklearn的函数进行预处理标准化
    preprocessor = prep.StandardScaler().fit(X_train)
    X_train = preprocessor.transform(X_train)
    X_test = preprocessor.transform(X_test)
    return X_train, X_test

#从数据集中随机获取一个batch集
def get_random_block_from_data(data, batch_size):
    start_index = np.random.randint(0, len(data) - batch_size)
    return data[start_index:(start_index + batch_size)]

### 5、终于开始训练、验证数据了

In [7]:
#数据标准化,实际上就是把图片的三维压缩成了一维，然后归一化
X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)

In [16]:
print type(X_train)
print X_train.shape
print X_train[0][:50]

<type 'numpy.ndarray'>
(55000, 784)
[ 0.          0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.         -0.00461458
 -0.00601082 -0.00426409 -0.00426409  0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.         -0.00426406
 -0.00491916 -0.00919097 -0.01148822 -0.01441607 -0.01938349 -0.02483363
 -0.02953284 -0.03112361 -0.03167421 -0.03243978 -0.0299217  -0.03101258
 -0.02862023 -0.02337851 -0.01906476 -0.01635208 -0.01011273]


In [9]:
#定义一些常量
NUM = int(mnist.train.num_examples)
EPOCHES = 20  
BATCH_size = 128
DISPLAY_step = 1  #每隔多少轮显示一次cost

In [10]:
#创建模型实例，构造方法中已初始化好了变量
autoencoder = AdditiveGaussianNoiseAutoencoder(n_input = 784,
                                               n_hidden = 200,
                                               transfer_function = tf.nn.softplus,
                                               optimizer = tf.train.AdamOptimizer(learning_rate = 0.001),
                                               scale = 0.01)


In [12]:
# 开始迭代训练epoch轮
for epoch in range(EPOCHES):
    avg_cost = 0.
    # 每轮要训练多少个batch
    total_batch = int(NUM / BATCH_size)
    
    # Loop over all batches
    for i in range(total_batch):
        # 随机从训练集中获取一个batch
        batch_xs = get_random_block_from_data(X_train, BATCH_size)
        
        # Fit training using batch data
        # 训练这个batch
        cost = autoencoder.partial_fit(batch_xs)
        
        # Compute average loss
        avg_cost += cost / NUM * BATCH_size

    # Display logs per epoch step
    if epoch % DISPLAY_step == 0:
        print "Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost)

print "Total cost: " + str(autoencoder.calc_total_cost(X_test))

Epoch: 0001 cost= 12225.142872727
Epoch: 0002 cost= 9826.498107386
Epoch: 0003 cost= 10504.222054545
Epoch: 0004 cost= 10482.087984091
Epoch: 0005 cost= 8570.788173295
Epoch: 0006 cost= 8531.979227273
Epoch: 0007 cost= 10063.344133523
Epoch: 0008 cost= 8457.045868750
Epoch: 0009 cost= 8833.006449432
Epoch: 0010 cost= 8322.703061364
Epoch: 0011 cost= 8506.528192614
Epoch: 0012 cost= 8027.122040909
Epoch: 0013 cost= 8373.752133523
Epoch: 0014 cost= 8313.309917614
Epoch: 0015 cost= 8298.272556250
Epoch: 0016 cost= 7705.873556818
Epoch: 0017 cost= 7880.898230114
Epoch: 0018 cost= 8159.548758523
Epoch: 0019 cost= 7532.065497727
Epoch: 0020 cost= 7927.166675568
Total cost: 697277.0


In [19]:
#获取训练完成后的权重
print autoencoder.getWeights()

[[ 0.09321236  0.09074684  0.05463809 ...,  0.00169099 -0.03547013
   0.06860419]
 [-0.01404193 -0.03381761  0.05111738 ...,  0.05779494  0.03069022
  -0.05018943]
 [ 0.1076509  -0.04194646 -0.08686957 ...,  0.0378673  -0.08989307
  -0.0800903 ]
 ..., 
 [-0.11409521 -0.04100924  0.04975993 ...,  0.06152834  0.05778983
  -0.03277344]
 [-0.026313    0.04970432  0.00394098 ...,  0.07307377 -0.09624624
   0.02843025]
 [ 0.02273075  0.04412144 -0.03265305 ...,  0.01932362  0.01018522
  -0.06205586]]
