<a href="https://colab.research.google.com/github/kimyh1207/-SNS-/blob/main/Speech_recognition_Wave.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple speech recognition

Audio 데이터를 다뤄서 학습하는 방법을 배워보도록 합시다.
머신러닝 작업과정은 아래와 같습니다.

1. Examine and understand data
2. Build an input pipeline
3. Build the model
4. Train the model
5. Test the model
6. Improve the model and repeat the process

* 모델 완성 후 평가 지표에 따라서 모델을 평가해 봅시다.

## Project 설명
### Task
* 1초 길이의 오디오 음성데이터를 이용해 단어를 분류하는 것이 목표입니다.
* 주어진 데이터를 이용해 딥러닝 트레이닝 과정을 구현해 보는것이 목표입니다.
* This is version 0.01 of the data set containing 64,727 audio files, released on August 3rd 2017.

### Baseline
* ResNet 구조와 유사한 skip connection 구조를 구현해 보자.
* 오버피팅을 방지하기 위한 다양한 방법들을 사용해보자.
* Training

### Import packages

* 우리가 사용할 packages 를 import 하는 부분 입니다.
* 필요에 따른 packages를 선언합니다.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import librosa
import librosa.display
import matplotlib.pyplot as plt

import os
from os.path import isdir, join

import random
import copy
import sys

# PyTorch version
print(torch.__version__)


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Import modules

* Colab 적용을 위한 변수 지정 및 드라이브 마운트

In [None]:
use_colab = True
assert use_colab in [True, False]

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Load dataset
* 사용할 데이터셋을 살펴봅시다.

In [None]:
if use_colab:
    DATASET_PATH = #TODO
else:
    DATASET_PATH = "./"

if not os.path.isdir(DATASET_PATH):
    os.makedirs(DATASET_PATH)

In [None]:
# npz 파일은 npy 파일들을 압축한 파일입니다.
speech_data = np.load(os.path.join(#TODO, "speech_wav_8000.npz"))

* npz 형태의 파일은 npy의 압축형태이며, files 이름 내에 데이터를 저장하고 불러올 수 있습니다.
* files를 출력하면, 데이터가 어떤 key값으로 저장되어 있는지 확인할 수 있습니다!

In [None]:
# files를 입력해보시면, 압축된 파일(npy)들의 종류를 확인할 수 있습니다.
print(speech_data.files)

* 각 데이터가 어떤 형태로 저장되어 있는지 확인해봅시다.

In [None]:
# 불러온 데이터들의 모양
print(speech_data["wav_vals"].shape, speech_data["label_vals"].shape)
# labels 는 현재 text 상태이기 때문에 추후에 index(int)형태로 바꿔주게 됩니다.

* 숫자로 이뤄진 데이터가 진짜 오디오 데이터가 맞는지 확인해봅시다.

In [None]:
idx = 219
test_audio = speech_data["wav_vals"][idx]
test_labels = speech_data["label_vals"][idx]

In [None]:
import IPython.display as ipd

sr = 8000 # 1초동안 재생되는 샘플의 갯수
data = test_audio

print(test_labels)
ipd.Audio(data, rate=sr)

### Model dataset setting
* 변환된 데이터를 이용해서 학습에 활용할 데이터셋을 설정한다.

In [None]:
sr = 8000 # 음성파일의 sample rate가 8000 인 것을 확인
train_wav, test_wav, train_label, test_label = train_test_split(#TODO, # wav 파일들의 데이터
                                                                #TODO, # label 파일들의 데이터
                                                                test_size=#TODO, # 비율 train, test를 몇퍼센트의 비율로 나눌지
                                                                shuffle=#TODO) # 섞을 것인지?
                                                                #(파일, 정답) 이 형태로 섞어주게됩니다.

# for convolution layers
#[40000, 8000] => [40000, 8000, 1] => 40000 * 8000 == 40000 * 8000 * 1
# reshape은 항상 데이터의 총량이 변하지 않도록 설정해주시면 됩니다.
train_wav = train_wav.reshape([#TODO]) # channel [data len, 8000] -> [data len, 8000, 1]
test_wav = test_wav.reshape([#TODO])
# (50000, 8000, 1)

print(train_wav.shape)
print(test_wav.shape)
print(train_label.shape)
print(test_label.shape)

### Label 데이터를 구분해보자
* 현재 정답 데이터는 다양한 단어들이 섞여 있다.

In [None]:
# 사용되는 모든 라벨을 가져다 set으로 설정 => 중복제거
set(speech_data["label_vals"].flatten())

In [None]:
# del raw dataset for mem
del speech_data

* 총 12개의 클래스르 분류하는 작업이된다.
* unknown과 silence에는 target list 이외의 단어가 들어간다. (혹은 노이즈)

In [None]:
# target list
label_value = ['yes', 'no', 'up', 'down', 'left', 'right', 'on',
               'off', 'stop', 'go', 'unknown', 'silence']

new_label_value = dict() # 사전에 입력
for i, l in enumerate(label_value):
    new_label_value[l] = i
label_value = new_label_value # 일종의 번역사전을 만들게 됩니다.

In [None]:
label_value

* Text 데이터를 index 데이터로 변환
    * CIFAR10 데이터셋에서 이미 처리되었던 부분

In [None]:
# temp 변수를 이용해서 기존 text 형태인 label을 idx 형태로 변경해준다.
temp = []
for v in train_label:
    temp.append(label_value[v[0]])
train_label = np.array(temp)

temp = []
for v in test_label:
    temp.append(label_value[v[0]])
test_label = np.array(temp)

del temp

In [None]:
print('Train_Wav Demension : ' + str(np.shape(train_wav)))
print('Train_Label Demension : ' + str(np.shape(train_label)))
print('Test_Wav Demension : ' + str(np.shape(test_wav)))
print('Test_Label Demension : ' + str(np.shape(test_label)))
print('Number Of Labels : ' + str(len(label_value)))

### Checkpoint setting
* 학습 전반에서 사용할 checkpoint dir을 설정한다.

In [None]:
# the save point 설정
if use_colab:
    checkpoint_dir ='./drive/My Drive/train_ckpt/wave/exp1'
    if not os.path.isdir(checkpoint_dir):
        os.makedirs(checkpoint_dir)
else:
    checkpoint_dir = 'wave/exp1'

### Dataset 구성

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Custom Dataset
class AudioDataset(Dataset):
    def __init__(self, wav_data, labels, num_classes=12, transform=None):
        self.wav_data = #TODO
        self.labels = #TODO
        self.num_classes = #TODO
        self.transform = transform

    def __len__(self):
        return len(self.wav_data)

    def __getitem__(self, idx):
        wav = self.wav_data[idx]
        label = self.labels[idx]

        # One-hot encoding
        one_hot_label = torch.zeros(#TODO)
        one_hot_label[label] = 1

        if self.transform:
            wav = self.transform(wav)
        return wav, one_hot_label

# Parameters
batch_size = #TODO

# For train
train_dataset = AudioDataset(#TODO, #TODO, num_classes=#TODO)
train_loader = DataLoader(#TODO, batch_size=#TODO, shuffle=True)

# For test
test_dataset = AudioDataset(#TODO, #TODO, num_classes=#TODO)
test_loader = DataLoader(#TODO, batch_size=#TODO, shuffle=False)

# Check dataset loaders
print(f"Train dataset batches: {len(train_loader)}")
print(f"Test dataset batches: {len(test_loader)}")


### Model 구현
* Wave 파일 데이터를 이용해 학습을 할 수 있는 모델을 구현합니다.

In [None]:
import torch.nn.functional as F


class Conv1DModel(nn.Module):
    def __init__(self, input_channels, num_classes=12):
        super(Conv1DModel, self).__init__()
        #TODO

    def forward(self, x):
        #TODO


# Example usage
sr = 8000
input_channels = 1
model = Conv1DModel(input_channels=#TODO, num_classes=#TODO).to(device)


In [None]:
criterion = #TODO
optimizer = #TODO

In [None]:
# 모델을 추론 모드로 설정 (학습 비활성화)
model.eval()

# 예제 입력 데이터
train_wav = torch.randn(1, 1, 8000).to(device)  # 배치 크기=1, 채널=1, 길이=8000

# 추론 수행
with torch.no_grad():  # 그라디언트 계산 비활성화
    predictions = model(train_wav)

# 출력 결과 확인
print("Predictions: ", predictions.cpu().numpy())

In [None]:
from torchsummary import summary

summary(model, input_size=(1, 8000))

### Model training
* 모델 체크포인트로 저장공간을 확인 후 학습을 진행합니다.

In [None]:
# 체크포인트 저장 함수
def save_checkpoint(model, optimizer, epoch, loss, checkpoint_dir='checkpoints', best_loss=None):
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pth")

    # 체크포인트 딕셔너리 저장
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss,
        'best_loss': best_loss,
    }
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")

# 체크포인트 로드 함수
def load_checkpoint(model, optimizer, checkpoint_path):
    if os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        loss = checkpoint['loss']
        print(f"Checkpoint loaded from {checkpoint_path}")
        return epoch, loss
    else:
        print(f"No checkpoint found at {checkpoint_path}")
        return 0, None

# 예시: 특정 조건에서 체크포인트 저장
best_val_loss = float('inf')  # 초기화
current_epoch = 5  # 예제 값
current_val_loss = 0.2  # 예제 값

if current_val_loss < best_val_loss:
    best_val_loss = current_val_loss
    save_checkpoint(model, optimizer, current_epoch, current_val_loss, best_loss=best_val_loss)

In [None]:
# 기본 학습 루프
def train_model(model, train_loader, test_loader, criterion, optimizer, max_epochs, checkpoint_dir):
    best_val_loss = float('inf')

    for epoch in range(max_epochs):
        model.train()  # 학습 모드 활성화
        running_loss = 0.0

        # Training step
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            # GPU가 사용 가능하면 데이터를 GPU로 이동
            inputs, labels = #TODO

            # Forward pass
            outputs = #TODO
            loss = #TODO

            # Backward pass
            #TODO

            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{max_epochs}], Train Loss: {avg_train_loss:.4f}")

        # Validation step
        model.eval()  # 평가 모드 활성화
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = #TODO
                outputs = #TODO
                loss = #TODO
                val_loss += loss.item()

        avg_val_loss = val_loss / len(test_loader)
        print(f"Epoch [{epoch+1}/{max_epochs}], Validation Loss: {avg_val_loss:.4f}")

        # Checkpoint 저장
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            save_checkpoint(model, optimizer, epoch+1, avg_val_loss, checkpoint_dir=checkpoint_dir)
            print("Checkpoint saved for better validation loss.")

    print("Training complete.")

# 학습 루프 실행
train_model(
    model=#TODO,
    train_loader=#TODO,
    test_loader=#TODO,
    criterion=#TODO,
    optimizer=#TODO,
    max_epochs=#TODO,
    checkpoint_dir='checkpoints'
)


### 학습 결과 확인
* model fit의 return 값인 history에서 학습에 대한 결과를 확인해보자

In [None]:
# 가중치 로드
checkpoint_path = os.path.join(checkpoint_dir, "checkpoint_epoch_best.pth")
epoch, loss = load_checkpoint(model, optimizer, checkpoint_path)


## Evaluation
* Test dataset을 이용해서 모델의 성능을 평가합니다.

In [None]:
def evaluate_model(model, test_loader, criterion):
    model.eval()  # 평가 모드 활성화
    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            # GPU가 사용 가능하면 데이터를 GPU로 이동
            inputs, labels = #TODO

            # 모델 예측
            outputs = #TODO

            # 손실 계산
            loss = #TODO
            total_loss += loss.item()

            # 정확도 계산
            if outputs.ndimension() == 2:  # 다중 클래스 분류
                _, preds = torch.max(outputs, dim=1)  # 예측 클래스 인덱스
                _, true_labels = torch.max(labels, dim=1)  # 실제 클래스 인덱스
            else:  # 이진 분류 또는 단일 출력
                preds = (outputs > 0.5).long()
                true_labels = labels.long()

            correct += (preds == true_labels).sum().item()
            total += labels.size(0)

    # 평균 손실 및 정확도 계산
    avg_loss = total_loss / len(test_loader)
    accuracy = correct / total
    return avg_loss, accuracy

# 모델 평가
test_loss, test_accuracy = evaluate_model(model, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")
