# 8차시: 텐서플로우 2.x 활용 Speech Recognition

## AI 맛보기 8주차: 2020. 08. 25. 20:00 ~ 22:00 (120분)
1. 도구 불러오기 및 버전 확인
1. 학습 데이터 다운로드 및 압축 풀기
1. 학습 데이터 살펴보기: 차원, 레이블, 듣기
1. 학습 데이터 전처리: 오디오 데이터, 스펙트로그램
1. 학습 모델 준비: CNN
1. 학습
1. 학습 결과 테스트

#### 참고자료
- [파이썬 3 표준 문서](https://docs.python.org/3/index.html)
- [Audio Spectrogram](https://www.tensorflow.org/io/tutorials/audio#trim_the_noise)

### 1. 도구 불러오기 및 버전 확인

In [None]:
# 도구 준비
import os
# import shutil
import random
import math

import tensorflow as tf # 텐서플로우
import tensorflow_io as tfio
import matplotlib.pyplot as plt # 시각화 도구
%matplotlib inline
import numpy as np
from sklearn.model_selection import train_test_split

from IPython.display import Audio

In [None]:
# Show the installed TensorFlow version (the notebook title targets TensorFlow 2.x).
print(f'Tensorflow 버전을 확인합니다: {tf.__version__}')

### 2. 학습 데이터 다운로드 및 압축 풀기

#### 데이터 다운로드
- [https://storage.cloud.google.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz](https://storage.cloud.google.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz)
- 내려받은 파일은 이 노트북과 같은 폴더에 `data_speech_commands_v0.02.tar.gz`라는 이름으로 저장해야 아래의 압축 풀기 셀이 동작합니다.

In [None]:
# Start from a clean extraction directory. `-f` / `-p` keep this cell
# re-runnable: no error when the directory is missing (first run) or
# already exists.
!rm -rf data_speech_commands
!mkdir -p data_speech_commands
# Extract without `-v` rather than discarding all output through the
# bash-only `&>` redirect, so genuine tar errors (for example a missing
# archive) remain visible in the notebook.
!tar --directory data_speech_commands -xf data_speech_commands_v0.02.tar.gz

In [None]:
# The loader below turns every sub-directory into a class label, so the
# `_background_noise_` folder is removed to keep it from becoming a label.
# `-f` keeps the cell quiet when it is re-run after the folder is gone.
!rm -rf data_speech_commands/_background_noise_

In [None]:
# Root directory of the extracted archive; each sub-directory is one label.
path_root = './data_speech_commands'

# os.scandir yields entries in arbitrary order, so sort the label
# directories by name: label indices then stay identical across runs and
# machines (important when a trained model is reused later).
label_dirs = sorted(
    (entry for entry in os.scandir(path_root) if entry.is_dir()),
    key=lambda entry: entry.name,
)

# lab2idx[i] is the label name for class index i.
lab2idx = [entry.name for entry in label_dirs]

files = []   # path of every audio clip
labels = []  # matching class index, wrapped in a one-element list
for label_idx, label_dir in enumerate(label_dirs):
    for clip in sorted(os.scandir(label_dir.path), key=lambda entry: entry.name):
        # Skip anything that is not a wav clip (e.g. stray OS metadata files).
        if not (clip.is_file() and clip.name.lower().endswith('.wav')):
            continue
        files.append(clip.path)
        labels.append([label_idx])
dataset_root = (files, labels)

In [None]:
# Hold out 20% of the clips for validation/testing; the remaining 80% is
# used for training. The split is shuffled (and differs between runs).
split = train_test_split(
    files,
    labels,
    test_size=0.2,
    shuffle=True,
)
x_train, x_test, y_train, y_test = split

### 3. 학습 데이터 살펴보기: 차원, 레이블, 듣기

In [None]:
# Open the first held-out clip and print the AudioIOTensor summary.
audio = tfio.audio.AudioIOTensor(x_test[0])
print(audio)

# Read the samples and drop the trailing size-1 axis
# (presumably the channel axis of a mono clip).
raw_samples = audio.to_tensor()
audio_tensor = tf.squeeze(raw_samples, axis=-1)
print(audio_tensor)

In [None]:
# Remember the first test clip seen for each label index
# (dict insertion order keeps the order of first appearance).
first_clip_per_label = {}
for clip_path, (label_idx,) in zip(x_test, y_test):
    first_clip_per_label.setdefault(label_idx, clip_path)

# Print each label name followed by an audio player for its example clip.
# `audio` / `audio_tensor` keep their names on purpose: later cells read
# `audio_tensor` as "the most recently loaded clip".
for label_idx, clip_path in first_clip_per_label.items():
    print(lab2idx[label_idx], end=': ')
    audio = tfio.audio.AudioIOTensor(clip_path)
    audio_tensor = tf.squeeze(audio.to_tensor(), axis=-1)
    player = Audio(audio_tensor.numpy(), rate=audio.rate.numpy())
    display(player)

### 4. 학습 데이터 전처리: 오디오 데이터, 스펙트로그램

In [None]:
# Parameters of a synthetic one-second test tone.
volume = 1.0     # amplitude scale, range [0.0, 1.0]
fs = 16000       # sampling rate, Hz, must be integer
duration = 1.0   # in seconds, may be float
f = 261.625      # sine frequency, Hz, may be float

# `volume` was previously declared but never applied; scale the waveform by
# it so the parameter actually takes effect (1.0 leaves the samples unchanged).
# NOTE(review): the notebook audio player may rescale the amplitude on
# playback, so `volume` mainly affects the plotted waveform and the
# spectrogram - confirm against the IPython.display.Audio documentation.
samples = (volume * np.sin(2*np.pi*np.arange(fs*duration)*f/fs)).astype(np.float32)
display(Audio(samples, rate=fs))

In [None]:
# NOTE(review): get_spectrogram is defined in a later cell of this notebook;
# on a fresh kernel that cell has to be run before this one.

# Spectrogram of the synthetic tone, with the trailing size-1 axis removed
# so it can be drawn as a 2-D image.
spectrogram = tf.squeeze(get_spectrogram(samples), -1)

# Log-scaled spectrogram as a grayscale image without axes.
log_spectrogram = tf.math.log(spectrogram.numpy())
fig, ax = plt.subplots()
ax.axis('off')
_ = ax.imshow(log_spectrogram, cmap='gray')

# Raw waveform, with the y-axis symmetric around zero.
peak = np.max(samples)
fig, ax = plt.subplots()
ax.set_ylim((-1 * peak, peak))
_ = ax.plot(samples)

In [None]:
# Wrap the (path, label) pairs of each split in a tf.data.Dataset; the audio
# itself is only loaded later, when load_audio is mapped over these datasets.
train_dataset_root, val_dataset_root = (
    tf.data.Dataset.from_tensor_slices(pair)
    for pair in ((x_train, y_train), (x_test, y_test))
)

In [None]:
# Scale the most recently loaded clip to floats using the same 2**16 divisor
# that load_audio applies to the training data.
tensor = tf.cast(audio_tensor, tf.float32) / 2**16

# Plot the scaled waveform with the y-axis symmetric around zero.
peak = np.max(tensor)
fig, ax = plt.subplots()
ax.set_ylim((-1 * peak, peak))
_ = ax.plot(tensor)

In [None]:
def get_spectrogram(values):
    """Turn a 1-D waveform into a single-channel spectrogram image.

    Args:
        values: Audio samples (anything tf.convert_to_tensor accepts); they
            are converted to float32 before the transform.

    Returns:
        A rank-3 tensor: the spectrogram (nfft=512, window=512, stride=256),
        transposed, given a trailing size-1 axis and flipped vertically.
        Presumably this is laid out as (frequency, time, 1) with low
        frequencies at the bottom - confirm against the tensorflow-io docs.
    """
    waveform = tf.convert_to_tensor(values, dtype=tf.float32)
    stft = tfio.experimental.audio.spectrogram(
        waveform, nfft=512, window=512, stride=256)

    # Swap the two axes, then add the trailing axis that Conv2D layers and
    # tf.image ops expect.
    image = tf.expand_dims(tf.transpose(stft), -1)

    # Flip along the first axis so the image reads bottom-up.
    return tf.image.flip_up_down(image)

In [None]:
def load_audio(path, label):
    """Load one audio clip and return its fixed-size spectrogram with its label.

    Args:
        path: Path of the audio file, decoded as int16 samples.
        label: Label belonging to the clip; returned unchanged.

    Returns:
        A (spectrogram, label) tuple. The clip is zero-padded / truncated to
        exactly 16000 samples and divided by 2**16 before get_spectrogram
        is applied.
    """
    num_samples = 16000

    clip = tfio.audio.AudioIOTensor(path, dtype='int16')
    waveform = tf.squeeze(clip.to_tensor(), axis=[-1])

    # Append `num_samples` zeros and keep the first `num_samples` values:
    # shorter clips end up zero-padded, longer ones truncated. tf.slice with
    # a constant size keeps the length statically known inside Dataset.map.
    padded = tf.pad(waveform, [[0, num_samples]])
    fixed = tf.slice(padded, [0], [num_samples])

    scaled = tf.cast(fixed, tf.float32) / 2**16
    return get_spectrogram(scaled), label

In [None]:
# Spectrogram of the most recently loaded clip, squeezed to 2-D for display.
# NOTE(review): `audio_tensor` holds the raw samples here (no padding and no
# 2**16 scaling), unlike the training data produced by load_audio.
spectrogram = tf.squeeze(get_spectrogram(audio_tensor.numpy()), -1)

# Log-scaled spectrogram as a grayscale image without axes.
log_spectrogram = tf.math.log(spectrogram.numpy())
fig, ax = plt.subplots()
ax.axis('off')
_ = ax.imshow(log_spectrogram, cmap='gray')

### 5. 학습 모델 준비: CNN

In [None]:
# Replace every (path, label) element with (spectrogram, label) by mapping
# load_audio over both splits.
train_dataset, val_dataset = (
    root.map(load_audio)
    for root in (train_dataset_root, val_dataset_root)
)

In [None]:
BUFFER_SIZE = 10000  # number of elements held in the shuffle buffer
BATCH_SIZE = 32

# Shuffle only the training data. Validation loss/accuracy are averages over
# the whole split and do not depend on element order, so shuffling it merely
# filled a second BUFFER_SIZE-element buffer of spectrograms for no benefit.
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
val_dataset = val_dataset.batch(BATCH_SIZE)

In [None]:
# The input shape is taken from the spectrogram of the synthetic tone, which
# has the same size as every spectrogram produced by load_audio only as long
# as `samples` holds 16000 values.
input_shape = get_spectrogram(samples).shape

# Small CNN: three convolution stages followed by a dense classifier head.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu',
                                 input_shape=input_shape))
model.add(tf.keras.layers.MaxPooling2D((2, 2)))
model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))
model.add(tf.keras.layers.MaxPooling2D((2, 2)))
model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(64, activation='relu'))
# One output per label and no activation: the loss is configured with
# from_logits=True in the compile cell.
model.add(tf.keras.layers.Dense(len(lab2idx)))
model.summary()

In [None]:
# Configure training. The model outputs raw logits (its last Dense layer has
# no activation), hence from_logits=True; labels are integer class indices,
# hence the "sparse" loss and metric.
# compile() returns None, so its result is deliberately not stored - the
# training history comes from model.fit().
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

### 6. 학습

In [None]:
# Number of full passes over the training data.
epochs = 10

# Train, evaluating on the validation split after every epoch; the returned
# History object holds the per-epoch loss and metric values.
history = model.fit(train_dataset, validation_data=val_dataset, epochs=epochs)

### 7. 학습 결과 테스트

In [None]:
# Per-epoch values recorded by model.fit.
history_dict = history.history
loss = history_dict['loss']
val_loss = history_dict['val_loss']
acc = history_dict['sparse_categorical_accuracy']
val_acc = history_dict['val_sparse_categorical_accuracy']

# x-axis: epoch numbers starting at 1.
epochs = range(1, len(acc) + 1)

# One panel per quantity:
# (training values, validation values, training legend, validation legend,
#  title, y-axis label).
panels = (
    (loss, val_loss, 'Training loss', 'Validation loss',
     'Training and validation loss', 'Loss'),
    (acc, val_acc, 'Training acc', 'Validation acc',
     'Training and validation accuracy', 'Accuracy'),
)

fig1 = plt.figure(figsize=(6, 10))
for row, (train_values, val_values, train_legend, val_legend,
          title, y_label) in enumerate(panels, start=1):
    ax = fig1.add_subplot(2, 1, row)
    ax.plot(epochs, train_values, 'bo', label=train_legend)
    ax.plot(epochs, val_values, 'b', label=val_legend)
    # y-axis from 0 up to the largest observed value, rounded up.
    upper = math.ceil(max(max(train_values), max(val_values)))
    ax.set_ylim((0, upper))
    ax.set_title(title, fontsize=12)
    ax.set_xlabel('Epochs', fontsize=10)
    ax.set_ylabel(y_label, fontsize=10)
    ax.legend()

In [None]:
# List every label name, ten per line, each followed by a single space.
print('레이블 종류: ')
for start in range(0, len(lab2idx), 10):
    row = lab2idx[start:start + 10]
    # Only complete rows of ten end with a line break; the final print()
    # below terminates a partial last row.
    line_end = '\n' if len(row) == 10 else ''
    print(''.join(f'{name} ' for name in row), end=line_end)
print()

In [None]:
# Label to test and its class index.
test_label = 'happy'
test_idx = lab2idx.index(test_label)

# All held-out clips carrying that label, as (path, label index) pairs.
target = [
    (clip_path, clip_label[0])
    for clip_path, clip_label in zip(x_test, y_test)
    if clip_label[0] == test_idx
]

# Pick one of them at random.
choice = random.choice(target)
test_path = choice[0]

# Load the chosen clip, show its label and offer it for playback.
audio = tfio.audio.AudioIOTensor(test_path)
audio_tensor = tf.squeeze(audio.to_tensor(), axis=-1)
print(f'Label: {test_label} [{test_idx}]')
player = Audio(audio_tensor.numpy(), rate=audio.rate.numpy())
display(player)

In [None]:
# Preprocess the chosen clip exactly like the training data. load_audio pads
# or truncates to 16000 samples and divides by 2**16; feeding the raw,
# unpadded int16 samples instead (as this cell used to) gives the model
# inputs whose scale - and, for short clips, shape - differ from what it
# was trained on.
spectrogram, _unused_label = load_audio(test_path, test_idx)
spectrogram_ = tf.squeeze(spectrogram, -1)

fig = plt.figure()
ax = fig.add_subplot()
ax.axis('off')
# A small offset keeps the zero-padded (silent) frames finite under log.
_ = ax.imshow(tf.math.log(spectrogram_ + 1e-6).numpy(), cmap='gray')

# Add a batch axis, predict, and map the highest-scoring class to its name.
logits = model.predict(tf.expand_dims(spectrogram, 0))
predicted_idx = int(np.argmax(logits))
print(f'예측 레이블: {lab2idx[predicted_idx]}')