#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 1 20:06:33 2017
@author: fangping_lu
"""
from keras.layers import Input, Dense, Dropout, Lambda, Layer
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Model, load_model
from keras import regularizers
from keras import metrics
from keras import optimizers
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras import backend as K
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import random
#%% Divide data into training & testing (run once; the splits are saved to CSV).
df=np.loadtxt(open("binary_matrix.csv", "rb"), delimiter=",", skiprows=0)
#np.random.shuffle(df)
#nrow=df.shape[0]
#ratio=0.75
#train_size=int(nrow*ratio)
#x_train,x_test=df[:train_size,:],df[train_size:,:]
#np.savetxt("x_train_v1.csv", x_train, delimiter=",")
#np.savetxt("x_test_v1.csv", x_test, delimiter=",")
#%% Import the fixed training & testing splits.
df=np.loadtxt(open("pca_binary.csv", "rb"), delimiter=",", skiprows=0)
x_train=np.loadtxt(open("x_train_v1.csv", "rb"), delimiter=",", skiprows=0)
x_test=np.loadtxt(open("x_test_v1.csv", "rb"), delimiter=",", skiprows=0)
print(x_train.shape)
#%% Autoencoder model setup
input_dim=x_train.shape[1]
n_epoch=20
batch=5
model_name='ae'
encoding_dim=int(input_dim/100)
learning_rate=0.001
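# Single-hidden-layer autoencoder: input -> bottleneck of input_dim/100 units -> sigmoid reconstruction.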
input_layer=Input(shape=(input_dim,))
encoded1 = Dense(encoding_dim, activation='relu', activity_regularizer=regularizers.l1(10e-6))(input_layer)
encoded1 = LeakyReLU(alpha=0.1)(encoded1)
encoded1 = Dropout(0.1)(encoded1)
decoded2=Dense(input_dim,activation='sigmoid')(encoded1)
adam=optimizers.Adam(lr=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
#Build autoencoder
autoencoder=Model(inputs=input_layer, outputs=decoded2)
autoencoder.compile(optimizer=adam, loss='binary_crossentropy')
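# Binary cross-entropy treats each reconstructed entry as an independent Bernoulli,
# which matches the 0/1 entries of the input matrix.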
checkpointer = ModelCheckpoint(filepath=model_name + '.h5', verbose=0, save_best_only=True)
tensorboard = TensorBoard(log_dir='/tmp/autoencoder', histogram_freq=0, write_graph=True, write_images=True)
history = autoencoder.fit(x_train, x_train,
                          epochs=n_epoch,
                          batch_size=batch,
                          shuffle=True,
                          validation_data=(x_test, x_test),
                          verbose=1,
                          callbacks=[checkpointer, tensorboard]).history
autoencoder = load_model(model_name + '.h5')
test_output = autoencoder.predict(x_test)
np.savetxt(model_name + ".csv", test_output, delimiter=",")
#%% Plot loss
plt.plot(history['loss'])
plt.plot(history['val_loss'])
plt.title('Training & Validation Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Training Data', 'Test Data'], loc='upper right')
plt.show()
#%% VAE model setup.
#parameters
n_epoch=20
batch_size = 5
original_dim = 2304+288  # must equal x_train.shape[1]
n_h1=300
n_h2=25
latent_dim=8
epsilon_std = 1.0
dropout_value=0.1
alpha_value=0.05
learning_rate=0.001
model_name='vae'
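# VAE architecture: original_dim -> 300 -> 25 -> 8-dimensional latent space, mirrored in the decoder.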
x = Input(shape=(original_dim,))
h1 = Dense(n_h1, activation='relu', activity_regularizer=regularizers.l1(10e-6))(x)
h1 = Dropout(dropout_value)(h1)
h1 = LeakyReLU(alpha=alpha_value)(h1)
h2 = Dense(n_h2, activation='relu', activity_regularizer=regularizers.l1(10e-6))(h1)
h2 = Dropout(dropout_value)(h2)
z_mean = Dense(latent_dim)(h2)
z_log_var = Dense(latent_dim)(h2)
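# z_mean and z_log_var parameterize the diagonal-Gaussian approximate posterior q(z|x).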
def sampling(args):
    """Reparameterization trick: z = mu + sigma * epsilon, with epsilon ~ N(0, 1)."""
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim),
                              mean=0., stddev=epsilon_std)
    return z_mean + K.exp(z_log_var / 2) * epsilon
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
# Decoder layers are instantiated once so their weights can be reused by the generator below.
d3 = Dense(n_h2, activation='relu')
d3_1 = Dropout(dropout_value)
d3_2 = LeakyReLU(alpha=alpha_value)
d4 = Dense(n_h1, activation='relu')
d4_1 = Dropout(dropout_value)
d4_2 = LeakyReLU(alpha=alpha_value)
d5 = Dense(original_dim, activation='sigmoid')
h_decoded3 = d3_2(d3_1(d3(z)))
h_decoded4 = d4_2(d4_1(d4(h_decoded3)))
x_decoded_mean = d5(h_decoded4)
# encoder, from inputs to latent space
encoder = Model(x, z_mean)
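# After training, the encoder can embed new samples, e.g. latent_codes = encoder.predict(x_test).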
#specify optimizer
adam=optimizers.Adam(lr=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
#define loss
# Custom loss layer
class CustomVariationalLayer(Layer):
    def __init__(self, **kwargs):
        self.is_placeholder = True
        super(CustomVariationalLayer, self).__init__(**kwargs)

    def vae_loss(self, x, x_decoded_mean):
        xent_loss = original_dim * metrics.binary_crossentropy(x, x_decoded_mean)
        kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
        return K.mean(xent_loss + kl_loss)

    def call(self, inputs):
        x = inputs[0]
        x_decoded_mean = inputs[1]
        loss = self.vae_loss(x, x_decoded_mean)
        self.add_loss(loss, inputs=inputs)
        # We won't actually use the output.
        return x
y = CustomVariationalLayer()([x, x_decoded_mean])
# end-to-end autoencoder
vae = Model(x, y)
vae.compile(optimizer=adam, loss=None)
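# loss=None because the reconstruction + KL loss is already attached via add_loss in CustomVariationalLayer.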
checkpointer = ModelCheckpoint(filepath=model_name + '.hdf5', verbose=0, save_best_only=True)
tensorboard = TensorBoard(log_dir='/tmp/autoencoder', histogram_freq=0, write_graph=True, write_images=True)
history = vae.fit(x_train,
                  shuffle=True,
                  epochs=n_epoch,
                  batch_size=batch_size,
                  validation_data=(x_test, None),
                  callbacks=[checkpointer, tensorboard]).history
#%% Plot loss
plt.plot(history['loss'])
plt.plot(history['val_loss'])
plt.title('Training & Validation Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Training Data', 'Test Data'], loc='upper right')
plt.show()
#%% Generate sequences from the learned distribution.
decoder_input = Input(shape=(latent_dim,))
h3_decoded = d3(decoder_input)
h3_decoded = d3_1(h3_decoded)
h3_decoded = d3_2(h3_decoded)
h4_decoded=d4(h3_decoded)
h4_decoded=d4_1(h4_decoded)
h4_decoded=d4_2(h4_decoded)
_x_decoded_mean = d5(h4_decoded)
generator = Model(decoder_input, _x_decoded_mean)
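# The generator reuses the trained decoder weights to map latent samples back to data space.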
n = x_test.shape[0]
x_test_generated=np.zeros((n, original_dim))
param_learned=np.zeros((n,latent_dim))
rand=np.zeros((n,latent_dim))
# Draw z ~ N(0, I) from the prior by pushing uniform samples through the normal inverse CDF.
for i in range(n):
    for j in range(latent_dim):
        rand[i, j] = random.uniform(0, 1)
for i in range(latent_dim):
    param_learned[:, i] = norm.ppf(rand[:, i])
# Decode each latent sample into a generated sequence.
for idx in range(n):
    z_sample = param_learned[idx, :].reshape(1, param_learned.shape[1])
    x_decoded = generator.predict(z_sample)
    x_test_generated[idx, :] = x_decoded
np.savetxt(model_name + ".csv", x_test_generated, delimiter=",")