# 발음기호 예측 베이스라인 코드(Deepspeech2 기반)

참고자료1 - https://arxiv.org/abs/1512.02595<br>

참고자료2 - https://www.assemblyai.com/blog/end-to-end-speech-recognition-pytorch

DeepSpeech2는 Baidu에서 개발한 End-to-End STT 알고리즘입니다.<br>해당 모델을 학습시켜 대회데이터에 예측과 제출까지 해보는 베이스라인 코드를 초보자도 쉽게 이해할 수 있도록 작성하였습니다.

# 필요 패키지 install 및 import

In [1]:
# 패키지 인스톨
!pip install torchaudio==0.4.0 torch==1.4.0 comet-ml==3.0.2 eng_to_ipa



In [2]:
# 패키지 임포트

import os
import eng_to_ipa as ipa
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
from comet_ml import Experiment ##
import pandas as pd

# 폴더 설정 및 생성

In [3]:
# 본인 구글 드라이브와 연결한다.
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [4]:
# 현재 디렉토리에 "data"라는 폴더를 생성합니다.
if not os.path.isdir("./data"):
  os.makedirs("./data")

# Evaluation metric 설정

자체 평가를 위한 여러 함수 정의
1. _levenshtein_distance = 두 문자열의 차이를 측정하기 위한 메트릭

2. cer = character error rate(stt 모델에서 흔히 사용하는 메트릭임 - http://blog.atlaslabs.ai/all/tech/105/)

3. wer = word error rate(stt 모델에서 흔히 사용하는 메트릭임 - https://docs.microsoft.com/ko-kr/azure/cognitive-services/speech-service/how-to-custom-speech-evaluate-data)


In [5]:
def _levenshtein_distance(ref, hyp):
    """Levenshtein 거리는 두 문자열의 차이를 측정하기 위한 메트릭이다.
      구체적으로는 두 문자열 s1,s2이 있다고 했을 때,
      s1을 s2로 변경하는 데 필요한 단어의 삭제, 삽입, 대체 등의 작업의 총 갯수이다."""
    m = len(ref)
    n = len(hyp)

    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    distance = np.zeros((2, n + 1), dtype=np.int32)

    for j in range(0,n + 1):
        distance[0][j] = j

    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]

def cer(reference, hypothesis):
    """Character Error Rate을 계산하는 코드"""

    reference = reference.lower()
    hypothesis = hypothesis.lower()

    edit_distance = _levenshtein_distance(reference, hypothesis)
    ref_len = len(reference)

    if ref_len == 0:
        raise ValueError("Length of reference should be greater than 0.")

    cer = float(edit_distance) / ref_len
    return cer

def wer(reference, hypothesis, delimiter=' '):
    """Word Error Rate을 계산하는 코드"""

    reference = reference.lower()
    hypothesis = hypothesis.lower()

    ref_words = reference.split(delimiter)
    hyp_words = hypothesis.split(delimiter)

    edit_distance = _levenshtein_distance(ref_words, hyp_words)

    edit_distance = float(edit_distance)
    ref_len = len(ref_words)

    if ref_len == 0:
        raise ValueError("Reference's word number should be greater than 0.")

    wer = float(edit_distance) / ref_len
    return wer

# 캐릭터를 숫자로 매핑하기 위한 Class 지정

In [6]:
import string

In [7]:
class TextTransform:
    """캐릭터를 숫자로 매핑하는 용도"""
    def __init__(self):
        self.char_map = {v:k+2 for k, v in enumerate(string.ascii_lowercase)} #character를 집어넣으면 숫자가 리턴
        self.index_map = {k+2:v for k, v in enumerate(string.ascii_lowercase)} #숫자를 집어넣으면 character가 리턴
        self.index_map[0] = ' '
        self.index_map[1] = "'"
        self.char_map[' '] = 0
        self.char_map["'"] = 1

    def text_to_int(self, text):
        """ 텍스트를 숫자로 매핑하는 함수 """
        int_sequence = []
        for c in text:
            ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ 숫자를 텍스트로 매핑하는 함수 """
        string = []
        for i in labels:
            string.append(self.index_map[i])
        return ''.join(string)

In [8]:
text_transform = TextTransform()

# SpecAugment를 위한 변환 함수 정의

SpecAugment란 음성 데이터의 data augmentation 기법의 일종으로...

시간과 주파수를 무작위로 단순히 잘라냅니다.

놀랍게도 이런 단순한 방식으로도 stt 모델의 오버피팅을 막을 수 있습니다.

https://arxiv.org/abs/1904.08779

In [10]:
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128), # Spectogram(푸리에 변환), 푸리에 변환을 적용하면 각 진동수 성분이 그 음성에 얼마나 들어있는지 알 수 있습니다.
    torchaudio.transforms.FrequencyMasking(freq_mask_param=15), #frequency를 가린다(specaugment의 일환)
    torchaudio.transforms.TimeMasking(time_mask_param=35) #time을 가린다(specaugment의 일환)
)

valid_audio_transforms = torchaudio.transforms.MelSpectrogram() # Spectogram(푸리에 변환), 푸리에 변환을 적용하면 각 진동수 성분이 그 음성에 얼마나 들어있는지 알 수 있습니다.

# train 데이터에는 specaugment를 적용하고 valid 데이터에는 적용하지 않는다.

In [11]:
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate= 16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
    torchaudio.transforms.TimeMasking(time_mask_param=35)
)

valid_audio_transforms = torchaudio.transforms.MelSpectrogram()

# 데이터 변환을 위한 _processing 함수 정의

우리는 본 튜토리얼에서 librispeech 데이터셋을 활용할 것입니다.

librispeech 데이터셋 특성상 라벨링 데이터가 알파벳으로 구성되어 있기 때문에, 발음기호를 예측하는 본 대회에서는 그대로 활용할 수는 없습니다.

따라서 발음기호로 변환해 새로운 라벨링 데이터를 생성해야 합니다.

변환 방법은 아래 링크에서 보실 수 있습니다.

https://bit.ly/3nADIUX

In [22]:
def train_processing(data):
    spectrograms = [] # 음성 데이터를 append하는 리스트
    labels = [] # 텍스트를 숫자로 바꾼 결과물을 append하는 리스트
    input_lengths = [] # 음성 데이터의 길이를 2로 나눈 값을 append하는 리스트
    label_lengths = [] # 텍스트의 길이를 append하는 리스트
    for (waveform, _, utterance, _, _, _) in data:
        spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1) # 차원을 하나 없애고 역을 취한다(원래는 시간축이 뒤에 있었는데,,, 역을 취하여 앞에 가도록 바꾼다).
        spectrograms.append(spec)

        utterance = utterance.lower().replace('ˈ','').replace('ˌ','').replace('*','')
        label = torch.Tensor(text_transform.text_to_int(utterance))
        labels.append(label)
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True) # 소리의 길이가 다르므로 하나의 Tensor로 만들기 위하여 padding을 진행하고 Tensor로 만든다.
    spectrograms = spectrograms.unsqueeze(1) # 차원을 하나 더 붙힌다.
    spectrograms = spectrograms.transpose(2, 3) # transpose를 한다. (배치 크기, 1, feature크기, 음성길이)로 만들었다.
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True) # 라벨의 길이가 다르므로 하나의 Tensor로 만들기 위하여 padding을 진행하고 Tensor로 만든다.

    return spectrograms, labels, input_lengths, label_lengths

def val_processing(data):
    spectrograms = [] # 음성 데이터를 append하는 리스트
    labels = [] # 텍스트를 숫자로 바꾼 결과물을 append하는 리스트
    input_lengths = [] # 음성 데이터의 길이를 2로 나눈 값을 append하는 리스트
    label_lengths = [] # 텍스트의 길이를 append하는 리스트
    for (waveform, _, utterance, _, _, _) in data:
        spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1) # 차원을 하나 없애고 역을 취한다(원래는 시간축이 뒤에 있었는데,,, 역을 취하여 앞에 가도록 바꾼다).
        spectrograms.append(spec)

        utterance = utterance.lower().replace('ˈ','').replace('ˌ','').replace('*','')

        label = torch.Tensor(text_transform.text_to_int(utterance))
        labels.append(label)
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True) # 소리의 길이가 다르므로 하나의 Tensor로 만들기 위하여 padding을 진행하고 Tensor로 만든다.
    spectrograms = spectrograms.unsqueeze(1) # 차원을 하나 더 붙힌다.
    spectrograms = spectrograms.transpose(2, 3) # transpose를 한다. (배치 크기, 1, feature크기, 음성길이)로 만들었다.
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True) # 라벨의 길이가 다르므로 하나의 Tensor로 만들기 위하여 padding을 진행하고 Tensor로 만든다.

    return spectrograms, labels, input_lengths, label_lengths

# 실제 데이터 Load(Pytorch의 Dataset과 DataLoader 클래스를 이용)

이번 베이스라인 코드에서는 librispeech의 데이터셋을 학습시킬 것입니다. librispeech 데이터 중 본 대회에 활용할 수 있는 데이터는 아래와 같습니다.(이외의 데이터 활용시 실격!!)


1. train-clean-100
2. train-clean-360
3. train-other-500

이번 튜토리얼에서는 콜랩의 용량상 train-clean-100의 80%를 학습에 사용하고, 나머지 20%를 validation에 사용할 것입니다.

In [14]:
dataset = torchaudio.datasets.LIBRISPEECH("./data", url='train-clean-100', download=True)

In [15]:
# dataset 중에서 80,20으로 split하여 train_dataset과 test_dataset 생성
lengths = [int(dataset.__len__()*0.8), dataset.__len__()-int(dataset.__len__()*0.8)]
                                                                                                                                                                                                                                          
import torch.utils.data as data
train_dataset, test_dataset = data.random_split(dataset, lengths)

-----------

In [27]:
iterer = iter(test_dataset)

In [28]:
output = next(iterer)

In [24]:
waveform = output[0]

In [27]:
transform_res = train_audio_transforms(waveform)
transform_res_ = transform_res.squeeze(0).transpose(0,1)

In [29]:
utterance = output[2]

In [30]:
utterance

'AS I GAZED UPON THEM I WONDERED TO WHAT SECRET PLACE WITHIN THEIR BROODING IMMENSITIES THE LITTLE METAL MYSTERIES HAD FLED AND TO WHAT MYRIADS IT MIGHT BE OF THEIR KIND'

In [32]:
utterance_ = utterance.lower().replace('ˈ','').replace('ˌ','').replace('*','')
utterance_

'as i gazed upon them i wondered to what secret place within their brooding immensities the little metal mysteries had fled and to what myriads it might be of their kind'

In [None]:
label = text_transform.text_to_int(utterance_)
label

In [40]:
transform_res_.shape[0]//2

1318

In [31]:
transform_res_.shape

torch.Size([1318, 128])

In [34]:
spectrograms = [] # 음성 데이터를 append하는 리스트
labels = [] # 텍스트를 숫자로 바꾼 결과물을 append하는 리스트
input_lengths = [] # 음성 데이터의 길이를 2로 나눈 값을 append하는 리스트
label_lengths = [] # 텍스트의 길이를 append하는 리스트
for _ in range(2):
  (waveform, _, utterance, _, _, _) = next(iterer)
  spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1) # 차원을 하나 없애고 역을 취한다(원래는 시간축이 뒤에 있었는데,,, 역을 취하여 앞에 가도록 바꾼다).
  spectrograms.append(spec)

  utterance =  utterance.lower().replace('ˈ','').replace('ˌ','').replace('*','')

  label = torch.Tensor(text_transform.text_to_int(utterance))
  labels.append(label)
  input_lengths.append(spec.shape[0]//2)
  label_lengths.append(len(label))

In [35]:
spectrograms_ = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)

In [36]:
print(spectrograms[0].shape)
print(spectrograms[1].shape)

torch.Size([1191, 128])
torch.Size([429, 128])


In [37]:
spectrograms__ = spectrograms_.unsqueeze(1).transpose(2,3)

In [38]:
spectrograms__.shape

torch.Size([2, 1, 128, 1191])

In [39]:
print(len(labels[0]))
print(len(labels[1]))

224
62


In [40]:
labels_ = nn.utils.rnn.pad_sequence(labels, batch_first=True)

In [41]:
labels_.shape

torch.Size([2, 224])

------------

만약 dataset에 "train-clean-100"뿐만 아니라 "train-clean-360", "train-other-500"까지 활용하고 싶다면 아래와 같이 코드를 짜주시면 됩니다.

In [None]:
# dataset = data.ConcatDataset(
#             [
#                 LIBRISPEECH("./data", url=path, download=True)
#                 for path in ["train-clean-100","train-clean-360","train-other-500"]
#             ]
#         )

In [16]:
# 위의 dataset 클래스를 dataloader로 바꿔줍니다.
kwargs = {'num_workers': 5, 'pin_memory': True} if torch.cuda.is_available() else {}

train_loader = data.DataLoader(dataset=train_dataset,
                            batch_size=20,
                            shuffle=True,
                            collate_fn=train_processing,
                            **kwargs)
test_loader = data.DataLoader(dataset=test_dataset,
                            batch_size=20,
                            shuffle=False,
                            collate_fn=val_processing,
                            **kwargs)

# Decoding하기 - 해당 대회에서는 단순 GreedyDecoder를 활용

In [17]:
# 예측 결과를 통해 최종 텍스트 추출(나중에)
def GreedyDecoder(output, labels, label_lengths, blank_label=27, collapse_repeated=True):
	arg_maxes = torch.argmax(output, dim=2)
	decodes = []
	targets = []
	for i, args in enumerate(arg_maxes):
		decode = []
		targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))
		for j, index in enumerate(args):
			if index != blank_label:
				if collapse_repeated and j != 0 and index == args[j -1]:
					continue
				decode.append(index.item())
		decodes.append(text_transform.int_to_text(decode))
	return decodes, targets

# Model Architecture(DeepSpeech2 기반)

1. CNNLayerNorm : CNNLayer의 인풋을 Normalizing하기 위한 클래스<br>

2. ResidualCNN : ResCNN을 위한 클래스 (Residual 커넥션을 활용하면 DEEP한 네트워크에서도 CNN에 대한 높은 정확도와 빠른 학습을 기대할 수 있다)<br>

3. BidirectionalGRU : 양방햔 GRU(RNN의 일종)을 위한 클래스<br>

4. SpeechRecognitionModel : 위의 요소들을 종합한 최종 Speech Recognition Model 클래스

In [23]:
class CNNLayerNorm(nn.Module):
    def __init__(self, n_feats):
        super(CNNLayerNorm, self).__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # x (batch, channel, feature, time)
        x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        x = self.layer_norm(x)
        return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) 

class ResidualCNN(nn.Module): # 컨볼루션->normalize->활성화->dropout의 연속
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super(ResidualCNN, self).__init__()
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

        self.batch_norm1 = nn.BatchNorm2d(in_channels)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        residual = x  # (batch, channel, feature, time) (배치 크기, 채널 수, feature, 음성의 길이)
        # x = self.batch_norm1(x) # normalize
        x = self.layer_norm1(x) # layer의 숫자를 normalize한다.
        x = F.relu(x) # relu와 같은 활성화 함수의 일종(최근에 나온 활성화 함수로 모델의 깊이가 깊어질 수록 중요해진다).
        # x = self.dropout1(x) # overfitting을 막기 위한 dropout
        x = self.cnn1(x) # 컨볼루션 신경망
        x = self.layer_norm2(x) # normalize
        # x = self.batch_norm2(x) # normalize
        x = F.relu(x) # 활성화 함수
        # x = self.dropout2(x) # dropout
        x = self.cnn2(x) # 컨볼루션 신경망
        x += residual # 원래꺼를 더한다. 왜 이걸 하는지는 모르겠다
        return x # (batch, channel, feature, time)

class BidirectionalGRU(nn.Module):
    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super(BidirectionalGRU, self).__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim, hidden_size=hidden_size,
            num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        # self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.layer_norm(x) # normalize
        x = F.relu(x) # 활성화함수
        x, _ = self.BiGRU(x) # BiGRU
        # x = self.dropout(x) # dropout
        return x

class SpeechRecognitionModel(nn.Module):
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting heirachal features, 1개에서 32개의 사진을 만들고, 3x3 크기의 커널로 stride만큼 돌린다(단 돌리기 전에 padding을 씌운다)

        # n residual cnn layers with filter size of 32
        # ResidualCNN을 n_cnn_layers만큼 여러개를 만든다.
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])

        self.fully_connected = nn.Linear(n_feats*32, rnn_dim) # Applies a linear transformation to input data, 입력 데이터의 차원은 n_feats*32, 출력 데이터의 차원은 rnn_dim

        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=i==0)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim), # birnn returns rnn_dim*2, 차원을 1/2배로 줄여준다.
            nn.ReLU(), # 활성화 함수
            # nn.Dropout(dropout), # dropout
            nn.Linear(rnn_dim, n_class) # Linear transformation to n_class
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time), (배치 사이즈, 채널 크기 x feature 사이즈, 음성 길이)
        x = x.transpose(1, 2) # (batch, time, feature), (배치 사이즈, 음성 길이, 채널 크기 x feature 사이즈)
        x = self.fully_connected(x) # (배치 사이즈, 음성 길이, rnn_dim)
        x = self.birnn_layers(x) # bidirectional GRU
        x = self.classifier(x) # 최종
        return x

# 학습과 validation을 위한 함수 생성

1. train 함수 : 1 epoch을 학습시키는 함수

2. test 함수 : validation 하는 함수

In [24]:
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
    model.train()
    data_len = len(train_loader.dataset)
    # with experiment.train():
    for batch_idx, _data in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = _data
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        optimizer.zero_grad()

        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1) # (time, batch, n_class)

        loss = criterion(output, labels, input_lengths, label_lengths)
        loss.backward()

        optimizer.step()
        scheduler.step()
        # iter_meter.step()
        if batch_idx % 100 == 0 or batch_idx == data_len:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(spectrograms), data_len,
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader, criterion, epoch):
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_wer = []
    test_cer = []
    # with experiment.test():
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            test_loss += loss.item() / len(test_loader)

            decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)
            for j in range(len(decoded_preds)):
                test_wer.append(wer(decoded_targets[j], decoded_preds[j]))
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
    
    avg_wer = sum(test_wer)/len(test_wer)
    avg_cer = sum(test_cer)/len(test_cer)
    # experiment.log_metric('test_loss', test_loss, step=iter_meter.get())
    # experiment.log_metric('wer', avg_wer, step=iter_meter.get())
    print('Test set: Average loss: {:.4f}, Average WER: {:.4f}, Average CER: {:.4f}\n'.format(test_loss, avg_wer, avg_cer))
    return test_loss, avg_wer, avg_cer

# 위의 데이터와 train,test 함수를 활용한 main 함수 제작

In [25]:
def main(learning_rate=5e-4, batch_size=20, epochs=10, train_url="train-clean-100", test_url="test-clean"):
    ########################################## 파라미터 정하기
    hparams = {
        "n_cnn_layers": 1,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 44,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.0,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": epochs
    }
    os.makedirs( '/gdrive/MyDrive/speech_recognition/', exist_ok=True)
    # experiment.log_parameters(hparams)

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu")

    ########################################## 데이터 저장하기
    if not os.path.isdir("./data"):
        os.makedirs("./data")

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = data.DataLoader(dataset=train_dataset,
                                batch_size=hparams['batch_size'],
                                shuffle=True,
                                collate_fn=train_processing, #lambda x: data_processing(x, 'train'),
                                **kwargs)
    test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=hparams['batch_size'],
                                shuffle=False,
                                collate_fn=val_processing, #lambda x: data_processing(x, 'valid'),
                                **kwargs)

    ########################################## 모델 저장하기
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)

    # model.load_state_dict(torch.load('/gdrive/My Drive/Colab Notebooks/new_data_without_residual_22.pt'))
    # model.load_state_dict(torch.load('/gdrive/My Drive/Colab Notebooks/new_data_without_residual_15.pt'))

    # print(model)
    print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))


    ########################################## 학습을 위한 다양한 함수 정의(optimizer, criterion 등)
    optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
    criterion = nn.CTCLoss(blank=43).to(device)
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'], 
                                              steps_per_epoch=int(len(train_loader)),
                                              epochs=hparams['epochs'],
                                              anneal_strategy='linear')

    # iter_meter = IterMeter()

    ########################################## 정한 에폭동안 학습
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
        test(model, device, test_loader, criterion, epoch)
        torch.save(model.state_dict(), '/gdrive/MyDrive/speech_recognition/'+str(epoch)+'.pt') #저장 경로 확인

# 학습 시작!

에폭을 5까지밖에 안 하였지만, 더 해도 될 것으로 보임.

In [None]:
learning_rate = 5e-4
batch_size = 5
epochs = 10

libri_train_set = "train-clean-100"
libri_test_set = "test-clean"

main(learning_rate, batch_size, epochs, libri_train_set, libri_test_set)

Num Model Parameters 23675692


# 음성기호 예측

In [None]:
# test.csv파일에서 테스트 파일 목록 불러오기
test_file = pd.read_csv("/gdrive/MyDrive/PREDICTION/호두랩스 대회/datasets/test.csv")
test_list = test_file["file_name"]

#테스트 파일 목록에 있는 음성 데이터 파일 불러오기
base_dir = "/gdrive/MyDrive/PREDICTION/호두랩스 대회/datasets/speech_folder" #테스트 데이터가 저장된 경로

test_dataset = []
for file_name in test_list: 
    file_dir = os.path.join(base_dir, file_name)
    waveform, sample_rate = torchaudio.load(file_dir) #각 음성데이터의 waverform, sample_rate 저장
    test_dataset.append((waveform, sample_rate))

OSError: ignored

In [None]:
# test data에는 label이 없으므로, 음성 데이터(waveform)만을 다룬 전처리 과정

def test_processing(data): 
    spectrograms = [] # 음성 데이터를 append하는 리스트
    input_lengths = [] # 음성 데이터의 길이를 2로 나눈 값을 append하는 리스트
    
    for (waveform, _) in data:
        spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1) # 차원을 하나 없애고 역을 취한다(원래는 시간축이 뒤에 있었는데,,, 역을 취하여 앞에 가도록 바꾼다).
        spectrograms.append(spec)

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True) # 소리의 길이가 다르므로 하나의 Tensor로 만들기 위하여 padding을 진행하고 Tensor로 만든다.
    spectrograms = spectrograms.unsqueeze(1) # 차원을 하나 더 붙힌다.
    spectrograms = spectrograms.transpose(2, 3) # transpose를 한다. (배치 크기, 1, feature크기, 음성길이)로 만들었다.

    return spectrograms, input_lengths

In [None]:
#test를 위한 loader 

test_loader = data.DataLoader(test_dataset,
                            batch_size=1,
                            shuffle=False,
                            collate_fn=test_processing) 

In [None]:
# train에서와 동일한 하이퍼 파라미터 설정
hparams = {
    "n_cnn_layers": 1,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 44,
        "n_feats": 128}

# train단계에서 저장한 모델  불러오기
model = SpeechRecognitionModel(hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],hparams['n_class'], hparams['n_feats'])
model.load_state_dict(torch.load('/gdrive/MyDrive/PREDICTION/호두랩스 대회/new_pron5.pt'))
model.eval()

SpeechRecognitionModel(
  (cnn): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (rescnn_layers): Sequential(
    (0): ResidualCNN(
      (cnn1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (cnn2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
      (layer_norm1): CNNLayerNorm(
        (layer_norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
      (layer_norm2): CNNLayerNorm(
        (layer_norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
      (batch_norm1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (batch_norm2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (fully_connected): Linear(in_features=2048, out_features=512, bias=True)
  (birnn_layers): Sequential(
    (0): BidirectionalGRU(
      (BiGRU)

In [None]:
# # test data에는 label이 없으므로, 음성 데이터(waveform)만을 다룬 디코더 함수 생성

def GreedyDecoder_for_test(output, blank_label=43, collapse_repeated=True):
	arg_maxes = torch.argmax(output, dim=2)
	decodes = []
	for i, args in enumerate(arg_maxes):
		decode = []
		for j, index in enumerate(args):
			if index != blank_label:
				if collapse_repeated and j != 0 and index == args[j -1]:
					continue
				decode.append(index.item())
		decodes.append(text_transform.int_to_text(decode))
	return decodes

In [None]:
#predict

predicted = []

with torch.no_grad():
    for i, _data in enumerate(test_loader):
      spectrograms, input_lengths = _data 
      output = model(spectrograms)  # (batch, time, n_class)
      output = F.log_softmax(output, dim=2)
      output = output.transpose(0, 1) # (time, batch, n_class)
      decoded_preds = GreedyDecoder_for_test(output.transpose(0, 1))

      predicted.append(decoded_preds)

In [None]:
# 제출 파일

df_prediction = pd.DataFrame({'id': test_list, 'text':predicted})

display(df_prediction)

Unnamed: 0,id,text
0,24750_20200129134606_0032.wav,[ænd ɪ pæn]
1,25854_20191023211911_0053.wav,[hir]
2,24750_20200204151411_0018.wav,[gər sæns]
3,30423_20200115214816_0010.wav,[mɛ lɛt if]
4,30423_20200115214944_0024.wav,[eɪn miŋ meɪdzeɪ]
...,...,...
407,1320-122612-0008.flac,[ðə aɪz əv ðə hoʊl pɑrti fɑloʊd ðə ænɪkspɛktɪd...
408,1320-122612-0009.flac,[hɪt wʊd hæv bɪn mɔr wəndərfəl hædi spoʊkən wɪ...
409,1320-122612-0013.flac,[ə sərpəl əv ə fju hənərd fitɪn səkəm frɛns wɪ...
410,1320-122612-0014.flac,[ðə ɪgzæməneɪʃən haʊɛvər rɪzəltɪd ɪn noʊ dɪskɛ...
