In [37]:
import os
from datasets import (Dataset, load_metric, DatasetDict)
import re
import json
from transformers import (Wav2Vec2CTCTokenizer, Wav2Vec2FeatureExtractor, Wav2Vec2Processor, Wav2Vec2ForCTC)
import random
import numpy as np
import librosa
from sklearn.model_selection import train_test_split
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from transformers import TrainingArguments
from transformers import Trainer


def read_dataframe(csv_fn, poc = False):
    label_df = pd.read_csv(csv_fn)
    if poc : 
        return label_df.iloc[:1000]
    return label_df

def add_absolute_path(csv:pd.DataFrame, root_path):
    csv['filename'] = os.path.join(root_path, 'train', 'train_data') + csv['filename']
    return csv

def train_valid_split(df, valid_ratio=0.15):
    train=df.sample(
        frac=1-valid_ratio,
        random_state=200)
    valid=df.drop(train.index)
    return train, valid
    

def from_pandas_to_datasets(
        csv : pd.DataFrame):
    return Dataset.from_pandas(csv)



def make_vocab(datasets_df, vocab_col, chars_to_ignore_regex, unknown_token = "[UNK]", pad_token = "[PAD]", save_fn = 'vocab.json'):
    def remove_special_characters(batch, target_column, chars_to_ignore_regex):
        batch[target_column] = re.sub(chars_to_ignore_regex, '', batch[target_column])
        return batch

    def extract_all_chars(batch, target_column):
        all_text = " ".join(batch[target_column])
        vocab = list(set(all_text))
        return {"vocab": [vocab], "all_text": [all_text]}
    
    datasets_df = datasets_df.map(lambda x: remove_special_characters(x, vocab_col, chars_to_ignore_regex))
    vocabs = datasets_df.map(lambda x: extract_all_chars(x, vocab_col), 
                             batched=True,
                             batch_size=-1,
                             keep_in_memory=True,
                             remove_columns=timit.column_names["train"])
    
    vocab_list = list(set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))
    vocab_dict = {v: k for k, v in enumerate(vocab_list)}

    # 스페이스는 잘 안보이니까 | 로 대치
    vocab_dict["|"] = vocab_dict[" "]
    del vocab_dict[" "]

    vocab_dict["[UNK]"] = len(vocab_dict)
    vocab_dict["[PAD]"] = len(vocab_dict)
    len(vocab_dict)

    with open(save_fn, 'w') as vocab_file:
        json.dump(save_fn, vocab_file)

    return vocab_dict



@dataclass
class DataCollatorCTCWithPadding:


    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

chars_to_ignore_regex = '[\,\?\.\!\-\;\:\"]'

df = read_dataframe(os.path.join(DATASET_PATH, 'train', 'train_label'))
df = add_absolute_path(df, DATASET_PATH)
train, valid = train_valid_split(df)

ds_train = Dataset.from_pandas(train)
ds_test = Dataset.from_pandas(valid)

ds_dict = {'train' : Dataset.from_pandas(ds_train),
           'test' : Dataset.from_pandas(ds_test)}

df_dataset = DatasetDict(ds_dict)

vocab_dict = make_vocab(
    datasets_df = df_dataset, 
    vocab_col = 'text',
    chars_to_ignore_regex=chars_to_ignore_regex,
)



tokenizer = Wav2Vec2CTCTokenizer(config.vocab_fn, unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=False)
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)    

def prepare_dataset(batch, processor, file_fn_col='filename', target_col = 'text'):
    audio, _ = librosa.load(batch[file_fn_col], sr=16_000)
    audio, _ = librosa.effects.trim(audio)
    
    # batched output is "un-batched" to ensure mapping is correct
    batch["input_values"] = processor(audio, sampling_rate=16_000).input_values[0]
    batch["input_length"] = len(batch["input_values"])

    with processor.as_target_processor():
        batch["labels"] = processor(batch[target_col]).input_ids
    return batch

df_dataset = df_dataset.map(lambda x: prepare_dataset(x, processor), remove_columns=df_dataset.column_names["train"], num_proc=4)

max_input_length_in_sec = 10
df_dataset['train'] = df_dataset['train'].filter(lambda x: x<max_input_length_in_sec*processor.feature_extractor.sampling_rate, input_columns = ['input_length'])


data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)
wer_metric = load_metric("wer")



model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base",
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
).to('cuda')

model.freeze_feature_encoder()



training_args = TrainingArguments(
  output_dir=f"./saved_models",
  group_by_length=True,
  per_device_train_batch_size=16,
  evaluation_strategy="steps",
  num_train_epochs=10,
  fp16=True,
  gradient_checkpointing=True,
  save_steps=500,
  eval_steps=500,
  logging_steps=500,
  learning_rate = 5e-5,
  weight_decay=0.005,
  warmup_steps=1000,
  save_total_limit=10,
  seed = 42,
  save_strategy='epoch'
)


trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=timit["train"],
    eval_dataset=timit["test"],
    tokenizer=processor.feature_extractor,
)

class CustomTrainer(Trainer):

    def __init__(self, model, data_collator, training_args, train_dataset, eval_dataset, tokenizer=processor.feature_extractor):
        super(Trainer, self).__init__(
            model = model,
            data_collator = data_collator,
            training_args = training_args,
            train_dataset = train_dataset,
            eval_dataset = eval_dataset,
            tokenizer = tokenizer,
        )


        
    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.pop("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # compute custom loss (suppose one has 3 labels with different weights)
        loss_fct = nn.CrossEntropyLoss(weight=torch.tensor([1.0, 2.0, 3.0], device=model.device))
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss


NameError: name 'DATASET_PATH' is not defined

In [11]:
import re


def remove_special_characters(batch, target_column):
    batch[target_column] = re.sub(chars_to_ignore_regex, '', batch[target_column])
    return batch

timit = timit.map(
    lambda batch : 
        remove_special_characters(batch, 'text' )
)


def extract_all_chars(batch, target_columns):
    all_text = " ".join(batch[target_columns])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}

vocabs = timit.map(
    lambda batch : 
        extract_all_chars(batch, 'text'), 
        batched=True, 
        batch_size=-1, 
        keep_in_memory=True, 
        remove_columns=timit.column_names["train"]
        )



4620ex [00:00, 16548.36ex/s]
1680ex [00:00, 16855.19ex/s]
100%|██████████| 1/1 [00:00<00:00, 37.97ba/s]
100%|██████████| 1/1 [00:00<00:00, 102.13ba/s]


In [27]:
vocab_list = list(
    set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))

In [29]:
def extract_all_chars(batch):
  all_text = " ".join(batch["text"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

vocabs = timit.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=timit.column_names["train"])

100%|██████████| 1/1 [00:00<00:00, 35.74ba/s]
100%|██████████| 1/1 [00:00<00:00, 101.28ba/s]


In [31]:
timit['train']

Dataset({
    features: ['file', 'audio', 'text'],
    num_rows: 4620
})

In [28]:
vocab_dict = {v: k for k, v in enumerate(vocab_list)}
vocab_dict

{'t': 0,
 'P': 1,
 'Q': 2,
 'k': 3,
 'l': 4,
 'N': 5,
 'O': 6,
 'A': 7,
 'd': 8,
 'M': 9,
 'x': 10,
 'j': 11,
 'B': 12,
 'e': 13,
 ' ': 14,
 'w': 15,
 'H': 16,
 'G': 17,
 'u': 18,
 'L': 19,
 'p': 20,
 'n': 21,
 'm': 22,
 'v': 23,
 'b': 24,
 'C': 25,
 'q': 26,
 'z': 27,
 'y': 28,
 'W': 29,
 'i': 30,
 'g': 31,
 'V': 32,
 'h': 33,
 'F': 34,
 'f': 35,
 'o': 36,
 'S': 37,
 'I': 38,
 'T': 39,
 'a': 40,
 'D': 41,
 'K': 42,
 'Y': 43,
 'r': 44,
 's': 45,
 'E': 46,
 'J': 47,
 'X': 48,
 'R': 49,
 'U': 50,
 "'": 51,
 'c': 52}

In [20]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)

55