# Speech spectrogram classification

## Spectrogram 데이터를 이용해 음성을 분류하는 모델을 제작해보자.

## challenge
### Task
* 1초 길이의 오디오 음성데이터를 이용해 단어를 분류하는 것이 목표입니다.
* 주어진 데이터를 이용해 딥러닝 트레이닝 과정을 구현해 보는 것이 목표입니다.
* This code is borrowed from [Kaggle/TensorFlow Speech Recognition Challenge](https://www.kaggle.com/c/tensorflow-speech-recognition-challenge).
* This is version 0.01 of the data set containing 64,727 audio files, released on August 3rd 2017.
* **챌린지에서 사용하는 데이터는 Wave에서 Spectrogram으로 변환된 데이터입니다.**

### Import packages

* 우리가 사용할 packages 를 import 하는 부분 입니다.
* 필요에 따른 packages를 선언합니다.

In [1]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import BatchNormalization, ReLU, LeakyReLU, Conv2D,GlobalAveragePooling2D, Dropout, Dense
import numpy as np
import pandas as pd


from sklearn.model_selection import train_test_split

# import librosa
# import librosa.display
import matplotlib.pyplot as plt

import os
from os.path import isdir, join

import random
import copy
import sys

# Echo the installed TensorFlow version as the cell output.
tf.__version__

'2.10.0'

### Setting Dataset

* Colab 적용을 위한 변수 지정 및 드라이브 마운트

In [2]:
# Switch between the Google Colab setup (True) and a local setup (False).
use_colab = True
# Reject any value other than the two accepted flags.
assert use_colab in (True, False)

In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
# Root directory of the dataset files. Both environments currently resolve to
# the same relative path.
DATASET_PATH = "./drive/" if use_colab else "./drive/"

### Dataset Shape
* 불러온 데이터셋의 shape을 확인해보자.

In [5]:
# Load the precomputed spectrograms relative to DATASET_PATH so the dataset
# location is configured in exactly one place.
data_wav = np.load(join(DATASET_PATH, "speech_spec_8000.npy"))
print(data_wav.shape)
# The array is stored without a channel axis: (50620, 130, 126).

(50620, 130, 126)


In [6]:
# Preview the reshape target: keep axes 1 and 2 of the loaded array, append a
# singleton last axis, and use -1 as the leading entry.
(-1, data_wav.shape[1], data_wav.shape[2], 1)

(-1, 130, 126, 1)

* Spectrogram으로 변환한 데이터를 plot 해본다.

In [7]:
# librosa.display.specshow(librosa.amplitude_to_db(data_wav[219], ref=np.max), x_axis='time')
# plt.title('Power spectrogram')
# plt.colorbar(format='%+2.0f dB')
# plt.tight_layout()
# plt.show()

* 전체 데이터셋의 wave 데이터를 spectrogram으로 변환한다.

### Target_list 설정
* 데이터셋은 기본적으로 총 12개의 클래스로 나누어져있다.
```
['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go', 'unknown', 'silence']
```
* 해당 클래스로 나누어진 label을 학습 가능한 형태로 처리 후 데이터셋 제작

In [8]:
# Load the label array relative to DATASET_PATH rather than a hard-coded path.
data_label = np.load(join(DATASET_PATH, "speech_label_8000.npy"))
print(data_label.shape)

(50620, 1)


In [9]:
# Hold out 10% of the samples as the test split. Spectrograms and labels are
# shuffled together, so every (spectrogram, label) pair stays aligned.
train_wav, test_wav, train_label, test_label = train_test_split(
    data_wav, data_label, test_size=0.1, shuffle=True)

# Conv2D layers need a trailing channel axis. The two spatial sizes are read
# from the loaded array instead of being hard-coded as 130 x 126.
spec_shape = (-1, data_wav.shape[1], data_wav.shape[2], 1)
train_wav = train_wav.reshape(spec_shape)
test_wav = test_wav.reshape(spec_shape)

print(train_wav.shape)
print(test_wav.shape)
print(train_label.shape)
print(test_label.shape)

# Drop the references to the unsplit arrays for memory management.
del data_wav
del data_label

(45558, 130, 126, 1)
(5062, 130, 126, 1)
(45558, 1)
(5062, 1)


In [10]:
# Label preprocessing: class names, listed in the order that defines their
# integer indices.
target_list = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go', 'unknown', 'silence']

# The string labels must be converted to index form, so build a lookup table
# from class name to index (e.g. 'yes' -> 0).
label_value = {name: idx for idx, name in enumerate(target_list)}
# Alias kept so code using the earlier name still sees the same dictionary.
new_label_value = label_value

In [11]:
# Display the class-name -> index lookup table.
label_value

{'yes': 0,
 'no': 1,
 'up': 2,
 'down': 3,
 'left': 4,
 'right': 5,
 'on': 6,
 'off': 7,
 'stop': 8,
 'go': 9,
 'unknown': 10,
 'silence': 11}

In [12]:
# Look up one class index, then display the raw string labels of the test
# split (only the last expression is shown as the cell output).
label_value['yes']
test_label

array([['off'],
       ['up'],
       ['go'],
       ...,
       ['down'],
       ['yes'],
       ['on']], dtype='<U7')

In [13]:
# Each label row holds a single string (e.g. ['down']); take that string and
# map it to its integer class index via label_value.
train_label = np.array([label_value[row[0]] for row in train_label])
test_label = np.array([label_value[row[0]] for row in test_label])

# Show the element type of the encoded labels.
type(train_label[0])

numpy.int32

In [14]:
# Show the first ten encoded training labels.
train_label[0:10]

array([ 6,  8,  1,  5,  6, 10,  7,  4,  4,  7])

### Model dataset setting
* 변환된 데이터를 이용해서 학습에 활용할 데이터셋을 설정한다.
    * data -> data_wav
    * label -> data_label

In [15]:
def one_hot_label(wav, label, depth=12):
    """Pair a spectrogram with the one-hot encoding of its class index.

    Args:
        wav: Spectrogram element; returned unchanged.
        label: Integer class index (or tensor of indices) to encode.
        depth: Length of the one-hot vector, i.e. the number of classes.
            Defaults to 12, the number of entries in ``target_list``.

    Returns:
        Tuple ``(wav, one_hot)`` where ``one_hot`` is
        ``tf.one_hot(label, depth=depth)``.
    """
    return wav, tf.one_hot(label, depth=depth)

In [16]:
# Report the shape of every split, followed by the number of classes.
for split_name, split_array in (('Train_Wav', train_wav),
                                ('Train_Label', train_label),
                                ('Test_Wav', test_wav),
                                ('Test_Label', test_label)):
    print(f'{split_name} Demension : {np.shape(split_array)}')
print(f'Number Of Labels : {len(label_value)}')

Train_Wav Demension : (45558, 130, 126, 1)
Train_Label Demension : (45558,)
Test_Wav Demension : (5062, 130, 126, 1)
Test_Label Demension : (5062,)
Number Of Labels : 12


### Hyper-parameters setting
* 학습 전반에서 사용할 batch size, epoch, checkpoint dir을 설정한다.

In [17]:
# Save point: this path is later handed to ModelCheckpoint and load_weights.
if use_colab:
    checkpoint_dir = './drive/MyDrive/train_ckpt/spectrogram/exp1'
else:
    checkpoint_dir = 'spectrogram/exp1'

# Create the directory for either environment (previously only the Colab
# branch did). exist_ok makes the call a no-op when it already exists.
os.makedirs(checkpoint_dir, exist_ok=True)

### Dataset 구성
* 전처리가 완료된 데이터들을 이용해서 Train, Test Dataset을 직접 구성해봅시다.
* 학습에 사용할 Loss Function의 설정을 고려해 제작


In [18]:
# Dataset construction: batch size shared by the train and test pipelines.
batch_size = 32

# Training pipeline: one-hot encode the labels, shuffle with a buffer as large
# as the training set, repeat indefinitely, then group into batches.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_wav, train_label))
    .map(one_hot_label)
    .shuffle(len(train_wav))
    .repeat()
    .batch(batch_size=batch_size)
)
print(train_dataset)

<BatchDataset element_spec=(TensorSpec(shape=(None, 130, 126, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 12), dtype=tf.float32, name=None))>


In [19]:
# Test pipeline: one-hot encode the labels and batch. No shuffling or
# repetition is needed for evaluation.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_wav, test_label))
    .map(one_hot_label)
    .batch(batch_size=batch_size)
)
print(test_dataset)

<BatchDataset element_spec=(TensorSpec(shape=(None, 130, 126, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 12), dtype=tf.float32, name=None))>


### Dataset 구성 검증
```
<BatchDataset shapes: ((None, 130, 126, 1), (None, 12)), types: (tf.float32, tf.float32)>
<BatchDataset shapes: ((None, 130, 126, 1), (None, 12)), types: (tf.float32, tf.float32)>
```

## Model 구현
* 제시된 모델을 구현해보고, 더 좋은 성능으로 튜닝해보자.

    * inputs = [batch_size, 130, 126, 1]
    * conv1 = [batch_size, 65, 63, 16]
    * conv2 = [batch_size, 33, 32, 32]
    * conv3 = [batch_size, 17, 16, 64]
    * dense = [batch_size, 64]
    * output = [batch_size, 12]






In [20]:
# input_tensor = layers.Input(shape=(130,126, 1,))

# x = # TODO
# print(x.shape)

# output_tensor =layers.Dense(12)

# model = tf.keras.Model(# TODO)

In [21]:

# 클래스버젼 - Seperable Conv2D
# class ResBlock(tf.keras.Model):
#     def __init__(self, num_filter, stride=1, kernel_size=3, l2_reg=1e-4):
#         super(ResBlock, self).__init__()
#         self.num_filter = num_filter
#         self.conv1 = layers.Conv2D(num_filter, kernel_size, strides=stride, padding='same',
#                                    kernel_initializer=tf.keras.initializers.he_normal(),)
#                                   #  kernel_regularizer=regularizers.l2(0.001))
#         self.se_conv1 = layers.SeparableConv2D(num_filter, kernel_size, padding='same',
#                                                depthwise_initializer=tf.keras.initializers.he_normal(),
#                                                pointwise_initializer=tf.keras.initializers.he_normal(),)
#                                               #  depthwise_regularizer=regularizers.l2(0.001),
#                                               #  pointwise_regularizer=regularizers.l2(0.001))
#         self.se_sonv2 = layers.SeparableConv2D(num_filter, kernel_size, padding='same',
#                                                depthwise_initializer=tf.keras.initializers.he_normal(),
#                                                pointwise_initializer=tf.keras.initializers.he_normal(),)
#                                               #  depthwise_regularizer=regularizers.l2(0.001),
#                                               #  pointwise_regularizer=regularizers.l2(0.001))
#         self.con_short = Conv2D(num_filter, 1, strides=2, padding='same')
#         self.bn1 = BatchNormalization()
#         self.bn2 = BatchNormalization()
#         self.bn3 = BatchNormalization()
#         self.bn_short = BatchNormalization()
# 
#     def call(self, input, stride=1):
#         shortcut = input
#         x = self.conv1(input)
#         x = self.bn1(x)
#         x = layers.Activation('relu')(x)
#         x = self.se_conv1(x)
#         x = self.bn2(x)
#         x = self.se_sonv2(x)
#         x = self.bn3(x)
#         # 입력과 출력의 맞도록 숏컷을 조절
#         if shortcut.shape[-1] != self.num_filter or stride != 1:
#             shortcut = self.con_short(shortcut)
#             shortcut = self.bn_short(shortcut)
# 
#         x = layers.Add()([x, shortcut])
#         x = layers.Activation('relu')(x)
#         return x
# 
# 
# class ResNet28(tf.keras.Model):
#     def __init__(self):
#         super(ResNet28, self).__init__()
#         self.conv1 = layers.Conv2D(64, 7, strides=2,
#                                    kernel_initializer=tf.keras.initializers.he_normal(),
#                                    padding='same')
#         self.bn1 = BatchNormalization()
#         self.bn2 = BatchNormalization()
#         self.relu1 = ReLU()
#         self.relu2 = ReLU()
# 
#         self.blocks = [
#                           ResBlock(64, stride=1) ] + [
#                           ResBlock(128, stride=2),
#                           ResBlock(256, stride=2)] + [
#                           ResBlock(256, stride=1) ] + [
#                           ResBlock(512, stride=2),
#                           ResBlock(512, stride=1)]
# 
#         #         self.blocks = [
#         #                           ResBlock(64, stride=1) for _ in range(2)] + [
#         #                           ResBlock(128, stride=2)] + [
#         #                           ResBlock(128, stride=1) for _ in range(2)] + [
#         #                           ResBlock(256, stride=2)] + [
#         #                           ResBlock(256, stride=1) for _ in range(2)] + [
#         #                           ResBlock(512, stride=2)] + [
#         #                           ResBlock(512, stride=1)]
# 
#         self.avg_pool = GlobalAveragePooling2D()
# 
#         self.dropout = Dropout(0.5)
#         self.out = Dense(12)
# 
#     def call(self, inputs):
#         x = self.conv1(inputs)
#         x = self.bn1(x)
#         x = self.relu1(x)
# 
#         for block in self.blocks:
#             x = block(x)
# 
#         x = self.avg_pool(x)
#         x = layers.Dense(1000)(x)
#         x = self.bn2(x)
#         x = self.relu2(x)
#         x = self.dropout(x)
#         x = self.out(x)
# 
#         return x
# 
# 
# # Specify input shape and number of classes
# input_tensor = layers.Input(shape=(130, 126, 1,))  # Example input shape for image classification
# 
# # Build ResNet model
# Res = ResNet28()
# output_tensor = Res.call(input_tensor)
# model = tf.keras.Model(inputs=input_tensor, outputs=output_tensor)
# 
# # Display model summary
# model.summary()

In [22]:

# Class-based version using SeparableConv2D
class ResBlock(tf.keras.Model):
    """Residual block with two parallel separable-convolution branches.

    The input passes through a strided ``Conv2D``, batch normalization and
    ReLU. That result feeds two branches, each made of two
    ``SeparableConv2D`` + batch-normalization pairs: one branch uses
    ``kernel_size``, the other a fixed 5x5 kernel. The two branch outputs and
    the shortcut are summed and passed through a final ReLU.

    Args:
        num_filter: Number of output channels of every convolution.
        stride: Stride of the first convolution and of the 1x1 shortcut
            projection, so both paths are downsampled by the same factor.
        kernel_size: Kernel size of the first convolution and of the first
            separable branch.
        l2_reg: Currently unused; accepted for interface compatibility.
    """

    def __init__(self, num_filter, stride=1, kernel_size=3, l2_reg=1e-4):
        super(ResBlock, self).__init__()
        self.num_filter = num_filter
        # Remembered so call() can tell when the main path is downsampled.
        self.stride = stride

        def separable(size):
            # 'same'-padded separable convolution with He-normal initializers.
            return layers.SeparableConv2D(
                num_filter, size, padding='same',
                depthwise_initializer=tf.keras.initializers.he_normal(),
                pointwise_initializer=tf.keras.initializers.he_normal(),)

        self.conv1 = layers.Conv2D(num_filter, kernel_size, strides=stride, padding='same',
                                   kernel_initializer=tf.keras.initializers.he_normal(),)
        self.se_conv1 = separable(kernel_size)
        # NOTE(review): attribute name (typo of "se_conv2") is kept as is;
        # renaming it may affect loading of previously saved weights - confirm.
        self.se_sonv2 = separable(kernel_size)
        self.se_conv3 = separable(5)
        self.se_conv4 = separable(5)
        # The projection uses the block's own stride (it was hard-coded to 2),
        # so its output always matches the spatial size of the main path.
        self.con_short = Conv2D(num_filter, 1, strides=stride, padding='same')
        self.bn1 = BatchNormalization()
        self.bn2 = BatchNormalization()
        self.bn3 = BatchNormalization()
        self.bn4 = BatchNormalization()
        self.bn5 = BatchNormalization()
        self.bn_short = BatchNormalization()

    def call(self, input, stride=1):
        """Apply the block to ``input`` and return the activated sum.

        Args:
            input: Input feature map.
            stride: Kept for backward compatibility. A value other than 1
                forces the shortcut projection; the stride actually applied
                is the one given to the constructor.

        Returns:
            ReLU of the sum of both branch outputs and the shortcut.
        """
        stem = tf.nn.relu(self.bn1(self.conv1(input)))

        # Branch using `kernel_size` kernels.
        branch_a = self.bn2(self.se_conv1(stem))
        branch_a = self.bn3(self.se_sonv2(branch_a))

        # Branch using 5x5 kernels.
        branch_b = self.bn4(self.se_conv3(stem))
        branch_b = self.bn5(self.se_conv4(branch_b))

        # Project the shortcut whenever the main path changes the channel
        # count or the spatial size; otherwise the addition would fail.
        shortcut = input
        if (shortcut.shape[-1] != self.num_filter
                or self.stride != 1 or stride != 1):
            shortcut = self.bn_short(self.con_short(shortcut))

        return tf.nn.relu(tf.add_n([branch_a, branch_b, shortcut]))


class ResNet28(tf.keras.Model):
    """ResNet-style classifier built from ``ResBlock`` units.

    Structure: a 7x7 stride-2 convolution with batch norm and ReLU, six
    ``ResBlock`` stages (64, 128, 256, 256, 512, 512 filters), global average
    pooling, a 1000-unit dense layer with batch norm, ReLU and dropout, and a
    final 12-unit dense layer with no activation (raw logits).
    """

    def __init__(self):
        super(ResNet28, self).__init__()
        self.conv1 = layers.Conv2D(64, 7, strides=2,
                                   kernel_initializer=tf.keras.initializers.he_normal(),
                                   padding='same')
        self.bn1 = BatchNormalization()
        self.bn2 = BatchNormalization()
        self.relu1 = ReLU()
        self.relu2 = ReLU()

        # Stages with stride=2 halve the spatial size and change the width.
        self.blocks = [
            ResBlock(64, stride=1),
            ResBlock(128, stride=2),
            ResBlock(256, stride=2),
            ResBlock(256, stride=1),
            ResBlock(512, stride=2),
            ResBlock(512, stride=1),
        ]

        self.avg_pool = GlobalAveragePooling2D()

        self.dropout = Dropout(0.5)
        self.out = Dense(12)
        # Hidden dense layer. It is created here rather than inside call() so
        # that it is built once and reused, instead of a fresh, untracked
        # layer being instantiated on every forward pass.
        self.fc = Dense(1000)

    def call(self, inputs):
        """Run the forward pass.

        Args:
            inputs: Batch of input feature maps fed to the first convolution.

        Returns:
            Output of the final 12-unit dense layer (no activation applied).
        """
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu1(x)

        for block in self.blocks:
            x = block(x)

        x = self.avg_pool(x)
        x = self.fc(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.dropout(x)
        x = self.out(x)

        return x


# Symbolic input with the per-sample shape of the reshaped data: (130, 126, 1).
input_tensor = layers.Input(shape=(130, 126, 1))

# Instantiate the network and run its call() directly on the symbolic input,
# then wrap the resulting input/output tensors in a functional model.
Res = ResNet28()
output_tensor = Res.call(input_tensor)
model = tf.keras.Model(
    inputs=input_tensor,
    outputs=output_tensor,
)

# Print the layer-by-layer summary of the model.
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 130, 126, 1)]     0         
                                                                 
 conv2d (Conv2D)             (None, 65, 63, 64)        3200      
                                                                 
 batch_normalization (BatchN  (None, 65, 63, 64)       256       
 ormalization)                                                   
                                                                 
 re_lu (ReLU)                (None, 65, 63, 64)        0         
                                                                 
 res_block (ResBlock)        (None, 65, 63, 64)        59200     
                                                                 
 res_block_1 (ResBlock)      (None, 33, 32, 128)       160000    
                                                             

* 구현된 모델을 어떻게 학습시킬 것인지 구성해봅시다.

In [23]:
# Starting learning rate for Adam (noted range: 0.0001 ~ 0.000001).
initial_learning_rate = 1e-4

# Labels are one-hot encoded and the final Dense layer has no activation, so
# the loss is categorical cross-entropy computed from logits.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=initial_learning_rate)
logits_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=adam_optimizer,
              loss=logits_loss,
              metrics=['accuracy'])

### 모델 Output 확인
* 총 12개의 예측 데이터가 출력되는지 확인해봅시다.

In [24]:
# Run one training sample through the not-yet-trained model in inference mode
# to check the output values.
single_sample = train_wav[0:1]
predictions = model(single_sample, training=False)
print("Predictions: ", predictions.numpy())

Predictions:  [[ 0.01871599  0.05424325  0.0402809  -0.02300485  0.06109969 -0.03999165
  -0.00589754  0.00385274 -0.03493483  0.01151454 -0.02498282  0.0178601 ]]


* 최종 모델 구성을 확인 후 모델을 저장할 체크포인트를 구성해봅시다.

In [25]:
# Print the model summary again before setting up the checkpoint callbacks.
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 130, 126, 1)]     0         
                                                                 
 conv2d (Conv2D)             (None, 65, 63, 64)        3200      
                                                                 
 batch_normalization (BatchN  (None, 65, 63, 64)       256       
 ormalization)                                                   
                                                                 
 re_lu (ReLU)                (None, 65, 63, 64)        0         
                                                                 
 res_block (ResBlock)        (None, 65, 63, 64)        59200     
                                                                 
 res_block_1 (ResBlock)      (None, 33, 32, 128)       160000    
                                                             

In [26]:
from tensorflow.keras.callbacks import ReduceLROnPlateau

# Save weights only, and only when the validation loss improves.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_dir,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

# Stop after 20 epochs without val_loss improvement and restore the best
# weights seen so far.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=20,
    restore_best_weights=True,
    verbose=1,
)

# Multiply the learning rate by 0.2 after 4 epochs without val_loss
# improvement, never going below 1e-9.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',  # metric to watch
    factor=0.2,          # learning-rate reduction factor
    patience=4,          # epochs without improvement before reducing
    min_lr=1e-9,
)

## Training
* 위에서 구현한 데이터셋, 모델들을 fit 함수를 이용해 학습을 시켜봅시다.

In [None]:
# model.fit_generator has been merged into model.fit, which accepts a
# tf.data.Dataset directly.
# train_dataset repeats indefinitely, so steps_per_epoch sets the epoch length.
# validation_steps is intentionally omitted: test_dataset is finite, so the
# whole set is validated each epoch. Truncating to len(test_wav) // batch_size
# steps would drop the trailing partial batch, so val_loss would be computed
# on fewer samples than model.evaluate(test_dataset) uses.
history = model.fit(train_dataset,
                    steps_per_epoch=len(train_wav) // batch_size,
                    epochs=50,
                    callbacks=[cp_callback, early_stopping_cb, reduce_lr],
                    validation_data=test_dataset)

Epoch 1/50
Epoch 1: val_loss improved from inf to 0.79866, saving model to ./drive/MyDrive/train_ckpt/spectrogram\exp1
Epoch 2/50
Epoch 2: val_loss improved from 0.79866 to 0.27260, saving model to ./drive/MyDrive/train_ckpt/spectrogram\exp1
Epoch 3/50
Epoch 3: val_loss did not improve from 0.27260
Epoch 4/50
Epoch 4: val_loss improved from 0.27260 to 0.24942, saving model to ./drive/MyDrive/train_ckpt/spectrogram\exp1
Epoch 5/50
Epoch 5: val_loss improved from 0.24942 to 0.24672, saving model to ./drive/MyDrive/train_ckpt/spectrogram\exp1
Epoch 6/50
Epoch 6: val_loss improved from 0.24672 to 0.13940, saving model to ./drive/MyDrive/train_ckpt/spectrogram\exp1
Epoch 7/50
Epoch 7: val_loss did not improve from 0.13940
Epoch 8/50
Epoch 8: val_loss did not improve from 0.13940
Epoch 9/50
Epoch 9: val_loss did not improve from 0.13940
Epoch 10/50

* 학습 결과 확인

In [None]:
# Per-epoch metric curves recorded by fit().
metrics_log = history.history
acc = metrics_log['accuracy']
val_acc = metrics_log['val_accuracy']
loss = metrics_log['loss']
val_loss = metrics_log['val_loss']

epochs_range = range(len(acc))

# One entry per panel: (subplot index, title, legend location, curves).
panels = [
    (1, 'Training and Validation Accuracy', 'lower right',
     [(acc, 'Training Accuracy'), (val_acc, 'Validation Accuracy')]),
    (2, 'Training and Validation Loss', 'upper right',
     [(loss, 'Training Loss'), (val_loss, 'Validation Loss')]),
]

plt.figure(figsize=(8, 8))
for position, panel_title, legend_loc, curves in panels:
    plt.subplot(1, 2, position)
    for series, series_label in curves:
        plt.plot(epochs_range, series, label=series_label)
    plt.legend(loc=legend_loc)
    plt.title(panel_title)
plt.show()

## Evaluation
* Test dataset을 이용해서 모델의 성능을 평가합니다.

In [None]:
# Restore the weights saved at the checkpoint path by the ModelCheckpoint callback.
model.load_weights(checkpoint_dir)

In [None]:
# Evaluate on the full test dataset; final_score() reads results[1] as the accuracy.
results = model.evaluate(test_dataset)

### 스코어 결과
* 위의 스코어는 분류모델에 적용되는 스코어입니다.
* 모델의 크기 (MB) 와 정확도를 이용해 스코어를 출력합니다.

In [None]:
def final_score():
    """Print the parameter count, the accuracy and the resulting score.

    Reads the module-level ``model`` and ``results``; ``results[1]`` is taken
    as the accuracy. The score is ``50 * (accuracy + min(1 / s, 1))`` with
    ``s = params * 32 / 1024**2``.
    """
    param_count = model.count_params()
    print(f"Model params num : {param_count}")

    accuracy = results[1]
    print(f"Accuracy : {accuracy}")

    # NOTE(review): params * 32 / 1024**2 looks like a size in 2**20 bits,
    # while the notebook text says MB - confirm the intended unit.
    size_term = (param_count * 32) / (1024 ** 2)
    size_bonus = min(1 / size_term, 1)
    score = 50 * (accuracy + size_bonus)

    print(f"score : {score}")

In [None]:
# Print the parameter count, accuracy and final score.
final_score()