# prj :  Spectrogram classification 모델 구현

- 2차원 Spectrogram 데이터를 입력으로 받음
- 기본버전과 skip connection 버전 모델 실습


## import library

In [7]:
import numpy as np
import os
import matplotlib.pyplot as plt

import IPython.display as ipd
import random

from sklearn.model_selection import train_test_split

import tensorflow as tf

from tensorflow.keras import layers

import librosa

In [8]:
# Show the GPU devices TensorFlow can see (the notebook displays the returned list).
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

## data load

In [21]:
# Build the dataset path from the user's home directory.
# os.path.expanduser("~") is used instead of os.getenv("HOME") + "...":
# getenv returns None when HOME is unset, which would make the string
# concatenation fail with a TypeError before any file is touched.
data_path = os.path.join(
    os.path.expanduser("~"),
    "aiffel", "AIFFEL_LSG", "utill", "speech_wav_8000.npz",
)
# NOTE: np.load on an .npz archive returns a lazy NpzFile; each
# speech_data["key"] access reads that array from the archive again.
speech_data = np.load(data_path)

In [22]:
# Print the shape of the raw waveform array and of the label array.
for caption, key in (
    ("Wave data shape : ", "wav_vals"),
    ("Label data shape : ", "label_vals"),
):
    print(caption, speech_data[key].shape)


Wave data shape :  (50620, 8000)
Label data shape :  (50620, 1)


## 데이터 처리와 분류

### 2차원 Spectrogram 변형

In [23]:
def wav2spec(wav, fft_size=258):
    """Convert a waveform into a magnitude spectrogram.

    Args:
        wav: Audio samples, passed straight to ``librosa.stft``.
        fft_size: FFT window length forwarded as ``n_fft``. The default of
            258 was picked so the spectrogram comes out in the desired shape.

    Returns:
        The element-wise absolute value of the STFT of ``wav``.
    """
    stft_matrix = librosa.stft(wav, n_fft=fft_size)
    return np.abs(stft_matrix)

In [24]:
# Display the shape of the raw waveform array.
speech_data["wav_vals"].shape

(50620, 8000)

In [38]:


# Only the first `num_spec_samples` waveforms are converted to spectrograms
# (presumably to limit memory use -- TODO confirm). Slicing also copes with
# archives that hold fewer samples than the cap.
num_spec_samples = 30000

# One magnitude spectrogram per waveform, stacked into a single array.
spec_data = np.array(
    [wav2spec(wav) for wav in speech_data["wav_vals"][:num_spec_samples]]
)




In [39]:
# Confirm the collected spectrograms are now a single NumPy array.
type(spec_data)

numpy.ndarray

In [40]:
# Report the shape of the spectrogram array. The label names spectrograms
# because spec_data no longer holds raw waveforms at this point.
print("Spectrogram data shape : ", spec_data.shape)


Wave data shape :  (30000, 130, 126)


### label data 처리

In [41]:
# The ten target keywords.
target_list = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']

# Map every class name to an integer id (its position in the class list).
# The two extra classes are added to a *new* list rather than appended to
# target_list in place, so target_list keeps only the ten keywords.
label_value = {
    name: idx
    for idx, name in enumerate(target_list + ['unknown', 'silence'])
}

In [42]:
# Translate each string label into its integer class id. Every row of
# "label_vals" holds a single label, hence the row[0] lookup.
label_data = np.array(
    [label_value[row[0]] for row in speech_data["label_vals"]]
)


In [43]:
# Display the class-name -> class-id mapping.
label_value

{'yes': 0,
 'no': 1,
 'up': 2,
 'down': 3,
 'left': 4,
 'right': 5,
 'on': 6,
 'off': 7,
 'stop': 8,
 'go': 9,
 'unknown': 10,
 'silence': 11}

### data set 분리

In [45]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the spectrograms as a test set.
# - Labels are sliced to len(spec_data) so features and labels stay aligned
#   with however many spectrograms were actually generated, instead of
#   repeating a hard-coded count.
# - random_state pins the shuffle so the split (and therefore the reported
#   metrics) is reproducible across runs.
train_wav, test_wav, train_label, test_label = train_test_split(
    spec_data,
    label_data[:len(spec_data)],
    test_size=0.1,
    shuffle=True,
    random_state=42,
)
#print(train_wav)



In [46]:
# train_wav = train_wav.reshape([-1, sr, 1]) # add channel for CNN
# test_wav = test_wav.reshape([-1, sr, 1])
# print("✅")

In [47]:
# Report the shapes of both splits, then print a completion marker.
for caption, array in (
    ("train data : ", train_wav),
    ("train labels : ", train_label),
    ("test data : ", test_wav),
    ("test labels : ", test_label),
):
    print(caption, array.shape)
print("✅")

train data :  (27000, 130, 126)
train labels :  (27000,)
test data :  (3000, 130, 126)
test labels :  (3000,)
✅


## 학습 하이퍼파라미터 설정

In [60]:
# Training hyper-parameters.
batch_size = 12
max_epochs = 10

# the save point
# Resolved via os.path.expanduser("~") rather than os.getenv('HOME') + "...":
# getenv returns None when HOME is unset, which would raise a TypeError here.
checkpoint_dir = os.path.join(
    os.path.expanduser("~"),
    "aiffel", "AIFFEL_LSG", "utill", "speech_recognition", "checkpoint", "wav-spec",
)

checkpoint_dir

'/home/aiffel0042/aiffel/AIFFEL_LSG/utill/speech_recognition/checkpoint/wav-spec'

## dataset 구성

In [61]:
def one_hot_label(wav, label, num_classes=12):
    """Pair a sample with the one-hot encoding of its integer label.

    Written to be used as the function passed to ``tf.data.Dataset.map``.

    Args:
        wav: Features of one sample; returned unchanged.
        label: Integer class id to encode.
        num_classes: Length of the one-hot vector. Defaults to 12, the
            number of entries in the label mapping used by this notebook.

    Returns:
        A ``(wav, one_hot)`` tuple where ``one_hot`` is
        ``tf.one_hot(label, depth=num_classes)``.
    """
    return wav, tf.one_hot(label, depth=num_classes)


In [62]:
# Training pipeline: one-hot encode the labels, repeat endlessly, then batch.
# Because of repeat() the stream never ends, so fit() has to be told how many
# steps make up one epoch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_wav, train_label))
    .map(one_hot_label)
    .repeat()
    .batch(batch_size=batch_size)
)
print(train_dataset)

# Test pipeline: same encoding and batching, but a single pass over the data.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_wav, test_label))
    .map(one_hot_label)
    .batch(batch_size=batch_size)
)
print(test_dataset)


<BatchDataset shapes: ((None, 130, 126), (None, 12)), types: (tf.float32, tf.float32)>
<BatchDataset shapes: ((None, 130, 126), (None, 12)), types: (tf.float32, tf.float32)>


##  model 

- 2차원 Spectrogram 데이터의 시간축 방향으로 Conv1D layer를 적용, 혹은 Conv2D layer를 적용 가능
- batchnorm, dropout, dense layer 등을 이용
- 12개의 단어 class를 구분하는 loss를 사용하고 Adam optimizer를 사용
- 모델 가중치를 저장하는 checkpoint callback 함수 추가


### 일반 모델

In [24]:
# from tensorflow.keras import layers

# input_tensor = layers.Input(shape=(130, 126, 1)) ## TODO

# x = layers.Conv1D(32, 9, padding='same', activation='relu')(input_tensor)
# x = layers.Conv1D(32, 9, padding='same', activation='relu')(x)
# x = layers.MaxPool1D()(x)

# x = layers.Conv1D(64, 9, padding='same', activation='relu')(x)
# x = layers.Conv1D(64, 9, padding='same', activation='relu')(x)
# x = layers.MaxPool1D()(x)

# x = layers.Conv1D(128, 9, padding='same', activation='relu')(x)
# x = layers.Conv1D(128, 9, padding='same', activation='relu')(x)
# x = layers.Conv1D(128, 9, padding='same', activation='relu')(x)
# x = layers.MaxPool1D()(x)

# x = layers.Conv1D(256, 9, padding='same', activation='relu')(x)
# x = layers.Conv1D(256, 9, padding='same', activation='relu')(x)
# x = layers.Conv1D(256, 9, padding='same', activation='relu')(x)
# x = layers.MaxPool1D()(x)
# x = layers.Dropout(0.3)(x)

# x = layers.Flatten()(x)
# x = layers.Dense(256)(x)
# x = layers.BatchNormalization()(x)
# x = layers.Activation('relu')(x)

# output_tensor = layers.Dense(12)(x)

# model_wav = tf.keras.Model(input_tensor, output_tensor)

# model_wav.summary()

In [18]:
# Input: a (130, 126) spectrogram with a trailing channel axis.
# NOTE(review): the datasets yield (130, 126) samples without the channel
# axis -- confirm Keras expands the last dimension implicitly.
input_tensor = layers.Input(shape=(130, 126, 1))

# Convolutional trunk, described as (filters, number of stacked convs) per
# stage. Every conv uses a 9x9 kernel with 'same' padding and ReLU, and each
# stage ends with a default MaxPool2D.
conv_stages = ((32, 2), (64, 2), (128, 3), (256, 3))

x = input_tensor
for n_filters, n_convs in conv_stages:
    for _ in range(n_convs):
        x = layers.Conv2D(n_filters, 9, padding='same', activation='relu')(x)
    x = layers.MaxPool2D()(x)
x = layers.Dropout(0.3)(x)

# Classifier head: flatten -> Dense(256) -> batch norm -> ReLU.
head_layers = (
    layers.Flatten(),
    layers.Dense(256),
    layers.BatchNormalization(),
    layers.Activation('relu'),
)
for head_layer in head_layers:
    x = head_layer(x)

# Final layer emits 12 values with no activation (raw logits).
output_tensor = layers.Dense(12)(x)

model_wav = tf.keras.Model(input_tensor, output_tensor)

model_wav.summary()

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 130, 126, 1)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 130, 126, 32)      2624      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 130, 126, 32)      82976     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 65, 63, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 65, 63, 64)        165952    
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 65, 63, 64)        331840    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 32, 31, 64)       

In [21]:
##### loss #######
# Labels are one-hot vectors and the loss is configured for raw logits
# (from_logits=True), so no softmax is expected on the model output.
optimizer = tf.keras.optimizers.Adam(1e-4)
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model_wav.compile(
    optimizer=optimizer,
    loss=loss_fn,
    metrics=['accuracy'],
)
print("✅")

✅


In [None]:
## train

# Checkpoint callback: save weights only, and only when the monitored
# validation loss improves.
checkpoint_options = dict(
    save_weights_only=True,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    verbose=1,
)
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_dir, **checkpoint_options)

# Takes roughly 30 minutes.
# Step counts are derived from the split sizes and the current batch_size.
train_steps = len(train_wav) // batch_size
val_steps = len(test_wav) // batch_size
history_wav = model_wav.fit(
    train_dataset,
    epochs=max_epochs,
    steps_per_epoch=train_steps,
    validation_data=test_dataset,
    validation_steps=val_steps,
    callbacks=[cp_callback],
)

## 학습 그래프 출력

In [None]:

import matplotlib.pyplot as plt

# Per-epoch curves recorded by fit().
acc = history_wav.history['accuracy']
val_acc = history_wav.history['val_accuracy']
loss = history_wav.history['loss']
val_loss = history_wav.history['val_loss']

epochs_range = range(len(acc))

# One row, two panels: accuracy on the left, loss on the right.
# Each entry: (subplot index, train curve, train label,
#              validation curve, validation label, legend position, title).
panels = (
    (1, acc, 'Training Accuracy', val_acc, 'Validation Accuracy',
     'lower right', 'Training and Validation Accuracy'),
    (2, loss, 'Training Loss', val_loss, 'Validation Loss',
     'upper right', 'Training and Validation Loss'),
)

plt.figure(figsize=(8, 8))
for position, train_curve, train_name, val_curve, val_name, legend_loc, title in panels:
    plt.subplot(1, 2, position)
    plt.plot(epochs_range, train_curve, label=train_name)
    plt.plot(epochs_range, val_curve, label=val_name)
    plt.legend(loc=legend_loc)
    plt.title(title)
plt.show()
print("✅")

## 성능평가

In [None]:

# Evaluate on the test dataset; the next cell reads results[0] as the loss
# and results[1] as the accuracy.
results = model_wav.evaluate(test_dataset)


In [None]:
# Unpack the evaluation output: loss first, accuracy second.
test_loss, test_accuracy = results[0], results[1]
print("loss value: {:.3f}".format(test_loss))
# Accuracy is shown as a percentage.
print("accuracy value: {:.4f}%".format(test_accuracy*100))


## Conv2D로 변경

In [None]:
# Input: a (130, 126) spectrogram with a trailing channel axis.
# NOTE(review): the datasets yield (130, 126) samples without the channel
# axis -- confirm Keras expands the last dimension implicitly.
input_tensor = layers.Input(shape=(130, 126, 1))

# Convolutional trunk, described as (filters, number of stacked convs) per
# stage. Every conv uses a 9x9 kernel with 'same' padding and ReLU, and each
# stage ends with a default MaxPool2D.
conv_stages = ((32, 2), (64, 2), (128, 3), (256, 3))

x = input_tensor
for n_filters, n_convs in conv_stages:
    for _ in range(n_convs):
        x = layers.Conv2D(n_filters, 9, padding='same', activation='relu')(x)
    x = layers.MaxPool2D()(x)
x = layers.Dropout(0.3)(x)

# Classifier head: flatten -> Dense(256) -> batch norm -> ReLU.
head_layers = (
    layers.Flatten(),
    layers.Dense(256),
    layers.BatchNormalization(),
    layers.Activation('relu'),
)
for head_layer in head_layers:
    x = head_layer(x)

# Final layer emits 12 values with no activation (raw logits).
output_tensor = layers.Dense(12)(x)

model_wav = tf.keras.Model(input_tensor, output_tensor)

model_wav.summary()

In [None]:
# Hyper-parameters for the second training run.
batch_size = 10
max_epochs = 5

# train_dataset / test_dataset were batched with the previous batch_size,
# while the fit() call further down derives steps_per_epoch and
# validation_steps from the current value. Re-batch both pipelines so the
# step counts match the batches actually delivered; otherwise validation
# would request more batches than the test dataset contains.
# unbatch().batch() reuses the existing pipelines instead of copying the
# arrays into new datasets.
train_dataset = train_dataset.unbatch().batch(batch_size=batch_size)
test_dataset = test_dataset.unbatch().batch(batch_size=batch_size)

# the save point
# Resolved via os.path.expanduser("~") rather than os.getenv('HOME') + "...":
# getenv returns None when HOME is unset, which would raise a TypeError here.
checkpoint_dir = os.path.join(
    os.path.expanduser("~"),
    "aiffel", "AIFFEL_LSG", "utill", "speech_recognition", "checkpoint", "wav-spec",
)

checkpoint_dir

In [None]:
# Labels are one-hot vectors and the loss is configured for raw logits
# (from_logits=True), so no softmax is expected on the model output.
optimizer = tf.keras.optimizers.Adam(1e-4)
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model_wav.compile(
    optimizer=optimizer,
    loss=loss_fn,
    metrics=['accuracy'],
)


In [None]:

# Checkpoint callback: save weights only, and only when the monitored
# validation loss improves.
checkpoint_options = dict(
    save_weights_only=True,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    verbose=1,
)
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_dir, **checkpoint_options)



In [None]:

# Takes roughly 30 minutes.
# NOTE(review): the step counts are derived from batch_size; make sure
# train_dataset / test_dataset are batched with that same value.
train_steps = len(train_wav) // batch_size
val_steps = len(test_wav) // batch_size
history_wav = model_wav.fit(
    train_dataset,
    epochs=max_epochs,
    steps_per_epoch=train_steps,
    validation_data=test_dataset,
    validation_steps=val_steps,
    callbacks=[cp_callback],
)