# Reconstruction
## feature -> vae predict -> feature concat -> inverse melspectrogram -> wav

In [1]:
%matplotlib inline

In [2]:
import os
import keras
import pickle
import umap

import numpy as np
import matplotlib.pyplot as plt

from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.manifold import TSNE
from keras.layers import Dense, Input
from keras.layers import Conv2D, Flatten, Lambda
from keras.layers import Reshape, Conv2DTranspose
from keras.models import Model
from keras.losses import mse, binary_crossentropy
from keras.utils import plot_model
from keras import backend as K
from sklearn.metrics.pairwise import cosine_similarity
from seaborn import heatmap

Using TensorFlow backend.


In [3]:
# Make only CUDA device 0 visible to this process.
# NOTE(review): this runs after `import keras`; confirm it takes effect before the backend first touches the GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [4]:
# Model / training hyper-parameters read by the cells below.
# Every name is assigned exactly once; values tried in earlier experiments are
# kept only as trailing notes so the effective setting is unambiguous.
input_shape = (128, 48, 1)   # earlier experiments: (129, 48, 1), (40, 48, 1)
intermediate_dim = 128       # earlier experiment: 512
latent_dim = 20              # earlier experiment: 40
batch_size = 3               # earlier experiment: 16
kernel_size = 3              # earlier experiment: 6
stride_size = 1              # earlier experiment: 3
filters = 4                  # earlier experiment: 16 (base width; doubled per encoder conv layer)
epochs = 50

In [5]:
# Defines the model architecture (important).
# It compiles for now, but the graphviz error must be fixed later if this model is going to be used.
def sampling(args):
    """Reparameterization trick: draw z by sampling from an isotropic unit Gaussian.
    # Arguments
        args (tensor): pair (z_mean, z_log_var), the mean and log of variance of Q(z|X)
    # Returns
        z (tensor): sampled latent vector, z_mean + exp(0.5 * z_log_var) * epsilon
    """
    mu, log_variance = args
    # first dimension is taken symbolically (K.shape), second statically (K.int_shape)
    noise_shape = (K.shape(mu)[0], K.int_shape(mu)[1])
    # random_normal defaults to mean=0 and std=1.0
    noise = K.random_normal(shape=noise_shape)
    sigma = K.exp(0.5 * log_variance)
    return mu + sigma * noise

# VAE model = encoder + decoder
#
# Per-layer filter counts are derived from the base `filters` setting instead of
# doubling/halving that global in place. The layer widths are unchanged
# (filters*2, filters*4 in the encoder; mirrored in the decoder), but `filters`
# itself is never modified, so an interrupted or repeated run of this cell can
# no longer change the architecture built by the next run.
encoder_filters = [filters * 2 ** (depth + 1) for depth in range(2)]

# build encoder model
inputs = Input(shape=input_shape, name='encoder_input')
x = inputs
for layer_filters in encoder_filters:
    x = Conv2D(filters=layer_filters,
               kernel_size=kernel_size,
               activation='tanh',
               strides=stride_size,
               padding='valid')(x)

# shape info needed to build decoder model
shape = K.int_shape(x)

# generate latent vector Q(z|X)
x = Flatten()(x)
x = Dense(intermediate_dim, activation='tanh')(x)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

# use reparameterization trick to push the sampling out as input
# note that "output_shape" isn't necessary with the TensorFlow backend
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# instantiate encoder model; its three outputs are [z_mean, z_log_var, z]
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()
# plot_model(encoder, to_file='../data/vae_cnn_encoder.png', show_shapes=True)

# build decoder model: project the latent vector back to the last conv feature
# map shape, then mirror the encoder with transposed convolutions
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(shape[1] * shape[2] * shape[3], activation='tanh')(latent_inputs)
x = Reshape((shape[1], shape[2], shape[3]))(x)

for layer_filters in reversed(encoder_filters):
    x = Conv2DTranspose(filters=layer_filters,
                        kernel_size=kernel_size,
                        activation='tanh',
                        strides=stride_size,
                        padding='valid')(x)

outputs = Conv2DTranspose(filters=1,
                          kernel_size=kernel_size,
                          activation='sigmoid',
                          padding='same',
                          name='decoder_output')(x)

# instantiate decoder model
decoder = Model(latent_inputs, outputs, name='decoder')
decoder.summary()
# plot_model(decoder, to_file='../data/vae_cnn_decoder.png', show_shapes=True)

# instantiate VAE model: decode the sampled z (encoder output index 2)
outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae')

# reconstruction term: MSE over the flattened tensors, rescaled by the
# spectrogram area (input_shape[0] * input_shape[1])
reconstruction_loss = mse(K.flatten(inputs), K.flatten(outputs))

reconstruction_loss *= input_shape[0] * input_shape[1]
# KL term summed over the latent dimensions and weighted by -5e-4
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -5e-4
vae_loss = K.mean(reconstruction_loss + kl_loss)
# the loss is attached to the model itself, so compile() takes no `loss` argument
vae.add_loss(vae_loss)
vae.compile(optimizer='rmsprop')
# This only saves a model visualization file, so deal with its error when it is actually needed.
# plot_model(vae, to_file='../data/vae_cnn.png', show_shapes=True)
vae.summary()

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Model: "encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      (None, 128, 48, 1)   0                                            
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 126, 46, 8)   80          encoder_input[0][0]              
__________________________________________________________________________________________________
conv2d_2 (Conv2D)               (None, 124, 44, 16)  1168        conv2d_1[0][0]                   
__________________________________________________________________________________________________
flatten_1 (Flatten)             (None, 87296)        0           conv2d_2[0][0]                   
___________

  'be expecting any data to be passed to {0}.'.format(name))


In [None]:
# Model architecture: the part we always ran before
def sampling(args):
    """Reparameterization trick by sampling from an isotropic unit Gaussian.
    # Arguments
        args (tensor): mean and log of variance of Q(z|X), as a (z_mean, z_log_var) pair
    # Returns
        z (tensor): sampled latent vector
    """
    # NOTE: the earlier architecture cell defines a function with this same name;
    # running this cell rebinds `sampling`.
    z_mean, z_log_var = args
    n_rows = K.shape(z_mean)[0]
    n_cols = K.int_shape(z_mean)[1]
    # unit-Gaussian noise (random_normal defaults: mean=0, std=1.0)
    eps = K.random_normal(shape=(n_rows, n_cols))
    std = K.exp(0.5 * z_log_var)
    return z_mean + std * eps

# VAE model = encoder + decoder
#
# NOTE: this cell rebuilds `encoder`, `decoder` and `vae`, replacing the objects
# created by the earlier architecture cell.
#
# The conv layers use the configured `stride_size` instead of a hard-coded
# stride of 3: with kernel_size=3 and a 128x48 input, stride 3 makes the decoder
# produce 126x45, which no longer matches the input used by the loss and by the
# reconstruction cell.
#
# Per-layer filter counts are derived from the base `filters` setting without
# modifying that global, so re-running or interrupting the cell cannot change
# the architecture built by the next run.
encoder_filters = [filters * 2 ** (depth + 1) for depth in range(2)]

# build encoder model
inputs = Input(shape=input_shape, name='encoder_input')
x = inputs
for layer_filters in encoder_filters:
    x = Conv2D(filters=layer_filters,
               kernel_size=kernel_size,
               activation='tanh',
               strides=stride_size,
               padding='valid')(x)

# shape info needed to build decoder model
shape = K.int_shape(x)

# generate latent vector Q(z|X)
x = Flatten()(x)
x = Dense(intermediate_dim, activation='tanh')(x)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

# use reparameterization trick to push the sampling out as input
# note that "output_shape" isn't necessary with the TensorFlow backend
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# instantiate encoder model; its three outputs are [z_mean, z_log_var, z]
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
# plot_model(encoder, to_file='../data/vae_cnn_encoder.png', show_shapes=True)

# build decoder model: project the latent vector back to the last conv feature
# map shape, then mirror the encoder with transposed convolutions
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(shape[1] * shape[2] * shape[3], activation='tanh')(latent_inputs)
x = Reshape((shape[1], shape[2], shape[3]))(x)

for layer_filters in reversed(encoder_filters):
    x = Conv2DTranspose(filters=layer_filters,
                        kernel_size=kernel_size,
                        activation='tanh',
                        strides=stride_size,
                        padding='valid')(x)

outputs = Conv2DTranspose(filters=1,
                          kernel_size=kernel_size,
                          activation='sigmoid',
                          padding='same',
                          name='decoder_output')(x)

# instantiate decoder model
decoder = Model(latent_inputs, outputs, name='decoder')
# plot_model(decoder, to_file='../data/vae_cnn_decoder.png', show_shapes=True)

# instantiate VAE model: decode the sampled z (encoder output index 2)
outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae')
# plot_model(vae, to_file='../data/vae_cnn.png', show_shapes=True)

# reconstruction term: MSE over the flattened tensors, rescaled by the
# spectrogram area (input_shape[0] * input_shape[1])
reconstruction_loss = mse(K.flatten(inputs), K.flatten(outputs))

reconstruction_loss *= input_shape[0] * input_shape[1]
# KL term summed over the latent dimensions and weighted by -5e-4
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -5e-4
vae_loss = K.mean(reconstruction_loss + kl_loss)
# the loss is attached to the model itself, so compile() takes no `loss` argument
vae.add_loss(vae_loss)
vae.compile(optimizer='rmsprop')
vae.summary()

In [17]:
# Load the saved weights file into the VAE built above.
weights_path = '../son/model/99.h5'
vae.load_weights(weights_path)

In [7]:
# Work around the numpy pickle-load error: make np.load default to
# allow_pickle=True for the rest of the notebook.
#
# The patch is applied at most once. Previously, re-running this cell wrapped the
# already-patched function again, so `allow_pickle` was passed twice and every
# later np.load call raised TypeError.
if not getattr(np.load, '_allow_pickle_default', False):
    np_load_old = np.load

    def _np_load_allow_pickle(*a, **k):
        """Call the original np.load, defaulting allow_pickle to True."""
        # setdefault keeps an explicit allow_pickle=... from the caller working
        k.setdefault('allow_pickle', True)
        return np_load_old(*a, **k)

    # marker used by the guard above to detect an already-patched np.load
    _np_load_allow_pickle._allow_pickle_default = True
    np.load = _np_load_allow_pickle

# Normalization statistics applied to every feature file below.
# SECURITY: pickle (and np.load with allow_pickle=True) can execute arbitrary
# code from the file; only load files from a trusted source.
# `with` closes the file handles, which the bare open() calls never did.
with open('../data/x_mean.pkl', 'rb') as mean_file:
    x_mean = pickle.load(mean_file)
with open('../data/x_std.pkl', 'rb') as std_file:
    x_std = pickle.load(std_file)

# Everything up to this point is what we have always done: model design, then loading the weights, mean, and std

# Load the features

In [18]:
import librosa
import librosa.display

# Pipeline per feature file: normalize -> VAE predict -> concatenate segments
# along time -> inverse mel-spectrogram -> write wav.
feature_path = '../son/feature_train/'
recon_path = '../son/reconstruction'
# make sure the output directory exists before the first wav is written
os.makedirs(recon_path, exist_ok=True)

for filename in sorted(os.listdir(feature_path)):

    # to inspect one specific file only:
    # x = (np.load('../son/feature_train/NB10584578') - x_mean) / x_std
    # Load from the directory being listed. This previously read from
    # `best_test`, which is only defined in a later cell (NameError on a fresh
    # kernel) and points at a different directory.
    x = (np.load(os.path.join(feature_path, filename)) - x_mean) / x_std
    print(x.shape, filename)

    # The model was trained in batches of `batch_size`, so trim the segments to
    # a multiple of batch_size before predicting.
    split_column = (x.shape[0] // batch_size) * batch_size
    if split_column == 0:
        # fewer than batch_size segments: nothing to predict for this file
        print("skipped", filename)
        print("-------")
        continue
    x = x[:split_column, :, :]

    # predict (a trailing channel axis is added for the model input)
    predict = vae.predict(x.reshape(x.shape + (1,)))
    print("predict", predict.shape)
    predict = predict.reshape(x.shape)

    # Concatenate the predicted segments side by side, invert the
    # mel-spectrogram and save the result as a wav file.
    # NOTE(review): `predict` is passed to mel_to_audio without undoing the
    # (x - x_mean) / x_std normalization applied above -- confirm this is intended.
    # NOTE(review): `librosa.output.write_wav` exists only in older librosa
    # releases -- confirm the installed version still provides it.
    last = librosa.feature.inverse.mel_to_audio(np.hstack(predict))
    librosa.output.write_wav(os.path.join(recon_path, filename), last, sr=22050)
    print("-------")

(7, 128, 48) 163-122947-0000
predict (6, 128, 48, 1)
-------


# ==== Below is older code (visualization and so on); skip it for now ====

In [None]:
# Legacy visualization cell: for every file in `best_test`, plot a slice of the
# normalized features, run it through the VAE and plot the reconstruction.
# Original author's note: "what is 'best test' anyway?"
# best_test = '/home/ds/DataScience/Datasets/LibriSpeech/VAELibriSpeech/best_test'
best_test = '../son/feature_test/'
all_x = []
all_x_pred = []
all_y = []

import librosa
import librosa.display
# Original author's note: still unsure what best_test is; may have to create separate files and test with those.
for i, filename in enumerate(sorted(os.listdir(best_test))):
    
#     x, sample_rate = librosa.load(best_test + filename, sr=16000
#     x = (x - x_mean) / x_std
    
    # load one feature file and normalize it with the stored mean/std
    x = (np.load(os.path.join(best_test, filename)) - x_mean) / x_std
    print(x.shape)
    x_ing = x[:len(x) - (len(x) % 48)] # trim the leading axis to a multiple of 48 so the reshape below fits
    x_ing = np.reshape(x_ing, (-1, 48))
    print(x_ing.shape)
    # NOTE(review): this keeps at most the first 40 rows, while the config cell sets
    # input_shape = (128, 48, 1) -- confirm this legacy slice still matches the model input.
    x_ing = x_ing[:40, :]
    print(x_ing.shape)
        
    # Original author's note: x seems to be the whole data and x_ing picks out just one item.
#     x_ing = x[np.random.choice(x.shape[0]), :, :]

    # Store all x and y; y holds the file index repeated once per row of x
    # (original author's guess: presumably to build categorical labels).
    all_x += [x]
    all_y += [i]*x.shape[0]

    
    plt.figure(figsize=(3, 7))
    # Show the selected slice as an image (original note: shape of dim 2 and image of the randomly picked item).
#     print(x_ing.shape[1]) # range(0, 48) shape[1] = 48 shape[0] = 
    
    # NOTE(review): x_ing is standardized above, so it may contain non-positive
    # values for which log10 is undefined -- confirm.
    plt.pcolormesh(range(x_ing.shape[1]),
                   range(x_ing.shape[0]),
                   10*np.log10(x_ing))
    plt.savefig('../data/{}_true.png'.format(filename))
    plt.show()
    
    # Reshape to (1, rows, cols, 1) for predict, then reshape the prediction back to (rows, cols).
    x_pred = vae.predict(x_ing.reshape((1,
                                        x_ing.shape[0],
                                        x_ing.shape[1], 1))).reshape((x_ing.shape[0],
                                                                      x_ing.shape[1]))
    # Show the prediction (= reconstruction) as a figure.
    print(x_pred)
    plt.figure(figsize=(10, 4))
    librosa.display.specshow(librosa.power_to_db(x_pred, ref=np.max), y_axis='mel', x_axis='time')
    plt.colorbar(format='%+2.0f dB')
    plt.title('Reconstruct Mel-Spectrogram')
    plt.tight_layout()
    # NOTE(review): the file name is constant, so every iteration overwrites the previous image.
    plt.savefig('Mel-Spectrogram example.png')
    plt.show()

#     all_x_pred += [x_pred]
#     plt.figure(figsize=(3, 7))
#     plt.pcolormesh(range(x_pred.shape[1]),
#                    range(x_pred.shape[0]),
#                    10*np.log10(x_pred))
#     plt.savefig('../data/{}_reconstr.png'.format(filename))
#     plt.show()
    
# Combine everything into single arrays.
all_x = np.vstack(all_x)
all_y = np.stack(all_y)

In [None]:
# For now the input data format is (129, 48, 1)
# NOTE(review): the config cell currently sets input_shape = (128, 48, 1) -- confirm which is current.
# This is the feature
# value
# 
all_x.shape

# === This part checks whether each speaker is embedded into similar vectors ===
# When another person's voice comes in, it can probably be handled here!

In [None]:
# Encode every test feature file into one latent vector and record the speaker
# it belongs to (original note: this seems to be where each person's id is
# fetched for testing).
#
# Speaker id lists used in earlier runs, kept for reference:
# speaker_ids = ["1089", "1188", "121", "1221", "1284", "1320", "1580",
#                "1995", "2094", "2300", "237", "260", "2830", "2961",]
# other_ids = ["3570", "3575", "3729", "4077", "4446", "4507", "4970",
#                "4992", "5105", "5142", "5639", "5683", "61", "672",
#                "6829", "6930", "7021", "7127", "7176", "7729", "8224",
#                "8230", "8455", "8463", "8555", "908"]
speaker_ids = ['1', '19', '118', '32']

# feature directory
# feats_path = '/home/ds/DataScience/Datasets/LibriSpeech/VAELibriSpeech/test-clean-wav/'
feats_path = '../son/feature_test/'

pooled_latents = []
speaker_labels = []
for filename in sorted(os.listdir(feats_path)):
    # the speaker id is the part of the file name before the first '-';
    # the 'NB' prefix is mapped to speaker '1'
    prefix = filename.split('-')[0]
    speaker_labels.append('1' if prefix == 'NB' else prefix)

    # normalize, add a trailing channel axis, and take encoder output index 2 (z)
    feats = (np.load(os.path.join(feats_path, filename)) - x_mean) / x_std
    z_samples = encoder.predict(feats[..., np.newaxis])[2]
    # element-wise max over the first axis gives one vector per file
    pooled_latents.append(z_samples.max(axis=0))

all_x = np.stack(pooled_latents)
all_y = np.stack(speaker_labels)

In [None]:
# Display the speaker label recorded for each feature file.
all_y

In [None]:
# Display the shape of the stacked per-file latent vectors.
all_x.shape

In [None]:
# 2-component PCA; fitted on the latent vectors in the plotting cell below.
decomposition = PCA(n_components=2)

In [None]:
# Map each speaker id to its position in `speaker_ids` (used as the colour index when plotting).
ids2labels = dict(zip(speaker_ids, range(len(speaker_ids))))

In [None]:
# Sanity check: the latent matrix and the label vector should have the same number of rows.
# `x_embedded` is only created by the plotting cell below, so inspecting it here
# raised NameError on a fresh kernel; the un-projected `all_x` is checked instead.
all_x.shape, all_y.shape

In [None]:
# Translate into the latent space

# Reduce the encoded latent vectors to two dimensions and scatter-plot them.
# Original author's observation: most of the ids end up overlapping.
x_embedded = decomposition.fit_transform(all_x)
pc1, pc2 = x_embedded[:, 0], x_embedded[:, 1]

fig, ax = plt.subplots(figsize=(7, 7))
# colour value per point: the speaker's index in `speaker_ids`, times 30
point_colors = [int(ids2labels[speaker_id]) * 30 for speaker_id in all_y]
ax.scatter(pc1, pc2, c=point_colors)

# annotate every point with its speaker id
for txt, px, py in zip(all_y, pc1, pc2):
    ax.annotate(txt, (px, py))
#plt.colorbar()
plt.savefig('../data/speakers_plot.png')
plt.show()

In [None]:
# Original author's note: this part appears to work on words, and seems to be
# separate from the speaker analysis above.
feats_path = '/home/ds/DataScience/Datasets/LibriSpeech/VAELibriSpeech/test-words-feats/'

word_latents = []
word_names = []
for filename in sorted(os.listdir(feats_path)):
    # normalize, add a trailing channel axis, and take encoder output index 2 (z)
    feats = (np.load(os.path.join(feats_path, filename)) - x_mean) / x_std
    z_samples = encoder.predict(feats[..., np.newaxis])[2]
    # element-wise max over the first axis gives one vector per file
    word_latents.append(z_samples.max(axis=0))
    # the file name itself is used as the label
    word_names.append(filename)

all_x = np.stack(word_latents)
all_y = np.stack(word_names)

In [None]:
# Fresh 2-component PCA; fitted on the latent vectors in the next cell.
decomposition = PCA(n_components=2)

In [None]:
# Translate into the latent space: project the latent vectors to 2-D with PCA
# and scatter-plot them, labelling every point with its file name.
x_embedded = decomposition.fit_transform(all_x)
xs, ys = x_embedded[:, 0], x_embedded[:, 1]

fig, ax = plt.subplots(figsize=(7, 7))
ax.scatter(xs, ys)

for label, px, py in zip(all_y, xs, ys):
    ax.annotate(label, (px, py))
#plt.colorbar()
plt.savefig('../data/words_plot.png')
plt.show()

In [None]:
# Display the shape of the 2-D PCA projection computed in the previous cell.
x_embedded.shape