In [None]:
# Upgrade environment to support TF 2.10 in Colab.
# `-U --pre` upgrades tensorflow and tensorflow_datasets to the newest
# available build (pre-releases included); no version is pinned here.
!pip install -U --pre tensorflow tensorflow_datasets
# Install the cuDNN 8.1.0.77 build for CUDA 11.2, allowing apt to change a
# held package if necessary.
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2

In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display
from scipy.signal import resample

# Seed both NumPy's and TensorFlow's global random generators with the same
# value so that repeated runs of the experiment are comparable.
seed = 195397
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
DATASET_PATH = 'data/mini_speech_commands'

data_dir = pathlib.Path(DATASET_PATH)
# Download and unpack the archive only when the dataset folder is missing,
# which keeps re-running this cell cheap.
if not data_dir.exists():
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      # Use HTTPS: the archive is extracted locally, so the download should be
      # protected against tampering in transit.
      origin="https://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.', cache_subdir='data')

In [None]:
# Every command is a sub-directory of `data_dir`. Keeping directories only
# skips README.md as well as any loose audio files stored next to them, and
# sorting makes the result independent of the file system's listing order, so
# the array can safely be indexed by integer class id in later cells.
commands = np.array(sorted(
    entry.name for entry in data_dir.iterdir() if entry.is_dir()))
print('Commands:', commands)

In [None]:
# Build the training and validation pipelines in a single call: an 80/20
# split, batches of 64 clips, each clip fixed to 16000 samples.
train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    data_dir,
    subset='both',
    validation_split=0.2,
    seed=0,
    batch_size=64,
    output_sequence_length=16000)

# The class names reported by the dataset define the integer label ids.
label_names = np.array(train_ds.class_names)
print("\nlabel names:", label_names)

In [None]:
# Display the structure (tensor specs) of one element of the training dataset.
train_ds.element_spec

In [None]:
def squeeze(audio, labels):
  """Remove the last axis of `audio` and pass `labels` through untouched.

  Args:
    audio: Tensor whose last axis has size 1 (removed by `tf.squeeze`).
    labels: Returned unchanged.

  Returns:
    The tuple `(squeezed_audio, labels)`.
  """
  return tf.squeeze(audio, axis=-1), labels

# Apply `squeeze` to both splits, letting tf.data pick the parallelism.
train_ds, val_ds = (
    split.map(squeeze, tf.data.AUTOTUNE) for split in (train_ds, val_ds))

In [None]:
# Divide the validation data into two shards: shard 0 becomes the test set and
# shard 1 stays as the validation set. Both shards are taken from the same
# (pre-split) `val_ds` before either name is rebound.
test_ds, val_ds = (
    val_ds.shard(num_shards=2, index=shard_index) for shard_index in (0, 1))

In [None]:
# Pull a single batch out of the training set; `example_audio` and
# `example_labels` stay bound afterwards for use in later cells.
for example_audio, example_labels in train_ds.take(1):
  print(example_audio.shape, example_labels.shape, sep='\n')

In [None]:
# Index the label array with a list of ids to preview the names of classes
# 1, 2, 3 and 0, in that order.
label_names[[1,2,3,0]]

In [None]:
rows = 3
cols = 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(16, 9))

# Plot the first clips of the example batch, one per subplot. `axes.flat`
# walks the grid row by row, so no manual row/column arithmetic is needed.
# If the batch holds fewer than `n` clips the remaining subplots stay empty
# instead of raising an indexing error.
num_examples = min(n, int(example_audio.shape[0]))
for i, ax in zip(range(num_examples), axes.flat):
  ax.plot(example_audio[i].numpy())
  ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
  label = label_names[example_labels[i]]
  ax.set_title(label)
  ax.set_ylim([-1.1,1.1])

plt.show()

In [None]:
def get_spectrogram(waveform):
  """Return the magnitude STFT of `waveform` with a trailing channel axis.

  The STFT is computed with `frame_length=255` and `frame_step=128`. A size-1
  last axis is appended so that the result can be used as image-like input
  for convolution layers, which expect
  (`batch_size`, `height`, `width`, `channels`).

  Args:
    waveform: Audio samples accepted by `tf.signal.stft`.

  Returns:
    The absolute value of the STFT, expanded by one trailing axis.
  """
  stft = tf.signal.stft(waveform, frame_length=255, frame_step=128)
  magnitude = tf.abs(stft)
  return tf.expand_dims(magnitude, axis=-1)

In [None]:
# Inspect the first three clips of the example batch: label, tensor shapes and
# an audio player. `label`, `waveform` and `spectrogram` keep the values of the
# last iteration once the loop ends.
for i in range(3):
  waveform = example_audio[i]
  spectrogram = get_spectrogram(waveform)
  label = label_names[example_labels[i]]

  print('Label:', label)
  print('Waveform shape:', waveform.shape)
  print('Spectrogram shape:', spectrogram.shape)
  print('Audio playback')
  player = display.Audio(waveform, rate=16000)
  display.display(player)

In [None]:
def plot_spectrogram(spectrogram, ax):
  """Draw the log-magnitude of `spectrogram` on `ax` with `pcolormesh`.

  Args:
    spectrogram: 2-D array, or a 3-D array whose last axis has size 1 (that
      axis is squeezed away). Any higher rank fails the assertion.
    ax: Object providing `pcolormesh(X, Y, C)`, e.g. a matplotlib Axes.
  """
  rank = len(spectrogram.shape)
  if rank > 2:
    assert rank == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)

  # Transpose so the first input axis runs along x (columns), and add a
  # machine epsilon so zero magnitudes do not produce log(0).
  epsilon = np.finfo(float).eps
  log_spec = np.log(spectrogram.T + epsilon)

  height, width = log_spec.shape[0], log_spec.shape[1]
  # x coordinates are spread evenly from 0 to the total number of spectrogram
  # entries; y coordinates are simply the row indices.
  X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
  Y = range(height)
  ax.pcolormesh(X, Y, log_spec)

In [None]:
# Two stacked panels for the most recently inspected clip: the raw waveform on
# top and its log-spectrogram underneath.
fig, axes = plt.subplots(2, figsize=(12, 8))
wave_ax, spec_ax = axes
timescale = np.arange(waveform.shape[0])

wave_ax.plot(timescale, waveform.numpy())
wave_ax.set_title('Waveform')
wave_ax.set_xlim([0, 16000])

plot_spectrogram(spectrogram.numpy(), spec_ax)
spec_ax.set_title('Spectrogram')

plt.suptitle(label.title())
plt.show()

In [None]:
def make_spec_ds(ds):
  """Map each `(audio, label)` element of `ds` to `(spectrogram, label)`.

  Args:
    ds: Dataset yielding `(audio, label)` pairs.

  Returns:
    The mapped dataset; the spectrogram is computed by `get_spectrogram` and
    the mapping runs with `tf.data.AUTOTUNE` parallelism.
  """
  def to_spectrogram(audio, label):
    return get_spectrogram(audio), label

  return ds.map(map_func=to_spectrogram, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Convert all three splits from waveforms to spectrograms.
train_spectrogram_ds, val_spectrogram_ds, test_spectrogram_ds = map(
    make_spec_ds, (train_ds, val_ds, test_ds))

In [None]:
# Bind the first spectrogram batch and its labels to `example_spectrograms`
# and `example_spect_labels`. `take(1)` yields at most one batch, so the loop
# body has nothing left to do.
for example_spectrograms, example_spect_labels in train_spectrogram_ds.take(1):
  pass

In [None]:
rows = 4
cols = 4
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(16, 9))

# One spectrogram of the example batch per subplot; `axes.flat` walks the grid
# row by row.
for i, ax in enumerate(axes.flat):
    plot_spectrogram(example_spectrograms[i].numpy(), ax)
    # Titles are looked up in `label_names` (the dataset's own class list),
    # because the integer labels are indices into that array; a raw directory
    # listing is not guaranteed to share its order or contents.
    ax.set_title(label_names[example_spect_labels[i].numpy()])

plt.show()

In [None]:
# Cache the computed spectrograms and prefetch batches for every split. Only
# the training split is shuffled (buffer size 10000).
train_spectrogram_ds = (
    train_spectrogram_ds
    .cache()
    .shuffle(10000)
    .prefetch(tf.data.AUTOTUNE))
val_spectrogram_ds, test_spectrogram_ds = (
    split.cache().prefetch(tf.data.AUTOTUNE)
    for split in (val_spectrogram_ds, test_spectrogram_ds))

In [None]:
# Per-example input shape, read from the example batch (batch axis dropped).
input_shape = example_spectrograms.shape[1:]
print('Input shape:', input_shape)
# One output unit per class the dataset actually yields. `label_names` is the
# dataset's class list, so the count cannot be inflated by stray files that a
# plain directory listing would include.
num_labels = len(label_names)

# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt` (labels are dropped first).
norm_layer.adapt(data=train_spectrogram_ds.map(map_func=lambda spec, label: spec))

model = models.Sequential([
    layers.Input(shape=input_shape),
    # Downsample the input.
    layers.Resizing(32, 32),
    # Normalize.
    norm_layer,
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    # No activation: the model outputs raw logits.
    layers.Dense(num_labels),
])

model.summary()

In [None]:
# Integer class labels + raw logit outputs: sparse categorical cross-entropy
# with `from_logits=True`, optimised with Adam and tracked by accuracy.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

In [None]:
EPOCHS = 30
# Train for at most EPOCHS epochs. Early stopping uses the callback's default
# monitored quantity and halts after 10 epochs without an improvement of at
# least 0.001.
history = model.fit(
    train_spectrogram_ds,
    validation_data=val_spectrogram_ds,
    epochs=EPOCHS,
    # `callbacks` is documented as a list of callback instances.
    callbacks=[
        tf.keras.callbacks.EarlyStopping(verbose=1, patience=10, min_delta=0.001),
    ],
)

In [None]:
metrics = history.history
plt.figure(figsize=(16,6))

# Left panel: training and validation loss per epoch. Each curve gets its own
# plot call with an explicit x axis, so both are drawn against `history.epoch`.
plt.subplot(1,2,1)
plt.plot(history.epoch, metrics['loss'], label='loss')
plt.plot(history.epoch, metrics['val_loss'], label='val_loss')
plt.legend()
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch')
plt.ylabel('Loss [CrossEntropy]')

# Right panel: training and validation accuracy, shown in percent.
plt.subplot(1,2,2)
plt.plot(history.epoch, 100*np.array(metrics['accuracy']), label='accuracy')
plt.plot(history.epoch, 100*np.array(metrics['val_accuracy']), label='val_accuracy')
plt.legend()
plt.ylim([0, 100])
plt.xlabel('Epoch')
plt.ylabel('Accuracy [%]')
plt.show()

In [None]:
# Evaluate the trained model on the held-out test split; `return_dict=True`
# returns the results as a dict instead of a list.
model.evaluate(test_spectrogram_ds, return_dict=True)

In [None]:
# Model outputs (one row of logits per example) for the whole test split.
y_pred = model.predict(test_spectrogram_ds)

In [None]:
# Reduce each row of logits to the index of its largest entry, i.e. the
# predicted class id. NOTE: this rebinds `y_pred`, so the cell is meant to be
# run once after the prediction cell.
y_pred = tf.argmax(y_pred, axis=1)

In [None]:
# Collect the ground-truth labels of every test batch into one flat tensor.
y_true = tf.concat(
    [batch_labels for _, batch_labels in test_spectrogram_ds], axis=0)

In [None]:
# Rows are true labels, columns are predictions.
confusion_mtx = tf.math.confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
# Tick labels come from `label_names`, the array the integer class ids index
# into, so every row/column is annotated with the right class name.
sns.heatmap(confusion_mtx,
            xticklabels=label_names,
            yticklabels=label_names,
            annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()

In [None]:
# 1. Read an audio file
def load_audio(audio_path):
    """Decode a WAV file into a single-channel waveform.

    Args:
        audio_path: Location of the WAV file; converted with `str()` before
            it is read.

    Returns:
        A `(waveform, sample_rate)` tuple as produced by
        `tf.audio.decode_wav(..., desired_channels=1)`, with the channel axis
        squeezed out of the waveform.
    """
    wav_bytes = tf.io.read_file(str(audio_path))
    samples, rate = tf.audio.decode_wav(wav_bytes, desired_channels=1)
    mono = tf.squeeze(samples, axis=-1)  # drop the channel axis
    return mono, rate

# 2. Resample audio
def resample_audio(waveform, original_sample_rate, desired_sample_rate=16000):
    """Resample a 1-D waveform to `desired_sample_rate`.

    Args:
        waveform: 1-D sequence of samples. Anything `np.asarray` can convert
            is accepted (eager tensors, NumPy arrays, lists).
        original_sample_rate: Sample rate of `waveform` in Hz. Anything `int()`
            can convert is accepted (eager scalar tensors, plain ints).
        desired_sample_rate: Target sample rate in Hz.

    Returns:
        A `(samples, desired_sample_rate)` tuple. `samples` is always a
        float32 NumPy array, whether or not resampling was necessary, so
        callers see one consistent type.

    Raises:
        ValueError: If `original_sample_rate` is not positive.
    """
    samples = np.asarray(waveform, dtype=np.float32)
    source_rate = int(original_sample_rate)
    if source_rate <= 0:
        raise ValueError(
            f'original_sample_rate must be positive, got {source_rate}')

    # An empty clip has nothing to resample; skip the FFT-based routine.
    if source_rate != desired_sample_rate and samples.shape[0] > 0:
        desired_samples = int(samples.shape[0] * desired_sample_rate / source_rate)
        # The resampling routine may upcast; go back to float32 afterwards.
        samples = resample(samples, desired_samples).astype(np.float32)
    return samples, desired_sample_rate

# 5. Main script: compare the uploaded clip before and after resampling
audio_path = data_dir/'left.wav'  # path of the uploaded audio file

# Original audio, then the same audio resampled
original_waveform, original_sample_rate = load_audio(audio_path)
resampled_waveform, resampled_sample_rate = resample_audio(
    original_waveform, original_sample_rate)

# Spectrograms of both versions
original_spectrogram = get_spectrogram(original_waveform)
resampled_spectrogram = get_spectrogram(resampled_waveform)

# Visualisation: four stacked panels
fig, axes = plt.subplots(4, figsize=(12, 16))
orig_wave_ax, res_wave_ax, orig_spec_ax, res_spec_ax = axes

# 1. Original waveform
orig_wave_ax.plot(np.arange(original_waveform.shape[0]), original_waveform.numpy())
orig_wave_ax.set(
    title=f'Original Waveform (Sample rate: {original_sample_rate.numpy()} Hz)',
    xlabel='Sample',
    ylabel='Amplitude')

# 2. Resampled waveform
res_wave_ax.plot(np.arange(len(resampled_waveform)), resampled_waveform)
res_wave_ax.set(
    title=f'Resampled Waveform (Sample rate: {resampled_sample_rate} Hz)',
    xlabel='Sample',
    ylabel='Amplitude')

# 3. Original spectrogram
plot_spectrogram(original_spectrogram.numpy(), orig_spec_ax)
orig_spec_ax.set_title('Original Spectrogram')

# 4. Resampled spectrogram
plot_spectrogram(resampled_spectrogram.numpy(), res_spec_ax)
res_spec_ax.set_title('Resampled Spectrogram')

plt.tight_layout()
plt.show()

# Audio player for the resampled clip
display.display(display.Audio(resampled_waveform, rate=16000))

In [None]:
# Convert the resampled waveform to a float32 TensorFlow tensor.
resampled_waveform_tensor = tf.convert_to_tensor(resampled_waveform, dtype=tf.float32)

# The training clips were fixed to 16000 samples, and the model's input shape
# was derived from them. Crop or zero-pad the clip to that length so its
# spectrogram has the shape the model expects.
target_samples = 16000
resampled_waveform_tensor = resampled_waveform_tensor[:target_samples]
missing_samples = target_samples - int(resampled_waveform_tensor.shape[0])
resampled_waveform_tensor = tf.pad(resampled_waveform_tensor, [[0, missing_samples]])

# Compute the spectrogram.
resampled_spectrogram = get_spectrogram(resampled_waveform_tensor)

# Add a leading batch axis to match the model's input format.
resampled_input = resampled_spectrogram[tf.newaxis, ...]

# Run the model.
prediction = model(resampled_input)

# Visualise the predicted class probabilities. Bars are labelled with
# `label_names`, the array the model's output indices refer to.
plt.bar(label_names, tf.nn.softmax(prediction[0]).numpy())
plt.title('Predicted Command')
plt.show()

# Play the audio.
display.display(display.Audio(resampled_waveform, rate=resampled_sample_rate))


In [None]:
# Load the uploaded clip and bring it to the format the model was trained on:
# 16 kHz mono, exactly 16000 samples (cropped or zero-padded). Feeding the
# clip at its native rate/length would give a spectrogram whose shape and
# frequency scale differ from the training data.
x, sample_rate = load_audio(data_dir/'left.wav')
x, sample_rate = resample_audio(x, sample_rate, desired_sample_rate=16000)
x = tf.convert_to_tensor(x, dtype=tf.float32)[:16000]
x = tf.pad(x, [[0, 16000 - int(x.shape[0])]])
waveform = x
x = get_spectrogram(x)
x = x[tf.newaxis,...]

prediction = model(x)
# Bars are labelled with `label_names`, the array the model's output indices
# refer to; the title names the clip that was classified.
plt.bar(label_names, tf.nn.softmax(prediction[0]).numpy())
plt.title('left')
plt.show()

# Play back exactly what the model received.
display.display(display.Audio(waveform, rate=sample_rate))

In [None]:
# Load the uploaded clip and bring it to the format the model was trained on:
# 16 kHz mono, exactly 16000 samples (cropped or zero-padded). Feeding the
# clip at its native rate/length would give a spectrogram whose shape and
# frequency scale differ from the training data.
x, sample_rate = load_audio(data_dir / 'right.wav')
x, sample_rate = resample_audio(x, sample_rate, desired_sample_rate=16000)
x = tf.convert_to_tensor(x, dtype=tf.float32)[:16000]
x = tf.pad(x, [[0, 16000 - int(x.shape[0])]])
waveform = x
x = get_spectrogram(x)
x = x[tf.newaxis, ...]

prediction = model(x)

# Adjust the figure size
plt.figure(figsize=(12, 6))  # width 12, height 6

# Bar chart of the class probabilities, labelled with `label_names` (the array
# the model's output indices refer to)
plt.bar(label_names, tf.nn.softmax(prediction[0]).numpy())
plt.title('right')

# Rotate the x tick labels and name the axes
plt.xticks(rotation=45)  # rotate by 45 degrees
plt.xlabel('Commands')  # x-axis label
plt.ylabel('Probability')  # y-axis label
plt.tight_layout()  # tidy up the spacing

plt.show()

# Play back exactly what the model received
display.display(display.Audio(waveform, rate=sample_rate))
