In [None]:
import torch
import torchaudio
import pandas as pd
import torch.nn as nn

In [None]:
# Speech Commands Dataset Download

# !wget http://download.tensorflow.org/data/speech_commands_v0.01.tar.gz
# ! mkdir speech_commands
# ! tar -zxvf speech_commands_v0.01.tar.gz -C ./speech_commands

# Load Audio File

## Data : Google Speech Command

30개 음성 명령어 데이터 
https://huggingface.co/datasets/google/speech_commands

### Current Status

- version 0.01 : 64,727 recordings ("Yes", "No", "Up", "Down", "Left", "Right", "On", "Off", "Stop", "Go", "Zero", "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", "Bed", "Bird", "Cat", "Dog", "Happy", "House", "Marvin", "Sheila", "Tree", "Wow")
- version 0.02 : 105,829 recordings (version 0.01에 "Backward", "Forward", "Follow", "Learn", "Visual" 추가)

### Supported Task
- Keyword Spotting

- `datasets` : audio, computer vision, nlp task 용 공유 데이터에 쉽게 접근할 수 있는 라이브러리, huggingface에서 사용됨

In [None]:
! pip install datasets

In [None]:
from datasets import load_dataset

# Load the train / validation / test splits of the "v0.01" configuration, in that order.
train_dataset, val_dataset, test_dataset = (
    load_dataset("google/speech_commands", "v0.01", split=split_name)
    for split_name in ("train", "validation", "test")
)

In [None]:
# Show the first training record, then its audio entry, the `array` field of
# that entry, and the label, each preceded by a caption line.
sample = train_dataset[0]
print(sample)
print("audio", sample['audio'], sep="\n")
print("array", sample['audio']['array'], sep="\n")
print("label", sample['label'], sep="\n")

In [None]:
import IPython.display as ipd

sample = train_dataset[0]
# Play the clip at the sampling rate stored with the decoded audio instead of a
# hard-coded 16000, so playback stays correct if the data is ever resampled.
ipd.Audio(sample['audio']['array'], rate=sample['audio']['sampling_rate'])

In [None]:
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2Model

# Both the feature extractor and the model are loaded from the same checkpoint name.
pretrained_name = "facebook/wav2vec2-base"
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(pretrained_name)
model = Wav2Vec2Model.from_pretrained(pretrained_name)

In [None]:
sample = train_dataset[0]
waveform = sample['audio']['array']
print(len(waveform))
# NOTE(review): `input` shadows the Python builtin; the name is kept because the
# tensor-conversion cell further down reads `input['input_values']`.
input = feature_extractor(waveform, sampling_rate=feature_extractor.sampling_rate)
input

In [None]:
# Print the extractor configuration followed by the model architecture.
print(feature_extractor, model, sep="\n")

In [None]:
# Convert each entry on its own and stack them into one batch tensor.
# NOTE(review): the entries are presumably NumPy arrays (no `return_tensors` was
# requested above); calling `torch.tensor` directly on a list of arrays is the
# slow path and emits a warning — confirm.
input_values = torch.stack([torch.as_tensor(values) for values in input['input_values']])
print(input_values.shape)

In [None]:
# Inference only: run the forward pass without recording an autograd graph.
with torch.no_grad():
    out = model(input_values)
print(out)

- `last_hidden_state`
- `extract_features`
- `hidden_states` : model(input_values, output_hidden_states=True)

In [None]:
# Pull the two fields of interest out of the model output by key.
last_hidden_state, extract_features = (
    out[field] for field in ('last_hidden_state', 'extract_features')
)

In [None]:
from torch.utils.data import Dataset,DataLoader
from tqdm import tqdm

In [None]:
n_classes = 3

# Keep only examples whose label id is below `n_classes`.
# `input_columns="label"` hands the predicate just the label value, so the
# filter does not need the rest of each example (including its audio).
train_dataset = train_dataset.filter(lambda label: label < n_classes, input_columns="label")
val_dataset = val_dataset.filter(lambda label: label < n_classes, input_columns="label")
test_dataset = test_dataset.filter(lambda label: label < n_classes, input_columns="label")

print(len(train_dataset), len(val_dataset), len(test_dataset))

In [None]:
def ds_to_df(dataset):
    """Copy the columns used downstream from a dataset into a pandas DataFrame.

    Args:
        dataset: Object indexable by column name that provides ``file``,
            ``audio``, ``label`` and ``is_unknown``; every ``audio`` entry
            must expose an ``array`` item.

    Returns:
        pandas.DataFrame with the columns ``file``, ``array``, ``label`` and
        ``is_unknown``, in that order.
    """
    columns = {
        'file': dataset['file'],
        'array': [clip['array'] for clip in dataset['audio']],
        'label': dataset['label'],
        'is_unknown': dataset['is_unknown'],
    }

    frame = pd.DataFrame()
    for column_name, values in columns.items():
        frame[column_name] = values
    return frame

In [None]:
# Convert each filtered split into a DataFrame (train, validation, test order).
train_df, val_df, test_df = (
    ds_to_df(split) for split in (train_dataset, val_dataset, test_dataset)
)

In [None]:
# train_df = train_df[train_df['label'] < n_classes].reset_index(drop=True)
# val_df = val_df[val_df['label'] < n_classes].reset_index(drop=True)
# test_df = test_df[test_df['label'] < n_classes].reset_index(drop=True)

# print(len(train_df), len(val_df), len(test_df))

In [None]:
# Show the first test row, then its `array` cell under a caption line.
print(test_df.loc[0])
print("array", test_df.loc[0, 'array'], sep="\n")

## PyTorch Dataset Object 개념 
`torch.utils.data.Dataset`의 subclass는 `__len__`과 `__getitem__`을 반드시 구현해야 한다
- `__len__` : 데이터셋의 아이템 수를 반환
- `__getitem__`: 샘플과 레이블을 반환


[그림]데이터를 직접적으로 가지고 있지 않지만 `__len__` 과 `__getitem__`을 통해 접근가능

<img src="https://drek4537l1klr.cloudfront.net/stevens2/Figures/CH07_F02_Stevens2_GS.png" width=600>

In [None]:
class AudioDataSet(Dataset):
  """Map-style dataset that turns rows of a waveform DataFrame into model inputs.

  Each item is a dict holding the ``input_values`` tensor produced by the
  feature extractor (padded/truncated to ``max_length``) and the integer
  ``label`` of the row.
  """

  # Rows carrying this label are ignored when measuring the longest clip.
  # NOTE(review): presumably the `_silence_` class of Speech Commands v0.01 — confirm.
  SKIPPED_LABEL = 30

  def __init__(self, df, max_length=None, feature_extractor=None):
    """Store the frame and set up padding length and feature extractor.

    Args:
      df: DataFrame with an ``array`` column (one waveform per row) and a
        ``label`` column.
      max_length: Length every clip is padded/truncated to. When omitted, the
        longest clip in ``df`` is used (see ``get_max_length``). Pass the
        training set's value to give all splits the same input length.
      feature_extractor: Optional ready-made extractor to reuse across splits.
        When omitted, the "facebook/wav2vec2-base" extractor is loaded.
    """
    self.df = df
    self.sr = 16000
    self.max_length = self.get_max_length() if max_length is None else max_length
    if feature_extractor is None:
      feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-base")
    self.feature_extractor = feature_extractor

  def get_max_length(self):
    """Return the length of the longest waveform, skipping ``SKIPPED_LABEL`` rows.

    Returns 0 when no row qualifies.
    """
    considered = self.df.loc[self.df['label'] != self.SKIPPED_LABEL, 'array']
    if considered.empty:
      return 0
    return int(considered.map(len).max())

  def __len__(self):
    """Number of rows in the underlying frame."""
    return len(self.df)

  def __getitem__(self, idx):
    """Return ``{'input_values': tensor, 'label': int}`` for the row at position ``idx``."""
    # Positional lookup: samplers hand out 0..len-1, which only coincides with
    # index labels when the frame has a fresh default index.
    audio = self.df['array'].iloc[idx]

    encoded = self.feature_extractor(
        audio,
        sampling_rate=self.feature_extractor.sampling_rate,
        max_length=self.max_length,
        truncation=True,
        padding='max_length',
        return_tensors="pt",
    )
    # The extractor returns a batch of one; drop the batch dimension.
    audio_values = encoded['input_values'][0]

    # Plain Python int, so collate functions that inspect the label's type
    # treat it as an integer class id rather than a NumPy scalar.
    label = int(self.df['label'].iloc[idx])

    return {'input_values': audio_values, 'label': label}

In [None]:
# Wrap every split in an AudioDataSet (train, validation, test order).
train, val, test = (AudioDataSet(frame) for frame in (train_df, val_df, test_df))

# Fetch one processed example as a quick check.
inputs = test[0]
# inputs['label']

In [None]:
# Draw a single shuffled batch of 8 and show its inputs, their shape and the labels.
train_loader = DataLoader(train, shuffle=True, batch_size=8)
out = next(iter(train_loader))
audio, label = out['input_values'], out['label']
print(audio, audio.shape, label, sep="\n")

# Model 정의

In [None]:
# Build the label-name <-> id lookup tables for the first `n_classes` classes.
# Comprehensions keep the loop variables local, so the notebook-level `label`
# (the batch labels from the previous cell) is not overwritten.
labels = train_dataset.features["label"].names
label2id = {name: idx for idx, name in enumerate(labels[:n_classes])}
id2label = dict(enumerate(labels[:n_classes]))

id2label


# Train

In [None]:
! pip install evaluate
! pip install accelerate -U

In [None]:
from transformers import Trainer

# IPython introspection (`??`): display the source of Trainer for reference.
??Trainer

In [None]:
from transformers import TrainingArguments

# IPython introspection (`??`): display the source of TrainingArguments for reference.
??TrainingArguments

In [None]:
import numpy as np
import evaluate

# Load the "accuracy" metric once; compute_metrics reads this module-level object.
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Score predictions with the module-level `accuracy` metric.

    Args:
        eval_pred: Object exposing ``predictions`` (class scores along the
            last axis) and ``label_ids`` (reference labels).

    Returns:
        Whatever ``accuracy.compute`` returns for the arg-max predictions.
    """
    scores = eval_pred.predictions
    predicted_ids = np.argmax(scores, axis=-1)
    return accuracy.compute(
        predictions=predicted_ids,
        references=eval_pred.label_ids,
    )

In [None]:
from transformers import Trainer
from collections.abc import Mapping

def nested_detach(tensors):
    """Detach `tensors`, recursing through nested lists, tuples and mappings.

    Containers are rebuilt with their original type; any other value is
    expected to provide ``.detach()``.
    """
    if isinstance(tensors, Mapping):
        detached_items = {key: nested_detach(value) for key, value in tensors.items()}
        return type(tensors)(detached_items)
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_detach(item) for item in tensors)
    return tensors.detach()

class CustomTrainer(Trainer):
    """Trainer that feeds only `input_values` to the model and applies plain cross-entropy."""

    def prediction_step(self, model, inputs, prediction_loss_only, ignore_keys=None):
        """Run one evaluation step on a batch.

        Args:
            model: Model to evaluate.
            inputs: Batch dict containing ``input_values`` and ``labels``.
            prediction_loss_only: When true, only the loss is returned.
            ignore_keys: Output keys to drop when the model output is a dict;
                defaults to the model config's ``keys_to_ignore_at_inference``.

        Returns:
            Tuple ``(loss, logits, labels)``; ``logits`` and ``labels`` are
            ``None`` when ``prediction_loss_only`` is true.
        """
        # Same preparation the training step applies, so tensors sit on the model's device.
        inputs = self._prepare_inputs(inputs)
        labels = inputs['labels']

        if ignore_keys is None:
            if hasattr(self.model, "config"):
                ignore_keys = getattr(self.model.config, "keys_to_ignore_at_inference", [])
            else:
                ignore_keys = []

        with torch.no_grad():
            loss, outputs = self.compute_loss(model, inputs, return_outputs=True)
            loss = loss.mean().detach()

            if isinstance(outputs, dict):
                logits = tuple(v for k, v in outputs.items() if k not in ignore_keys + ["loss"])
            else:
                logits = outputs

        if prediction_loss_only:
            return (loss, None, None)

        labels = nested_detach(labels)
        logits = nested_detach(logits)
        # Unwrap only a one-element tuple. `compute_loss` returns a bare logits
        # tensor, and taking `[0]` of that whenever its length is 1 would strip
        # the batch dimension from a batch holding a single sample.
        if isinstance(logits, tuple) and len(logits) == 1:
            logits = logits[0]

        return (loss, logits, labels)

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute unweighted cross-entropy between the model logits and the labels.

        Args:
            model: Model called with ``inputs['input_values']``; its output must
                provide a ``'logits'`` entry.
            inputs: Batch dict containing ``input_values`` and ``labels``.
            return_outputs: When true, return ``(loss, logits)`` instead of the loss.
            num_items_in_batch: Accepted so callers that pass this keyword do not
                fail; it is not used.

        Returns:
            The loss tensor, or ``(loss, logits)`` when ``return_outputs`` is true.
        """
        audios, labels = inputs['input_values'], inputs['labels']
        # forward pass
        logits = model(audios)['logits']
        # Labels must be integer class ids on the same device as the logits;
        # following the logits avoids assuming a CUDA device is present.
        labels = labels.to(device=logits.device, dtype=torch.long)
        loss_fn = nn.CrossEntropyLoss()
        loss = loss_fn(logits, labels)
        return (loss, logits) if return_outputs else loss

In [None]:
from transformers import AutoModelForAudioClassification, TrainingArguments

num_labels = len(id2label)
ksmodel = AutoModelForAudioClassification.from_pretrained(
    "facebook/wav2vec2-base",
    num_labels=num_labels,
    label2id=label2id,
    id2label=id2label,
)

# Sanity check before training: run one batch through the classifier and
# compute the cross-entropy loss by hand.
train_loader = DataLoader(train, shuffle=True, batch_size=8)
inputs = next(iter(train_loader))
audio = inputs['input_values']
label = inputs['label'].type(torch.LongTensor)
out = ksmodel(audio)['logits']

loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(out, label)
loss


In [None]:
# Training configuration: output goes to ./results, with the evaluation and
# save strategies both set to "epoch", for 5 epochs at a learning rate of 3e-5.
# NOTE(review): the later run for `ksmodel2` uses the same output_dir, so both
# runs write into ./results.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,
    num_train_epochs=5,
    push_to_hub=False
)

# CustomTrainer supplies the loss and the evaluation step; `val` is the
# evaluation set and compute_metrics is used for scoring.
trainer = CustomTrainer(
    model=ksmodel,
    args=training_args,
    train_dataset=train,
    eval_dataset=val,
    compute_metrics=compute_metrics)

trainer.train()

In [None]:
from torchmetrics.classification import MulticlassAccuracy

test_loader = DataLoader(test, batch_size=8)
# Follow the model's device and class count instead of assuming CUDA and 3 classes.
device = ksmodel.device
accuracy_metric = MulticlassAccuracy(num_classes=ksmodel.config.num_labels).to(device)

ksmodel.eval()

# Inference only: no autograd graph is needed while scoring the test set.
with torch.no_grad():
    for batch in test_loader:
        audio, label = batch['input_values'].to(device), batch['label'].to(device)

        out = ksmodel(audio)['logits']
        pred = out.argmax(dim=-1)

        # Accumulate state across batches; the final score is computed once below.
        accuracy_metric.update(pred, label)

# Aggregate over every test sample. Averaging per-batch scores would weight a
# short final batch the same as a full one.
test_accuracy = accuracy_metric.compute()

print(f"test_accuracy : {test_accuracy}")

In [None]:
num_labels = len(id2label)
ksmodel2 = AutoModelForAudioClassification.from_pretrained(
    "facebook/wav2vec2-base", num_labels=num_labels, label2id=label2id, id2label=id2label
)

# Read the head's input width from the stock classifier instead of hard-coding it,
# so the replacement head always matches the features it receives.
# NOTE(review): assumes the stock `classifier` is a single nn.Linear — confirm.
hidden_dim = ksmodel2.classifier.in_features

# Replace the stock head with a 3-layer MLP of the same input width.
ksmodel2.classifier = nn.Sequential(
                          nn.Linear(hidden_dim, hidden_dim),
                          nn.ReLU(),
                          nn.Linear(hidden_dim, hidden_dim),
                          nn.ReLU(),
                          nn.Linear(hidden_dim, num_labels))

ksmodel2

In [None]:
# Same training configuration as the first run, now applied to `ksmodel2`.
# NOTE(review): output_dir is again ./results, the directory the `ksmodel` run
# also writes to — confirm that sharing it is intended.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,
    num_train_epochs=5,
    push_to_hub=False
)

# Rebinds `trainer` to a new CustomTrainer for the model with the MLP head.
trainer = CustomTrainer(
    model=ksmodel2,
    args=training_args,
    train_dataset=train,
    eval_dataset=val,
    compute_metrics=compute_metrics)

trainer.train()



In [None]:
# Load a saved checkpoint through a path relative to the working directory,
# matching the `output_dir="./results"` used for training, instead of an
# absolute path that only exists on one machine.
# NOTE(review): presumably this checkpoint was produced by one of the training
# runs above — confirm which run and that step 2780 exists.
pretrain_model = AutoModelForAudioClassification.from_pretrained("./results/checkpoint-2780")
pretrain_model