<a href="https://colab.research.google.com/github/kimdonggyu2008/audio_synthesis/blob/main/autoencoder_VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
DATA_FOLDER="/content/drive/MyDrive/오디오_합성/"

In [None]:
cd /content/drive/MyDrive/오디오_합성/

/content/drive/MyDrive/오디오_합성


In [None]:
import os
import pickle

from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
import numpy as np
import tensorflow as tf

tf.compat.v1.disable_eager_execution() #즉시실행 off, 컴파일, 트레인 함수를 직접 생성하기 위함

In [None]:

class VAE: #인코더 선언
  def __init__(self,
               input_shape,
               conv_filters,
               conv_kernels,
               conv_strides,
               latent_space_dim):
    self.input_shape=input_shape #인코더의 필터 갯수를 차례대로 선언, [28, 28, 1]
    self.conv_filters=conv_filters #각 레이어의 컨볼루션 필터 크기, [2, 4, 8] = 2*2,4*4,8*8
    self.conv_kernels=conv_kernels # 각 레이어의 커널(필터) 크기[3, 5, 3] = 3*3, 5*5, 3*3
    self.conv_strides=conv_strides #컨볼루션에 적용된 필터의 이동거리 [1, 2, 2]
    self.latent_space_dim=latent_space_dim #특징 공간 차원 수, 2
    self.reconstruction_loss_weight=1000

    self.encoder=None #인코더는 최초 입력값의 특징을 추출함
    self.decoder=None #디코더는 추출된 특징을 활용해서 최초 입력값과 비슷한 결과를 출력함
    self.model=None #모델의 생성
    #기본 인코더, 디코더, 모델 비활성화

    self._num_conv_layers=len(conv_filters)#레이어 갯수
    self._shape_before_bottleneck=None
    self._model_input=None

    self._num_conv_layers=len(conv_filters)
    self._shape_before_bottleneck=None
    self._model_input=None

    self._build()

  def summary(self):
    self.encoder.summary()
    self.decoder.summary()
    self.model.summary()

  def compile(self,learning_rate=0.0001):#모델 컴파일링
    optimizer=Adam(learning_rate=learning_rate)
    mse_loss=MeanSquaredError()
    self.model.compile(optimizer=optimizer, loss=self._calculate_combined_loss,#컴파일 과정에서 직접 만든 손실함수 사용
                       metrics=[self._calculate_reconstruction_loss,#사용 요소 정의
                                self._calculate_kl_loss])

  def train(self,x_train,batch_size,num_epochs):#학습, 분류가 아니라서 유효 셋이 없음
    self.model.fit(x_train,#입력값
                   x_train,#출력 예상값, 원래 값과 비슷하게 나오게 유도했으므로 비슷한 값이 나옴
                   batch_size=batch_size,
                   epochs=num_epochs,
                   shuffle=True)

  def save(self, save_folder="."):
    self._create_folder_if_it_doesnt_exist(save_folder)
    self._save_parameters(save_folder)
    self._save_weights(save_folder)

  def load_weights(self,weights_path):
    self.model.load_weights(weights_path)

  def reconstruct(self,images):
    latent_representations=self.encoder.predict(images)#인코더로 예측시작, 특징공간 추출
    reconstructed_images=self.decoder.predict(latent_representations)#해당 특징공간을 활용해서 생성함
    return reconstructed_images,latent_representations#특징공간에서 나타난 결괏값 보여줌

  @classmethod
  def load(cls,save_folder="."):#저장된 파일이 있을 시, 패러미터 파일, 가중치 파일에서 읽어옴
    parameters_path=os.path.join(save_folder,"parameters.pkl")
    with open(parameters_path,"rb") as f:
      parameters=pickle.load(f)
    autoencoder=VAE(*parameters)
    weight_path=os.path.join(save_folder,"weights.h5")
    autoencoder.load_weights(weights_path)
    return autoencoder

  def _calculate_combined_loss(self,y_target,y_predicted):#각 오류값들 총합
    reconstruction_loss=self._calculate_reconstruction_loss(y_target,y_predicted) #새로 생성된 샘플과의 오차
    kl_loss=self._calculate_kl_loss(y_target,y_predicted) #새로 생성된 샘플과의 오차 KL 다이버전스
    combined_loss=self.reconstruction_loss_weight*reconstruction_loss+kl_loss #총합
    return reconstruction_loss

  def _calculate_reconstruction_loss(self,y_target,y_predicted):#새로 생성된 값들의 평균 오차값 계산
    error=y_target-y_predicted
    reconstruction_loss=K.mean(K.square(error),axis=[1,2,3])
    return reconstruction_loss

  def _calculate_kl_loss(self,y_target,y_predicted):#새로 생성된 값들의 kl 다이버전스 오류값 계산
    kl_loss=-0.5*K.sum(1+self.log_varience-K.square(self.mu)-K.exp(self.log_varience),axis=1)
    #(1/2)* 총합(1+ 로그공간(분산)-평균^2-분산)
    return kl_loss

  def _create_folder_if_it_doesnt_exist(self,folder): #파일 없으면 생성
    if not os.path.exists(folder):
      os.makedirs(folder)

  def _save_parameters(self, save_folder): #저장할 패러미터 지정하고 생성
    parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim
        ]
    save_path=os.path.join(save_folder,"parameters.pkl")
    with open(save_path,"wb") as f:
      pickle.dump(parameters,f)

  def _save_weights(self,save_folder): #저장할 가중치 지정하고 생성
    save_path=os.path.join(save_folder,"weights.h5")
    self.model.save_weights(save_path)

  def _build(self):
    self._build_encoder()#인코더 생성
    self._build_decoder()
    self._build_autoencoder()

#오토인코더
  def _build_autoencoder(self):#오토인코더 생성, 인코더 + 디코더
    model_input = self._model_input
    model_output = self.decoder(self.encoder(model_input))
    self.model = Model(model_input, model_output, name="autoencoder")

#디코더
  def _build_decoder(self): #디코더 생성, 구성요소 넣음
    decoder_input=self._add_decoder_input() #디코더의 입력값
    dense_layer=self._add_dense_layer(decoder_input) # 디코더 내의 완전연결층, 생성을 위함
    reshape_layer=self._add_reshape_layer(dense_layer) #레이어 크기 재조정(?)
    conv_transpose_layers=self._add_conv_transpose_layers(reshape_layer)#전치행렬로 변경
    decoder_output=self._add_decoder_output(conv_transpose_layers)# 디코더 생성값 출력
    self.decoder=Model(decoder_input,decoder_output,name="decoder")#입력값, 출력값의 형태를 가지고 모델 생성

  def _add_decoder_input(self):#입력값, 특징 공간 크기 지정
    return Input(shape=self.latent_space_dim,name="decoder_input")#디코더 입력값으로 지정

  def _add_dense_layer(self,decoder_input):#생성층 선언
    num_neurons=np.prod(self._shape_before_bottleneck)#보틀넥의 연결갯수 지정
    dense_layer=Dense(num_neurons,name="decoder_dense")(decoder_input)#
    return dense_layer

  def _add_reshape_layer(self,dense_layer): #
    return Reshape(self._shape_before_bottleneck)(dense_layer)#텐서의 모양을 입력된 크기로 변경

  def _add_conv_transpose_layers(self,x):#들어온 모든 값에 대해서 전치행렬화를 시킴
    for layer_index in reversed(range(1,self._num_conv_layers)):
      x=self._add_conv_transpose_layer(layer_index,x)
    return x

  def _add_conv_transpose_layer(self,layer_index,x):#컨볼루션 레이어 역순으로 재지정
    layer_num=self._num_conv_layers-layer_index # 레이어 순서를 역순으로 재지정
    conv_transpose_layer=Conv2DTranspose(#컨볼루션 구조 지정, 역순으로 가져와서 순서대로 넣음
        filters=self.conv_filters[layer_index],
        kernel_size=self.conv_kernels[layer_index],
        strides=self.conv_strides[layer_index],
        padding="same",
        name=f"decoder_conv_transpose_layer{layer_num}"
    )
    x=conv_transpose_layer(x)#현재 레이어 역순으로 된 걸로 새로저장
    x=ReLU(name=f"decoder_relu_{layer_num}")(x)#현재 레이어 활성화함수
    x=BatchNormalization(name=f"decoder_bn_{layer_num}")(x)#현재 레이어 정규화
    return x

  def _add_decoder_output(self,x):
    conv_transpose_layer=Conv2DTranspose(#뒤집힌 컨볼루션 레이어 선언
        filters=1,
        kernel_size=self.conv_kernels[0],
        strides=self.conv_strides[0],
        padding="same",
        name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
    )
    x=conv_transpose_layer(x)#현재 레이어 저장
    output_layer=Activation("sigmoid",name="sigmoid_layer")(x)#활성화 함수 적용함
    return output_layer#출력

#인코더
  def _build_encoder(self):#인코더 내부 채우기
    encoder_input = self._add_encoder_input()#입력값 변환
    conv_layers = self._add_conv_layers(encoder_input) #컨볼루션 레이어에 적용
    bottleneck = self._add_bottleneck(conv_layers) # 보틀넥 적용
    self._model_input = encoder_input #인코더 적용 끝, 모델로 들어갈 최종 입력값
    self.encoder = Model(encoder_input, bottleneck, name="encoder")



  def _add_encoder_input(self):#인코더 내부의 입력칸 생성
    return Input(shape=self.input_shape,name="encoder_input")#입력값 받아서 넣어줌

  def _add_conv_layers(self,encoder_input):#

    x=encoder_input#인코더 입력창
    for layer_index in range(self._num_conv_layers):
      x=self._add_conv_layer(layer_index,x)#레이어 갯수만큼 시행
    return x

  def _add_conv_layer(self,layer_index,x):#x번째, 컨볼루션 레이어 추가

    layer_number=layer_index+1#마지막 relu를 위한 레이어 추가로 넣음
    conv_layer=Conv2D( #컨볼루션 선언
        filters=self.conv_filters[layer_index],#필터 갯수
        kernel_size=self.conv_kernels[layer_index],#커널(필터) 크기
        strides=self.conv_strides[layer_index],#몇 칸씩 이동할건지
        padding="same",#동일 크기 유지
        name=f"encoder_conv_layer_{layer_number}"#이름 지정
    )
    x=conv_layer(x)#컨볼루션의 결괏값(?)
    x=ReLU(name=f"encoder_relu_{layer_number}")(x)#결괏값에 relu적용
    x=BatchNormalization(name=f"encoder_bn_{layer_number}")(x)# relu적용값에 정규화 적용
    return x #결과값 반화
  """
  def sample_point_from_normal_distribution(args):
    mu,log_variance=args
    epsilon=K.random_normal(shape=K.shape(self.mu),mean=0.,stddev=1.)
    sampled_point=mu.K.exp(log_variance/2)*epsilon
    return sampled_point

  def _add_bottleneck(self,x):#Dense레이어 보틀넥 생성
    self._shape_before_bottleneck=K.int_shape(x)[1:]#backend 임포트한 것 적용, 평평하기 이전 데이터들 저장
    x=Flatten()(x) #차원 축소
    self.mu=Dense(self.latent_space_dim,name="mut")(x)
    self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)
    x=Lambda(sample_point_from_normal_distribution,name="encoder_output")([self.mu,self.log_variance])
    return x
  """
  def _add_bottleneck(self, x):
        """Flatten data and add bottleneck with Guassian sampling (Dense
        layer).
        """
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x) #주어진 데이터들의 특징벡터 평균값
        self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)# 주어진 데이터들의 특징벡터 로그분산

        def sample_point_from_normal_distribution(args): #중간에 함수 선언, 정규 분포 샘플링
            mu, log_variance = args
            epsilon = K.random_normal(shape=K.shape(self.mu), mean=0.,
                                      stddev=1.)
            sampled_point = mu + K.exp(log_variance / 2) * epsilon
            return sampled_point

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x




if __name__=="__main__":
  autoencoder=VAE(
        input_shape=(28,28,1), #입력값
        conv_filters=(32, 64, 64, 64),
        conv_kernels=(3,3,3,3),
        conv_strides=(1,2,2,1),
        latent_space_dim=2
    )
  autoencoder.summary()


Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, 28, 28, 1)]          0         []                            
                                                                                                  
 encoder_conv_layer_1 (Conv  (None, 28, 28, 32)           320       ['encoder_input[0][0]']       
 2D)                                                                                              
                                                                                                  
 encoder_relu_1 (ReLU)       (None, 28, 28, 32)           0         ['encoder_conv_layer_1[0][0]']
                                                                                                  
 encoder_bn_1 (BatchNormali  (None, 28, 28, 32)           128       ['encoder_relu_1[0][0]']