# Auxiliary.py

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf


def mkdir(path):
    path = path.strip()
    path = path.rstrip("\\")
    isExists = os.path.exists(path)
    # 判断结果
    if not isExists:
        # 如果不存在则创建目录
        # 创建目录操作函数
        os.makedirs(path) 
        print(path+' 创建成功')
        return True
    else:
        # 如果目录存在则不创建，并提示目录已存在
        #print(path+' 目录已存在')
        return False


# 检查GPU
def GPU():
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)
    print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

# model.py

In [2]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf
import numpy as np


# 模型构建
class VAECNN(keras.Model):
    def __init__(self, z_dim, L, opt, the_type=0):
        super(VAECNN, self).__init__()
        # 编码器
        self.en1 = layers.Conv2D(6, (5, 5), activation='relu', input_shape=(28, 28, 1))
        self.en2 = layers.MaxPooling2D((2, 2))
        self.en3 = layers.Conv2D(16, (5, 5), activation='relu')
        self.en4 = layers.MaxPooling2D((2, 2))
        self.en5 = layers.Conv2D(120, (4, 4), activation='relu')
        self.en6 = layers.Flatten()
        self.en7 = layers.Dense(84, activation='relu')
        # 均值
        self.en8 = layers.Dense(z_dim)
        # 方差
        self.en9 = layers.Dense(z_dim)

        # 解码器
        self.de1 = layers.Dense(84)
        self.de2 = layers.Dense(120)
        self.de3 = layers.Conv2DTranspose(16, (4, 4), activation='relu')
        self.de4 = layers.UpSampling2D((2, 2))
        self.de5 = layers.Conv2DTranspose(6, (5, 5), activation='relu')
        self.de6 = layers.UpSampling2D((2, 2))
        self.de7 = layers.Conv2DTranspose(1, (5, 5))

        self.L = L
        self.opt = opt
        self.type = the_type

    def encoder(self, x):
        # 编码器
        x = self.en1(x)
        x = self.en2(x)
        x = self.en3(x)
        x = self.en4(x)
        x = self.en5(x)
        x = self.en6(x)
        h = self.en7(x)
        # 获取均值
        mu = self.en8(h)
        # 获取方差
        log_var = self.en9(h)

        return mu, log_var

    def decoder(self, z):
        # 解码器
        z = self.de1(z)
        z = self.de2(z)
        z = tf.reshape(z, (-1, 120))
        z = tf.expand_dims(z, 1)
        z = tf.expand_dims(z, 1)
        z = self.de3(z)
        z = self.de4(z)
        z = self.de5(z)
        z = self.de6(z)
        out = self.de7(z)
        return out

    def call(self, inputs, training=None, mask=None):
        # 模型拼装
        mu, log_var = self.encoder(inputs)
        log_var_ex = tf.tile(tf.expand_dims(log_var, 0), (self.L, 1, 1))
        if self.type == 0:
            eps = tf.random.normal(log_var_ex.shape)
        else:
            eps = tf.cast(np.random.standard_cauchy(log_var_ex.shape), dtype=tf.float32)
        std = tf.exp(log_var_ex * 0.5)
        mu_ex = tf.tile(tf.expand_dims(mu, 0), (self.L, 1, 1))
        z = mu_ex + std * eps
        z = tf.reshape(z, (-1, z.shape[2]))
        x_hat = self.decoder(z)
        return x_hat, mu, log_var


# 模型构建
class VAEDNN(keras.Model):
    def __init__(self, z_dim, L, opt, the_type=0):
        super(VAEDNN, self).__init__()
        # 编码器
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(z_dim)      # 获得均值
        self.fc3 = layers.Dense(z_dim)      # 获得均值

        # 解码器
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)
        self.L = L
        self.opt = opt
        self.type = the_type

    def encoder(self, x):
        # 编码器
        h = tf.nn.relu(self.fc1(x))
        mu = self.fc2(h)
        log_var = self.fc3(h)
        return mu, log_var

    def decoder(self, z):
        # 解码器
        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)
        return out

    def call(self, inputs, training=None, mask=None):
        # 模型拼装
        mu, log_var = self.encoder(inputs)
        log_var_ex = tf.tile(tf.expand_dims(log_var, 0), (self.L, 1, 1))
        if self.type == 0:
            eps = tf.random.normal(log_var_ex.shape)
        else:
            eps = tf.cast(np.random.standard_cauchy(log_var_ex.shape), dtype=tf.float32)
        std = tf.exp(log_var_ex * 0.5)
        mu_ex = tf.tile(tf.expand_dims(mu, 0), (self.L, 1, 1))
        z = mu_ex + std * eps
        z = tf.reshape(z, (-1, z.shape[2]))
        x_hat = self.decoder(z)
        return x_hat, mu, log_var


# 误差数据
class History():
    def __init__(self):
        self.kl_divs = []
        self.the_loss = []
        self.the_rec_loss = []
        self.kl_divs_tests = []
        self.the_loss_tests = []
        self.the_rec_loss_tests = []

    def update(self, kl_div, rec_loss, my_loss, kl_div_test, rec_loss_test, my_loss_test):
        self.kl_divs.append(kl_div.numpy())
        self.the_loss.append(my_loss.numpy())
        self.the_rec_loss.append(rec_loss.numpy())
        self.kl_divs_tests.append(kl_div_test.numpy())
        self.the_loss_tests.append(my_loss_test.numpy())
        self.the_rec_loss_tests.append(rec_loss_test.numpy())

# CreatePicture.py

In [3]:
from Auxiliary import mkdir
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np


# 图像存储
def save_image(data, name, path):
    mkdir(path)
    save_img_path = '{}{}.jpg'.format(path, name)
    new_img = np.zeros((280, 280))
    for index, each_img in enumerate(data[:100]):
        row_start = int(index/10) * 28
        col_start = (index % 10)*28
        new_img[row_start:row_start+28, col_start:col_start+28] = each_img

    plt.imsave(save_img_path, new_img, cmap='Greys_r')


# 编码解码测试图像
def encode_decode_test(x, x_hat, epoch, path):
    save_image(x, '{}_label'.format(epoch), path)
    x_hat = tf.sigmoid(x_hat)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    save_image(x_hat, '{}_pre'.format(epoch), path)


# 添加噪音测试
def noise_test(x, epoch, path, my_model):
    x = np.array(x)
    save_image(x, '{}_origin'.format(epoch), path)
    the_noise = tf.cast(np.random.binomial(1, 0.1, x.shape), dtype=tf.float32)
    x = tf.maximum(x, the_noise)
    save_image(x, '{}_noise'.format(epoch), path)
    x = tf.reshape(x, [-1, 28, 28, 1])
    x_hat, _, _ = my_model(x)
    x_hat = tf.sigmoid(x_hat)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    save_image(x_hat, '{}_recon'.format(epoch), path)


# 添加噪音测试
def noise_test2(x, epoch, path, my_model):
    x = np.array(x)
    save_image(x, '{}_origin'.format(epoch), path)
    the_noise = tf.cast(np.random.binomial(1, 0.1, x.shape), dtype=tf.float32)
    x = tf.maximum(x, the_noise)
    save_image(x, '{}_noise'.format(epoch), path)
    x = tf.reshape(x, [-1, 784])
    x_hat, _, _ = my_model(x)
    x_hat = tf.sigmoid(x_hat)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    save_image(x_hat, '{}_recon'.format(epoch), path)


# 生成新的图像
def generate_picture(epoch, path, batchsz, z_dim, my_model, the_type):
    if the_type == 0:
        z = tf.random.normal((batchsz, z_dim))
    else:
        z = tf.cast(np.random.standard_cauchy((batchsz, z_dim)), dtype=tf.float32)
    logits = my_model.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    save_image(x_hat, '{}_random'.format(epoch), path)


# 可视化
def graph(history, path, z_dim, batchsz, learn_rate, theta, label):
    plt.figure(figsize=(15, 5))
    plt.subplot(131)
    plt.plot(history.kl_divs, label='train')
    plt.plot(history.kl_divs_tests, label='test')
    plt.ylabel('KL divergence')
    plt.legend()
    plt.subplot(132)
    plt.plot(history.the_rec_loss, label='train')
    plt.plot(history.the_rec_loss_tests, label='test')
    plt.ylabel('reconstruction loss')
    plt.legend()
    plt.subplot(133)
    plt.plot(history.the_loss, label='train')
    plt.plot(history.the_loss_tests, label='test')
    plt.ylabel('loss')
    plt.legend()
    plt.savefig('{}Z{} B{} L{} T{} {}.png'.format(path, z_dim, batchsz, learn_rate, theta, label))


# RunCNN.py

In [4]:
from model import History
import tensorflow as tf
import pandas as pd
from CreatePicture import encode_decode_test, generate_picture, graph, noise_test
import random


# 损失函数
def process(x, L, my_model, theta):
    x_hat, mu, log_var = my_model(x)
    x_hat = tf.reshape(x_hat, (L, -1, x_hat.shape[1], x_hat.shape[2], x_hat.shape[3]))
    # 平方差
    #rec_loss = tf.square(tf.tile(tf.expand_dims(x, 0), (L, 1, 1, 1, 1))-x_hat)
    # 交叉熵
    rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.tile(tf.expand_dims(x, 0), (L, 1, 1, 1, 1)), logits=x_hat)
    rec_loss = tf.reduce_mean(rec_loss, 0)
    rec_loss = tf.reduce_sum(rec_loss)/x.shape[0]
    # KL散度
    if my_model.type == 0:
        kl_div = -0.5 * (log_var + 1 - mu ** 2 - tf.exp(log_var))
    else:
        kl_div = -0.5 * log_var - 2 * tf.math.log(2.) + tf.math.log((mu**2 - tf.exp(log_var) + 1)**2 + 4*tf.exp(log_var) * mu**2) - tf.math.log((tf.exp(0.5*log_var)-1)**2 + mu**2)
    kl_div = tf.reduce_sum(kl_div) / x.shape[0]

    # 两个误差结合
    my_loss = rec_loss + theta * kl_div
    return kl_div, rec_loss, my_loss, x_hat


# 运行模型
def run_model(epochs, train_db, x_test, path, batchsz, z_dim, my_model, theta, learn_rate, label):
    history = History()
    for epoch in range(epochs):
        epoch += 1
        for x in train_db:
            x = tf.reshape(x, [-1, 28, 28, 1])
            with tf.GradientTape() as tape:
                kl_div, rec_loss, my_loss, _ = process(x, my_model.L, my_model, theta)
            grads = tape.gradient(my_loss, my_model.trainable_variables)
            my_model.opt.apply_gradients(zip(grads, my_model.trainable_variables))
        if epoch % 5 == 0:
            # 测试模型
            x_0 = random.sample(list(x_test), 100)
            x = tf.reshape(x_0, [-1, 28, 28, 1])
            kl_div_test, rec_loss_test, my_loss_test, x_hat_logits = process(x, 1, my_model, theta)
            print("epoch:{:<3d} (train) KL divergence:{:.2f} reconstruction loss:{:.2f} loss:{:.2f}".format(epoch, kl_div, rec_loss, my_loss))
            print("          (test)  KL divergence:{:.2f} reconstruction loss:{:.2f} loss:{:.2f}".format(kl_div_test, rec_loss_test, my_loss_test))
            history.update(kl_div, rec_loss, my_loss, kl_div_test, rec_loss_test, my_loss_test)
            if epoch % 5 == 0:
                # 编码解码图片
                encode_decode_test(x_0, x_hat_logits, epoch, path)
                # 噪音图片测试
                noise_test(x_0, epoch, path, my_model)
                # 用解码器生成图片
                generate_picture(epoch, path, batchsz, z_dim, my_model, my_model.type)
        if epoch % 100 == 0:
            # 可视化
            graph(history, path, z_dim, batchsz, learn_rate, theta, label)
            # 存储数据
            frame = pd.DataFrame({'kl_divs': history.kl_divs, 'the_loss': history.the_loss, 'the_rec_loss': history.the_rec_loss, 'kl_divs_tests': history.kl_divs_tests, 'the_loss_tests': history.the_loss_tests, 'the_rec_loss_tests': history.the_rec_loss_tests})
            frame.to_csv('{}Z{} B{} L{} T{} CNN Cauchy.csv'.format(path, z_dim, batchsz, learn_rate, theta), sep=',')
    return history


# VAE tensorflow CNN.py

In [None]:
from model import VAECNN
from RunCNN import run_model
import numpy as np
from Auxiliary import GPU
import tensorflow as tf
from pathlib import Path


# 加载数据
def data(data):
    (x_train, _), (x_test, _) = data.load_data()
    x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
    train_db = tf.data.Dataset.from_tensor_slices(x_train).shuffle(batchsz * 5).batch(batchsz)
    return train_db, x_test


label = Path(__file__).name
path = './data/{}/27C/'.format(label)
# 超参数
z_dim = 10
batchsz = 512
learn_rate = 1e-4
theta = 1
epochs = 500
L = 1

GPU()
mnist = tf.keras.datasets.mnist
train_db, x_test = data(mnist)
my_model = VAECNN(z_dim, L, opt=tf.optimizers.Adam(learn_rate))
history = run_model(epochs, train_db, x_test, path, batchsz, z_dim, my_model, theta, learn_rate, label)



# VAE tensorflow DNN.py

In [None]:
from model import VAEDNN
from RunDNN import run_model
import numpy as np
from Auxiliary import GPU
import tensorflow as tf
from pathlib import Path


# 加载数据
def data(data):
    (x_train, _), (x_test, _) = data.load_data()
    x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
    train_db = tf.data.Dataset.from_tensor_slices(x_train).shuffle(batchsz * 5).batch(batchsz)
    return train_db, x_test


label = Path(__file__).name
path = './data/{}/6/'.format(label)
# 超参数
z_dim = 10
batchsz = 512
learn_rate = 1e-4
theta = 1
epochs = 500
L = 1

GPU()
mnist = tf.keras.datasets.mnist
train_db, x_test = data(mnist)
my_model = VAEDNN(z_dim, L, opt=tf.optimizers.Adam(learn_rate))
run_model(epochs, train_db, x_test, path, batchsz, z_dim, my_model, theta, learn_rate, label)
