<a href="https://colab.research.google.com/github/syoung7388/2023_MLCL_Bootcamp/blob/main/Deep%20Speech2-based%20E2E%20ASR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Speech2 모델 기반 End-to-End Speech Recognition 코드

> 2022.02.15 / 경북대학교 인공지능학과 김준우 박사과정


> Q&A: kaen2891@gmail.com 


참고자료1 - https://arxiv.org/abs/1512.02595<br>

참고자료2 - https://www.assemblyai.com/blog/end-to-end-speech-recognition-pytorch

DeepSpeech2는 Baidu에서 개발한 End-to-End STT 모델임

해당 모델을 학습시켜 예측과 제출까지 해보는 베이스라인 코드를 초보자도 쉽게 이해할 수 있도록 작성

# 필요 패키지 install 및 import

In [None]:
# Install packages (pinned to torchaudio 0.9.1 and torch 1.9.1, not the latest releases)
!pip install torchaudio==0.9.1 torch==1.9.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchaudio==0.9.1
  Downloading torchaudio-0.9.1-cp38-cp38-manylinux1_x86_64.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torch==1.9.1
  Downloading torch-1.9.1-cp38-cp38-manylinux1_x86_64.whl (831.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m831.4/831.4 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torch, torchaudio
  Attempting uninstall: torch
    Found existing installation: torch 1.13.1+cu116
    Uninstalling torch-1.13.1+cu116:
      Successfully uninstalled torch-1.13.1+cu116
  Attempting uninstall: torchaudio
    Found existing installation: torchaudio 0.13.1+cu116
    Uninstalling torchaudio-0.13.1+cu116:
      Successfully uninstalled torchaudio-0.13.1+cu116
[31mERROR: pip's dependency resolver does not current

In [None]:
# 패키지 임포트
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
import pandas as pd

In [None]:
torch.__version__ # show which torch version is active after the install above

'1.9.1+cu102'

# 폴더 설정 및 생성

In [None]:
# Make sure a "data" folder exists in the current directory.
os.makedirs("./data", exist_ok=True)

In [None]:
# Mount your own Google Drive at /gdrive so model weights can be saved there.
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


# Evaluation metric 설정

음성인식 성능 평가를 위한 여러 함수 정의
1. _levenshtein_distance = 두 문자열의 차이를 측정하기 위한 메트릭

2. cer = character error rate(stt 모델에서 흔히 사용하는 메트릭임 - http://blog.atlaslabs.ai/all/tech/105/)

3. wer = word error rate(stt 모델에서 흔히 사용하는 메트릭임 - https://docs.microsoft.com/ko-kr/azure/cognitive-services/speech-service/how-to-custom-speech-evaluate-data)


In [None]:
def _levenshtein_distance(ref, hyp):
    """
    Levenshtein 거리는 두 문자열의 차이를 측정하기 위한 메트릭
    구체적으로는 두 문자열 s1,s2이 있다고 했을 때, 
    s1을 s2로 변경하는 데 필요한 단어의 삭제, 삽입, 대체 등의 작업의 총 갯수임.
    """
    m = len(ref)
    n = len(hyp)

    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    distance = np.zeros((2, n + 1), dtype=np.int32)

    for j in range(0,n + 1):
        distance[0][j] = j

    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]

def cer(reference, hypothesis):
    """Compute the character error rate (CER) of a hypothesis.

    Both strings are lower-cased; the character-level edit distance is then
    divided by the number of characters in the reference.

    Raises:
        ValueError: If the reference is empty.
    """
    ref_chars = reference.lower()
    hyp_chars = hypothesis.lower()

    num_errors = _levenshtein_distance(ref_chars, hyp_chars)

    if not ref_chars:
        raise ValueError("Length of reference should be greater than 0.")

    return float(num_errors) / len(ref_chars)

def wer(reference, hypothesis, delimiter=' '):
    """Compute the word error rate (WER) of a hypothesis.

    Both strings are lower-cased and split on ``delimiter``; the word-level
    edit distance is then divided by the number of reference words.

    Empty tokens (produced by leading, trailing or repeated delimiters) are
    dropped. Without this, ``"".split(' ')`` returns ``['']``, which made the
    empty-reference check unreachable and counted stray spaces as words.

    Args:
        reference: Ground-truth transcript.
        hypothesis: Predicted transcript.
        delimiter: Separator between words.

    Returns:
        The WER as a float (it can exceed 1.0 when the hypothesis is much
        longer than the reference).

    Raises:
        ValueError: If the reference contains no words.
    """
    ref_words = [word for word in reference.lower().split(delimiter) if word]
    hyp_words = [word for word in hypothesis.lower().split(delimiter) if word]

    # Validate before doing any work so an empty reference fails fast.
    if not ref_words:
        raise ValueError("Reference's word number should be greater than 0.")

    edit_distance = _levenshtein_distance(ref_words, hyp_words)
    return float(edit_distance) / len(ref_words)

# 캐릭터를 숫자로 매핑하기 위한 Class 지정

In [None]:
class TextTransform:
    """Two-way lookup between characters and integer labels.

    The vocabulary is the apostrophe (0), the space (1, written ``<SPACE>`` in
    the table below) and the lowercase letters ``a``-``z`` (2-27).
    """

    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        self.char_map = {}
        self.index_map = {}
        for entry in char_map_str.strip().split('\n'):
            symbol, idx = entry.split()
            idx = int(idx)
            self.char_map[symbol] = idx
            self.index_map[idx] = symbol
        # Decode label 1 directly to a real space rather than the "<SPACE>" token.
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """Encode a string as a list of integer labels."""
        space_id = self.char_map['<SPACE>']
        return [space_id if ch == ' ' else self.char_map[ch] for ch in text]

    def int_to_text(self, labels):
        """Decode a sequence of integer labels back into a string."""
        decoded = ''.join(self.index_map[label] for label in labels)
        return decoded.replace('<SPACE>', ' ')

In [None]:
# Shared encoder/decoder instance used by the collate functions and the greedy decoders.
text_transform = TextTransform()

# SpecAugment를 위한 변환 함수 정의

SpecAugment란 음성 데이터에 적용하는 data augmentation 기법의 일종으로, 스펙트로그램의 시간 축과 주파수 축을 무작위로 masking함

놀랍게도 이런 단순한 방식으로도 stt 모델의 오버피팅을 막을 수 있음

https://arxiv.org/abs/1904.08779

In [None]:
# Training pipeline: mel spectrogram followed by SpecAugment-style masking.
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=80), # waveform -> mel spectrogram
    torchaudio.transforms.FrequencyMasking(freq_mask_param=15), # masks along the frequency axis (part of SpecAugment)
    torchaudio.transforms.TimeMasking(time_mask_param=35) # masks along the time axis (part of SpecAugment)
)

# Validation/test pipeline: mel spectrogram only, no augmentation.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=80) # waveform -> mel spectrogram

# SpecAugment is applied to the training data only, never to the validation or test sets.

In [None]:
# Show the stages of the training transform pipeline.
print(train_audio_transforms)

Sequential(
  (0): MelSpectrogram(
    (spectrogram): Spectrogram()
    (mel_scale): MelScale()
  )
  (1): FrequencyMasking()
  (2): TimeMasking()
)


# 데이터 변환을 위한 _processing 함수 정의

우리는 본 튜토리얼에서 librispeech 데이터셋을 활용



In [None]:
def _collate_batch(data, audio_transform):
    """Turn a list of dataset items into one padded batch.

    Shared by ``train_processing`` and ``val_processing``, which differ only
    in the audio transform they apply.

    Args:
        data: Iterable of 6-tuples whose first element is the waveform and
            whose third element is the transcript; the other fields are
            ignored.
        audio_transform: Callable mapping a waveform to a spectrogram.

    Returns:
        A tuple ``(spectrograms, labels, input_lengths, label_lengths)``:
        spectrograms zero-padded to (batch, 1, feature, time), labels
        zero-padded to (batch, max_label_len), and two plain lists with the
        per-item lengths.
    """
    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    for (waveform, _, utterance, _, _, _) in data:
        # Drop the leading dimension and put time first so that
        # pad_sequence pads along the time axis.
        spec = audio_transform(waveform).squeeze(0).transpose(0, 1)
        spectrograms.append(spec)

        # Transcript -> sequence of integer labels.
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
        labels.append(label)
        # Half the frame count, to line up with the length of the CNN output.
        input_lengths.append(spec.shape[0] // 2)
        label_lengths.append(len(label))

    # Utterances differ in length, so pad everything in the batch to the longest one.
    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)
    # Add a single "grey-scale" channel for the CNN: (batch, 1, feature, time).
    spectrograms = spectrograms.unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths


def train_processing(data):
    """Collate function for the training loader (applies the augmenting transform)."""
    return _collate_batch(data, train_audio_transforms)


def val_processing(data):
    """Collate function for validation/test loaders (no augmentation)."""
    return _collate_batch(data, valid_audio_transforms)

# 실제 데이터 Load(Pytorch의 Dataset과 DataLoader 클래스를 이용)

이번 베이스라인 코드에서는 librispeech의 데이터셋을 학습시킬 것이며,

librispeech 데이터 중 활용할 수 있는 데이터는 아래와 같음

1. train-clean-100
2. train-clean-360
3. train-other-500

이번 튜토리얼에서는 colab의 용량상 train-clean-100의 80%를 학습에 사용하고, 나머지 20%를 validation에 사용함

In [None]:
# Load the LibriSpeech "train-clean-100" subset from ./data (download=True is passed to the dataset).
dataset = torchaudio.datasets.LIBRISPEECH("./data", url='train-clean-100', download=True)

  0%|          | 0.00/5.95G [00:00<?, ?B/s]

만약 dataset에 "train-clean-100"뿐만 아니라 "train-clean-360", "train-other-500"까지 활용하고 싶다면 아래와 같이 코드를 짜면 됨

In [None]:
# Disabled example (kept inside a string literal so it does not run): how to
# combine all three LibriSpeech training subsets into a single dataset.
'''
dataset = data.ConcatDataset(
             [
                 torchaudio.datasets.LIBRISPEECH("./data", url=path, download=True)
                 for path in ["train-clean-100","train-clean-360","train-other-500"]
             ]
         )
'''

'\ndataset = data.ConcatDataset(\n             [\n                 torchaudio.datasets.LIBRISPEECH("./data", url=path, download=True)\n                 for path in ["train-clean-100","train-clean-360","train-other-500"]\n             ]\n         )\n'

In [None]:
# Split the dataset 80/20 into train_dataset and test_dataset.
import torch.utils.data as data

num_train = int(len(dataset) * 0.8)
lengths = [num_train, len(dataset) - num_train]

# Use a fixed seed for the split. An unseeded split changes on every session,
# so a checkpoint saved earlier could later be "validated" on utterances it
# was trained on.
train_dataset, test_dataset = data.random_split(
    dataset, lengths, generator=torch.Generator().manual_seed(7))
print('len of train {} test {}'.format(len(train_dataset), len(test_dataset)))

len of train 22831 test 5708


# Decoding하기 - 단순 GreedyDecoder를 활용

In [None]:
# Extract the final text from the model predictions (greedy decoding).
def GreedyDecoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
    """Greedily decode model outputs and recover the reference transcripts.

    Args:
        output: Tensor indexed as (batch, time, n_class); the arg-max is
            taken over the last axis.
        labels: Padded label tensor, one row per batch item.
        label_lengths: True (unpadded) length of each label row.
        blank_label: Class index that is dropped from the decoded sequence.
        collapse_repeated: If true, a label equal to the label of the
            previous time step is skipped.

    Returns:
        ``(decodes, targets)``: two lists of strings with the predicted and
        the reference text of every batch item.
    """
    best_paths = torch.argmax(output, dim=2).tolist()
    decodes = []
    targets = []
    for sample_idx, path in enumerate(best_paths):
        target_ids = labels[sample_idx][:label_lengths[sample_idx]].tolist()
        targets.append(text_transform.int_to_text(target_ids))

        kept = []
        for step, token in enumerate(path):
            if token == blank_label:
                continue
            if collapse_repeated and step != 0 and token == path[step - 1]:
                continue
            kept.append(token)
        decodes.append(text_transform.int_to_text(kept))
    return decodes, targets

# Model Architecture(DeepSpeech2 기반)

1. CNNLayerNorm : CNNLayer의 인풋을 Normalizing하기 위한 클래스<br>

2. ResidualCNN : ResCNN을 위한 클래스 (Residual 커넥션을 활용하면 DEEP한 네트워크에서도 CNN에 대한 높은 정확도와 빠른 학습 기대)<br>

3. BidirectionalGRU : 양방향 GRU(RNN의 일종)를 위한 클래스<br>

4. SpeechRecognitionModel : 위의 요소들을 종합한 최종 Speech Recognition Model 클래스

In [None]:
class CNNLayerNorm(nn.Module):
    """Layer normalisation over the feature axis of a CNN activation map."""

    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # Input is (batch, channel, feature, time). nn.LayerNorm works on the
        # last axis, so move the feature axis there and back again afterwards.
        feature_last = x.transpose(2, 3).contiguous()  # (batch, channel, time, feature)
        normalised = self.layer_norm(feature_last)
        return normalised.transpose(2, 3).contiguous()  # (batch, channel, feature, time)

class ResidualCNN(nn.Module):
    """Residual block: two rounds of layer-norm -> ReLU -> dropout -> conv.

    The block input is added back onto the output of the second convolution,
    so ``in_channels`` must equal ``out_channels`` and the convolutions must
    preserve the spatial size for the addition to be valid.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by both convolutions.
        kernel: Convolution kernel size (padding is ``kernel // 2``).
        stride: Convolution stride.
        dropout: Dropout probability applied before each convolution.
        n_feats: Size of the feature axis, used by the layer norms.
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super(ResidualCNN, self).__init__()
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

        # Not used in forward(). Kept because they contribute state_dict keys;
        # removing them would presumably break strict loading of checkpoints
        # saved with this class.
        self.batch_norm1 = nn.BatchNorm2d(in_channels)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        residual = x  # (batch, channel, feature, time)
        x = self.layer_norm1(x)
        x = F.relu(x)
        # Dropout used to be constructed but never applied, so the `dropout`
        # hyper-parameter had no effect on this block. It is a no-op in
        # eval mode, so inference results are unchanged.
        x = self.dropout1(x)
        x = self.cnn1(x)
        x = self.layer_norm2(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.cnn2(x)
        x += residual  # residual connection
        return x  # (batch, channel, feature, time)

class BidirectionalGRU(nn.Module):
    """Layer-norm -> ReLU -> single-layer bidirectional GRU -> dropout.

    Args:
        rnn_dim: Size of the last axis of the input.
        hidden_size: Hidden size per direction; the output's last axis has
            size ``2 * hidden_size``.
        dropout: Dropout probability applied to the GRU output.
        batch_first: Passed straight to ``nn.GRU``; tells it whether the
            input is (batch, time, feature) or (time, batch, feature).
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super(BidirectionalGRU, self).__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim, hidden_size=hidden_size,
            num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        # `dropout` used to be accepted and silently ignored. nn.Dropout has
        # no parameters, so adding it does not change the state_dict layout.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.layer_norm(x)
        x = F.relu(x)
        x, _ = self.BiGRU(x)
        x = self.dropout(x)  # no-op in eval mode
        return x

class SpeechRecognitionModel(nn.Module):
    """Speech recognition network: strided conv -> residual CNNs -> BiGRUs -> classifier.

    Args:
        n_cnn_layers: Number of ``ResidualCNN`` blocks.
        n_rnn_layers: Number of ``BidirectionalGRU`` layers.
        rnn_dim: Hidden size of each GRU direction.
        n_class: Number of output classes per time step.
        n_feats: Size of the feature axis of the input spectrogram.
        stride: Stride of the first convolution.
        dropout: Dropout probability handed to the sub-modules.
        consistent_batch_first: The tensor fed to the GRU stack is always
            (batch, time, feature), but historically only the first GRU layer
            was built with ``batch_first=True``. The remaining layers
            therefore treat the batch axis as the time axis, which makes a
            sample's output depend on the other samples in its batch. Pass
            ``True`` to build every GRU layer batch-first. The default
            (``False``) keeps the historical behaviour, because weights
            trained that way would not give the same results once the layout
            changes. Prefer ``True`` when training a new model from scratch.
    """

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1,
                 consistent_batch_first=False):
        super(SpeechRecognitionModel, self).__init__()
        # Feature-axis size after the first conv (kernel 3, padding 1):
        # floor((n_feats - 1) / stride) + 1. This equals the former
        # `n_feats // 2` for even n_feats with stride 2, and is also right
        # for odd n_feats and for other strides.
        n_feats = (n_feats - 1) // stride + 1
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)

        # Stack of n_cnn_layers residual CNN blocks.
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])

        # Flattened (channel x feature) vector -> rnn_dim.
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)

        # The first GRU layer takes rnn_dim features. Later layers take the
        # 2 * rnn_dim output of the preceding bidirectional layer.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i == 0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout,
                             batch_first=consistent_batch_first or i == 0)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # the BiGRU stack returns 2 * rnn_dim features
            nn.ReLU(),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, channel * feature, time)
        x = x.transpose(1, 2)  # (batch, time, channel * feature)
        x = self.fully_connected(x)  # (batch, time, rnn_dim)
        x = self.birnn_layers(x)
        x = self.classifier(x)  # (batch, time, n_class)
        return x

# 학습과 validation을 위한 함수 생성

1. train 함수 : 1 epoch을 학습시키는 함수

2. test 함수 : validation 하는 함수

In [None]:
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
    """Train the model for one epoch.

    Progress is printed for every 100th batch and for the last batch of the
    epoch. The scheduler is stepped once per batch.

    Args:
        model: Network returning (batch, time, n_class) scores.
        device: Device the batch tensors are moved to.
        train_loader: Loader yielding
            ``(spectrograms, labels, input_lengths, label_lengths)``.
        criterion: Loss called as
            ``criterion(log_probs, labels, input_lengths, label_lengths)``
            with time-major log-probabilities.
        optimizer: Optimizer stepped after every batch.
        scheduler: LR scheduler stepped after every batch.
        epoch: Epoch number, used only in the progress message.
    """
    model.train()
    data_len = len(train_loader.dataset)
    num_batches = len(train_loader)
    for batch_idx, _data in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = _data
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        optimizer.zero_grad()

        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1)  # (time, batch, n_class)

        loss = criterion(output, labels, input_lengths, label_lengths)
        loss.backward()

        optimizer.step()
        scheduler.step()
        # The old check compared batch_idx with the number of samples
        # (data_len), which never matches, so the final batch was not
        # reported. Compare with the index of the last batch instead.
        if batch_idx % 100 == 0 or batch_idx == num_batches - 1:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(spectrograms), data_len,
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader, criterion, epoch):
    """Evaluate the model on a loader and return loss, WER and CER.

    Args:
        model: Network returning (batch, time, n_class) scores.
        device: Device the batch tensors are moved to.
        test_loader: Loader yielding
            ``(spectrograms, labels, input_lengths, label_lengths)``.
        criterion: Loss called with time-major log-probabilities.
        epoch: Accepted for symmetry with ``train``; not used.

    Returns:
        ``(test_loss, avg_wer, avg_cer)``: the batch-averaged loss and the
        per-utterance averages of the word and character error rates.
    """
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_wer, test_cer = [], []
    num_batches = len(test_loader)
    with torch.no_grad():
        for spectrograms, labels, input_lengths, label_lengths in test_loader:
            spectrograms = spectrograms.to(device)
            labels = labels.to(device)

            log_probs = F.log_softmax(model(spectrograms), dim=2)  # (batch, time, n_class)

            # The criterion is fed the time-major layout (time, batch, n_class).
            loss = criterion(log_probs.transpose(0, 1), labels, input_lengths, label_lengths)
            test_loss += loss.item() / num_batches

            # The decoder works on the batch-major layout.
            decoded_preds, decoded_targets = GreedyDecoder(log_probs, labels, label_lengths)
            for target, pred in zip(decoded_targets, decoded_preds):
                test_wer.append(wer(target, pred))
                test_cer.append(cer(target, pred))
    avg_wer = sum(test_wer) / len(test_wer)
    avg_cer = sum(test_cer) / len(test_cer)
    return test_loss, avg_wer, avg_cer

# 위의 데이터와 train,test 함수를 활용한 main 함수 제작

In [None]:
def main(learning_rate=5e-4, batch_size=20, epochs=10):
    """Train the speech recognition model and validate it after every epoch.

    After each epoch the model weights are saved to Google Drive and the
    validation loss, WER and CER are printed.

    Args:
        learning_rate: Learning rate for AdamW; also the peak rate of the
            one-cycle schedule.
        batch_size: Batch size of both data loaders.
        epochs: Number of epochs to train for.
    """
    # ---- hyper-parameters
    hparams = dict(
        n_cnn_layers=1,
        n_rnn_layers=5,
        rnn_dim=512,
        n_class=29,
        n_feats=80,
        stride=2,
        dropout=0.1,
        learning_rate=learning_rate,
        batch_size=batch_size,
        epochs=epochs,
    )

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu")

    # ---- data loaders (the training loader shuffles and augments)
    loader_kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = data.DataLoader(
        dataset=train_dataset,
        batch_size=hparams['batch_size'],
        shuffle=True,
        collate_fn=train_processing,
        **loader_kwargs)
    test_loader = data.DataLoader(
        dataset=test_dataset,
        batch_size=hparams['batch_size'],
        shuffle=False,
        collate_fn=val_processing,
        **loader_kwargs)

    # ---- model
    model = SpeechRecognitionModel(
        n_cnn_layers=hparams['n_cnn_layers'],
        n_rnn_layers=hparams['n_rnn_layers'],
        rnn_dim=hparams['rnn_dim'],
        n_class=hparams['n_class'],
        n_feats=hparams['n_feats'],
        stride=hparams['stride'],
        dropout=hparams['dropout'],
    ).to(device)

    num_params = sum(param.nelement() for param in model.parameters())
    print('Num Model Parameters', num_params)

    # ---- optimizer, loss and learning-rate schedule
    optimizer = optim.AdamW(model.parameters(), lr=hparams['learning_rate'])
    criterion = nn.CTCLoss(blank=28).to(device)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=hparams['learning_rate'],
        steps_per_epoch=int(len(train_loader)),
        epochs=hparams['epochs'],
        anneal_strategy='linear')

    # ---- training loop: train, save a checkpoint, then validate
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
        torch.save(model.state_dict(), '/gdrive/My Drive/new_pron'+str(epoch)+'.pt')
        test_loss, avg_wer, avg_cer = test(model, device, test_loader, criterion, epoch)
        print('Epoch: {}, Test set Average loss: {:.4f}, Average WER: {:.4f}, Average CER: {:.4f}\n'.format(epoch, test_loss, avg_wer, avg_cer))
    

# 학습 시작!

2에폭 까지 학습 

값 변경해보면서 낮은 WER, CER 뽑기

In [None]:
# Training settings for this run; tune them to lower the WER/CER.
learning_rate = 5e-4
batch_size = 32
epochs = 2

# Start training.
main(learning_rate, batch_size, epochs)

Num Model Parameters 23274685


Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 245, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe


KeyboardInterrupt: ignored

# 평가용 코드 (배치 단위)

20에폭 학습해놓은 모델 웨이트 불러와서 평가

In [None]:
def for_eval(batch_size):
    """Load the saved checkpoint and evaluate it on ``test_dataset``.

    Builds the same architecture as ``main``, loads the weights from
    ``/content/new_pron20_ver2.pt`` and runs ``for_test`` on the test
    loader.

    Args:
        batch_size: Batch size of the evaluation loader.
    """
    # Hyper-parameters; the shapes they imply must match the checkpoint.
    hparams = {
        "n_cnn_layers": 1,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 80,
        "stride":2,
        "dropout": 0.1,
        "batch_size": batch_size,
    }

    use_cuda = torch.cuda.is_available()
    print('cuda', use_cuda)
    device = torch.device("cuda" if use_cuda else "cpu")
    print('device', device)

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=hparams['batch_size'],
                                shuffle=False,
                                collate_fn=val_processing,
                                **kwargs)

    # Build the model, then load the downloaded weights.
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)

    # Map the checkpoint onto the selected device. The previous hard-coded
    # "cuda:0" made loading fail on a CPU-only runtime.
    model.load_state_dict(torch.load('/content/new_pron20_ver2.pt', map_location=device))

    print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))

    criterion = nn.CTCLoss(blank=28).to(device)
    for_test(model, device, test_loader, criterion)

In [None]:
def for_test(model, device, test_loader, criterion):
    """Evaluate the model and print every reference/prediction pair.

    Args:
        model: Network returning (batch, time, n_class) scores.
        device: Device the batch tensors are moved to.
        test_loader: Loader yielding
            ``(spectrograms, labels, input_lengths, label_lengths)``.
        criterion: Loss called with time-major log-probabilities.

    Returns:
        ``(test_loss, avg_wer, avg_cer)``: the batch-averaged loss and the
        per-utterance averages of the word and character error rates.
    """
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_wer, test_cer = [], []
    num_batches = len(test_loader)
    with torch.no_grad():
        for spectrograms, labels, input_lengths, label_lengths in test_loader:
            spectrograms = spectrograms.to(device)
            labels = labels.to(device)

            log_probs = F.log_softmax(model(spectrograms), dim=2)  # (batch, time, n_class)

            # The criterion is fed the time-major layout (time, batch, n_class).
            loss = criterion(log_probs.transpose(0, 1), labels, input_lengths, label_lengths)
            test_loss += loss.item() / num_batches

            # The decoder works on the batch-major layout.
            decoded_preds, decoded_targets = GreedyDecoder(log_probs, labels, label_lengths)
            for target, pred in zip(decoded_targets, decoded_preds):
                test_wer.append(wer(target, pred))
                test_cer.append(cer(target, pred))
                print('실제', target)
                print('예측', pred)
    avg_wer = sum(test_wer) / len(test_wer)
    avg_cer = sum(test_cer) / len(test_cer)
    print('Test set: Average loss: {:.4f}, Average WER: {:.4f}, Average CER: {:.4f}\n'.format(test_loss, avg_wer, avg_cer))
    return test_loss, avg_wer, avg_cer

20 에폭까지 학습된 모델로 평가 진행 (약 20분 소요)

In [None]:
# Batch size of the evaluation loader.
batch_size = 32

# Evaluate the saved checkpoint on the held-out split.
for_eval(batch_size)

cuda True
device cuda
Num Model Parameters 23274685

evaluating...
실제 was insufficient to cope with such a throng he allowed louise to impress several farmers daughters into service and was able to feed everyone without delay and in an abundant and satisfactory manner
예측 was insufficient to cope with such a throng he aloued louised to imprsst several farmer's daughters in to service and was able to feed everywom without tealay and in in abunde and satisfactory manner
실제 and the afternoon they so amazed the apple woman of ancient lineage by stopping before her stall and telling her she was to have a tent and a stove and a shawl and a sum of money which seemed to her quite wonderful
예측 and the after knoomn they so amasze the ap woman of ancant laniage by stopping befor hersfall and tilline her she was thay have attent and a sto and shal and a some of money whilh seemed to her quite wonderful
실제 when the point of his knife had been withdrawn would he be able to insert the point again betw

KeyboardInterrupt: ignored

# 데이터 1개씩 들어보고 인식해보기

In [None]:
# Decoder for data that comes without reference transcripts.
def GreedyDecoderTest(output, blank_label=28, collapse_repeated=True):
    """Greedily decode model outputs into text.

    Args:
        output: Tensor indexed as (batch, time, n_class); the arg-max is
            taken over the last axis.
        blank_label: Class index that is dropped from the decoded sequence.
        collapse_repeated: If true, a label equal to the label of the
            previous time step is skipped.

    Returns:
        A list with one decoded string per batch item.
    """
    best_paths = torch.argmax(output, dim=2).tolist()
    decodes = []
    for path in best_paths:
        kept = []
        for step, token in enumerate(path):
            if token == blank_label:
                continue
            if collapse_repeated and step != 0 and token == path[step - 1]:
                continue
            kept.append(token)
        decodes.append(text_transform.int_to_text(kept))
    return decodes

In [None]:
# Hyper-parameters; the shapes they imply must match the checkpoint loaded below.
hparams = {
        "n_cnn_layers": 1,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 80,
        "stride":2,
        "dropout": 0.1,
        "batch_size": batch_size,
}

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# Build the model.
model = SpeechRecognitionModel(
    hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
    hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
    ).to(device)

# Map the checkpoint onto the selected device. The previous hard-coded
# "cuda:0" made loading fail on a CPU-only runtime.
model.load_state_dict(torch.load('/content/new_pron20_ver2.pt', map_location=device))

def val_processing_each(data):
    """Load one audio file and convert it into a single-item model input.

    Multi-channel audio is averaged down to one channel and audio with a
    different sample rate is resampled to 16 kHz, the rate
    ``valid_audio_transforms`` is configured with. Previously the sample
    rate returned by ``torchaudio.load`` was ignored and multi-channel
    input broke the ``squeeze(0)`` below.

    Args:
        data: Path of the audio file to load.

    Returns:
        A spectrogram tensor laid out as (1, 1, feature, time).
    """
    target_sr = 16000  # keep in sync with valid_audio_transforms' sample_rate
    waveform, sr = torchaudio.load(data)

    if waveform.size(0) > 1:
        # Down-mix to mono so that squeeze(0) really removes the channel axis.
        waveform = waveform.mean(dim=0, keepdim=True)
    if sr != target_sr:
        waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)

    # Drop the channel axis and move time to the front: (time, feature).
    spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1)
    spec = spec.unsqueeze(0)  # add the batch axis
    spec = spec.unsqueeze(1).transpose(2, 3)  # add the channel axis -> (1, 1, feature, time)

    return spec

# 파일 1개씩 테스트 해보기

In [None]:
from IPython.display import Audio, display
# Pick one utterance from the downloaded data and play it in the notebook.
sample_data = './data/LibriSpeech/train-clean-100/911/128684/911-128684-0000.flac'
display(Audio(sample_data, autoplay=True))


In [None]:
# Recognise the selected sample; no gradients are needed for inference.
model.eval()

with torch.no_grad():
    spectrogram = val_processing_each(sample_data).to(device)

    # Log-probabilities, stored time-major as (time, batch, n_class).
    output = F.log_softmax(model(spectrogram), dim=2).transpose(0, 1)

    # The decoder takes batch-major input, so transpose back.
    decoded_preds = GreedyDecoderTest(output.transpose(0, 1))
    print(decoded_preds)

['are as duelistic as it is possible to be thoughts we all naturally think ar made of one kind of substance and things of another consciousness flowing inside of us in the forms of conception or jegment']


In [None]:
from IPython.display import Audio, display
# Pick a second utterance and play it in the notebook.
sample_data = './data/LibriSpeech/train-clean-100/8629/261139/8629-261139-0009.flac'
display(Audio(sample_data, autoplay=True))


In [None]:
# Recognise the second sample; no gradients are needed for inference.
model.eval()

with torch.no_grad():
    spectrogram = val_processing_each(sample_data).to(device)

    # Log-probabilities, stored time-major as (time, batch, n_class).
    output = F.log_softmax(model(spectrogram), dim=2).transpose(0, 1)

    # The decoder takes batch-major input, so transpose back.
    decoded_preds = GreedyDecoderTest(output.transpose(0, 1))
    print(decoded_preds)

['and to him af full attention was bei given in the hope that some real andlightenment would come at last the settle thac quetions which had been raised by amabozand complete and un satisfactre testimony but no man cane firnish ohat he does not possess']
