In [None]:
# Install the notebook's dependencies. All packages except jiwer are pinned to an exact version.
# The git-lfs install below is left disabled.
# !apt install git-lfs

!pip install datasets==1.18.3
!pip install transformers==4.17.0
!pip install jiwer
!pip install numpy==1.23.4

In [None]:
# import
import os
import torch

### GPU

In [None]:
# CUDA reads these environment variables when it is first initialised, so they
# are set BEFORE the first torch.cuda call; setting CUDA_VISIBLE_DEVICES after
# torch.cuda.is_available() has already queried the driver may be ignored.
# Exporting them on a CPU-only machine is harmless.

# '0' keeps kernel launches asynchronous (the default). Switch to '1' only while
# debugging, to make CUDA errors surface at the call that caused them.
os.environ['CUDA_LAUNCH_BLOCKING'] = '0'

# Expose GPUs 0-7 to this process.
os.environ["CUDA_VISIBLE_DEVICES"] = '0,1,2,3,4,5,6,7'

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Load Dataset

In [None]:
from datasets import load_from_disk

# Read the train / eval splits that were previously saved to local disk.
train_dataset = load_from_disk(dataset_path="./data/datasets/train")
eval_dataset = load_from_disk(dataset_path="./data/datasets/eval")

In [None]:
import random
import pandas as pd
from IPython.display import display, HTML

# Helper for eyeballing a few random rows of a dataset.
def show_random_elements(dataset, num_examples=7):
    """Display `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: Object supporting `len()` and indexing with a list of integer
            positions (the result is passed straight to `pd.DataFrame`).
        num_examples: How many distinct rows to show; must not exceed `len(dataset)`.

    Raises:
        AssertionError: If `num_examples` is larger than the dataset.
        ValueError: If `num_examples` is negative (raised by `random.sample`).
    """
    assert num_examples <= len(dataset), "num_examples must not exceed len(dataset)"

    # random.sample draws without replacement in one call, replacing the previous
    # draw-and-retry loop, which slowed down as num_examples approached len(dataset).
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
# Sanity check: print the number of rows in the train split, then the eval split.
# (show_random_elements(train_dataset) / show_random_elements(eval_dataset) can be
# called here to look at a few rows.)
for split in (train_dataset, eval_dataset):
    print(len(split))

In [None]:
# Strip punctuation from the transcriptions.
import re

# Raw string: in a normal string literal sequences such as "\," or "\?" are
# invalid escapes and trigger a DeprecationWarning / SyntaxWarning.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:\"]'

def remove_special_characters(batch):
    """Normalise one example's transcription in place and return the example.

    Every character matched by `chars_to_ignore_regex` is removed, the text is
    lower-cased, and a single trailing space is appended.

    Args:
        batch: Mapping with a string under the "transcription" key.

    Returns:
        The same `batch` object with "transcription" rewritten.
    """
    batch["transcription"] = re.sub(chars_to_ignore_regex, '', batch["transcription"]).lower() + " "
    return batch

In [None]:
# Apply the punctuation clean-up to every example of both splits.
train_dataset = train_dataset.map(function=remove_special_characters)
eval_dataset = eval_dataset.map(function=remove_special_characters)

### Tokenizer 만들기

In [None]:
import psutil

def extract_all_chars(batch):
    """Collect the distinct characters used by a batch of transcriptions.

    Args:
        batch: Mapping whose "transcription" entry is a list of strings.

    Returns:
        `{"vocab": [chars], "all_text": [text]}` where `text` is the
        transcriptions joined by single spaces and `chars` lists its distinct
        characters; or `None` when system memory usage is at or above 80 %.
    """
    # Memory guard: bail out (after printing a warning) when RAM usage is high.
    # NOTE(review): callers index the mapped result by "vocab"; confirm how
    # Dataset.map treats a None return so this guard fails in a readable way.
    usage_percent = psutil.virtual_memory().percent
    if usage_percent >= 80:
        print("메모리 위험 :", usage_percent)
        return None

    joined_text = " ".join(batch["transcription"])
    return {"vocab": [list(set(joined_text))], "all_text": [joined_text]}

In [None]:
# Run extract_all_chars over each split, dropping the original columns so that
# only the function's output remains.
# batch_size=-1 presumably hands the whole split over as a single batch — confirm
# against the datasets documentation.
vocab_map_options = dict(batched=True, batch_size=-1, keep_in_memory=True)
vocab_train = train_dataset.map(
    extract_all_chars,
    remove_columns=train_dataset.column_names,
    **vocab_map_options,
)
vocab_eval = eval_dataset.map(
    extract_all_chars,
    remove_columns=eval_dataset.column_names,
    **vocab_map_options,
)

In [None]:
# Union of the characters seen in the train and eval splits.
# sorted() makes the character -> id assignment reproducible: the iteration order
# of a set of strings can change from one interpreter run to the next, which
# would otherwise give every re-run of this notebook a different vocab.json.
vocab_list = sorted(set(vocab_train["vocab"][0]) | set(vocab_eval["vocab"][0]))
vocab_dict = {char: idx for idx, char in enumerate(vocab_list)}

In [None]:
# Show the character -> id mapping as the cell output.
vocab_dict

In [None]:
# Re-key the space character as "|", keeping its id.
vocab_dict["|"] = vocab_dict.pop(" ")

In [None]:
# Append the special tokens, each taking the next free id, then show the final size.
for special_token in ("[UNK]", "[PAD]"):
    vocab_dict[special_token] = len(vocab_dict)
len(vocab_dict)

In [None]:
# Save the vocabulary as JSON.
import json
import os

model_path = "./model/wav2vec_second"
vocab_json = model_path+"/vocab.json"

# The output directory may not exist on a fresh run; open() would then fail
# with FileNotFoundError.
os.makedirs(model_path, exist_ok=True)

# Explicit encoding so the file does not depend on the platform default.
with open(vocab_json, 'w', encoding="utf-8") as vocab_file:
    json.dump(vocab_dict, vocab_file)

In [None]:
# Build the CTC tokenizer from the saved vocabulary file.
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer(
    vocab_json,
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token="|",
)

In [None]:
# Write the tokenizer files into the model output directory.
tokenizer.save_pretrained(model_path)

### model에 대한 특성 정의

In [None]:
from transformers import Wav2Vec2FeatureExtractor

# Feature extractor for single-channel 16 kHz audio: inputs are normalised,
# padded with 0.0, and no attention mask is returned.
feature_extractor = Wav2Vec2FeatureExtractor(
    feature_size=1,
    sampling_rate=16000,
    padding_value=0.0,
    do_normalize=True,
    return_attention_mask=False,
)

In [None]:
from transformers import Wav2Vec2Processor

# Bundle the feature extractor and the tokenizer into a single processor object.
processor = Wav2Vec2Processor(
    feature_extractor=feature_extractor,
    tokenizer=tokenizer,
)

In [None]:
### 전처리 데이터 확인

In [None]:
# Show the "path" field of the first training example.
train_dataset[0]["path"]

In [None]:
# Show the "path" field of the first evaluation example.
eval_dataset[0]["path"]

In [None]:
import IPython.display as ipd
import numpy as np
import random

# random.randrange excludes the upper bound; random.randint(0, len(ds)) is
# inclusive on both ends and could produce the out-of-range index len(ds).
rand_int = random.randrange(len(train_dataset))

# Fetch the row once instead of re-indexing the dataset for every field.
train_sample = train_dataset[rand_int]
train_waveform = np.asarray(train_sample["audio"]["array"])

print(train_sample["transcription"])
# A bare expression in the middle of a cell is not rendered, so the audio player
# has to be displayed explicitly.
ipd.display(ipd.Audio(data=train_waveform, autoplay=False, rate=16000))

print("Target text:", train_sample["transcription"])
print("Input array shape:", train_waveform.shape)
print("Sampling rate:", train_sample["audio"]["sampling_rate"])

In [None]:
# random.randrange excludes the upper bound; random.randint(0, len(ds)) is
# inclusive on both ends and could produce the out-of-range index len(ds).
rand_int = random.randrange(len(eval_dataset))

# Fetch the row once instead of re-indexing the dataset for every field.
eval_sample = eval_dataset[rand_int]
eval_waveform = np.asarray(eval_sample["audio"]["array"])

print(eval_sample["transcription"])
# A bare expression in the middle of a cell is not rendered, so the audio player
# has to be displayed explicitly.
ipd.display(ipd.Audio(data=eval_waveform, autoplay=False, rate=16000))

print("Target text:", eval_sample["transcription"])
print("Input array shape:", eval_waveform.shape)
print("Sampling rate:", eval_sample["audio"]["sampling_rate"])

In [None]:
def prepare_dataset(batch):
    """Convert one example into model inputs and label ids.

    Reads `batch["audio"]` (its "array" and "sampling_rate" entries) and
    `batch["transcription"]`; writes "input_values", "input_length" and
    "labels" into `batch` and returns it.
    """
    clip = batch["audio"]

    # Audio side: run the processor and keep the first (only) item of the result.
    encoded_audio = processor(clip["array"], sampling_rate=clip["sampling_rate"])
    batch["input_values"] = encoded_audio.input_values[0]
    batch["input_length"] = len(batch["input_values"])

    # Text side: inside as_target_processor() the processor call handles the transcription.
    with processor.as_target_processor():
        encoded_text = processor(batch["transcription"])
        batch["labels"] = encoded_text.input_ids

    return batch

In [None]:
# Replace the raw training columns with the features produced by prepare_dataset.
raw_train_columns = train_dataset.column_names
train_dataset = train_dataset.map(prepare_dataset, remove_columns=raw_train_columns)
print(train_dataset)

In [None]:
# Replace the raw evaluation columns with the features produced by prepare_dataset.
raw_eval_columns = eval_dataset.column_names
eval_dataset = eval_dataset.map(prepare_dataset, remove_columns=raw_eval_columns)
print(eval_dataset)

In [None]:
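# NOTE(review): disabled length filter, kept for reference.
# As written it keeps examples whose input_length is GREATER than
# max_input_length_in_sec * sampling_rate (with .0 that is every non-empty clip).
# If the intent is to cap clip length, set a real limit and flip the comparison to `<`.
# max_input_length_in_sec =.0
# train_dataset = train_dataset.filter(lambda x: x > max_input_length_in_sec * processor.feature_extractor.sampling_rate, input_columns=["input_length"])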

### trainer

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """Collate examples into one padded batch of inputs and labels.

    Attributes:
        processor: Processor whose `pad` method pads both inputs and labels.
        padding: Padding strategy forwarded unchanged to `processor.pad`.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad the "input_values" and "labels" of `features` and return the batch.

        Inputs and labels are padded separately because they go through
        different `pad` calls (labels inside `as_target_processor()`).
        """
        audio_items = []
        label_items = []
        for example in features:
            audio_items.append({"input_values": example["input_values"]})
            label_items.append({"input_ids": example["labels"]})

        batch = self.processor.pad(audio_items, padding=self.padding, return_tensors="pt")

        with self.processor.as_target_processor():
            padded_labels = self.processor.pad(label_items, padding=self.padding, return_tensors="pt")

        # Label positions whose attention mask is not 1 are padding; set them to
        # -100 so the loss ignores them.
        is_padding = padded_labels.attention_mask.ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)

        return batch

In [None]:
# Collator instance used by the Trainer; padding=True is forwarded to processor.pad.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
from datasets import load_metric

# Load the "wer" metric; compute_metrics uses it to score predictions.
wer_metric = load_metric("wer")

In [None]:
def compute_metrics(pred):
    """Compute the "wer" metric for one evaluation pass.

    Args:
        pred: Object exposing `predictions` (logits) and `label_ids`.
            `label_ids` is modified in place: every -100 entry is replaced by
            the tokenizer's pad token id so the labels can be decoded.

    Returns:
        `{"wer": value}` as produced by `wer_metric.compute`.
    """
    pad_id = processor.tokenizer.pad_token_id
    pred.label_ids[pred.label_ids == -100] = pad_id

    # Take the highest-scoring token id at each position.
    predicted_ids = np.argmax(pred.predictions, axis=-1)
    hypotheses = processor.batch_decode(predicted_ids)
    # Reference labels are decoded without grouping tokens.
    references = processor.batch_decode(pred.label_ids, group_tokens=False)

    return {"wer": wer_metric.compute(predictions=hypotheses, references=references)}

In [None]:
from transformers import Wav2Vec2ForCTC

# Load the local checkpoint with a mean-reduced CTC loss and the tokenizer's pad id.
# NOTE(review): no vocab_size is passed — confirm the checkpoint's output layer
# matches the size of the vocabulary built in this notebook.
model = Wav2Vec2ForCTC.from_pretrained(
    "./model/wav2vec",
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
)
model = model.to(device)

In [None]:
# Presumably stops the feature encoder's weights from being updated during
# fine-tuning — confirm against the transformers documentation.
model.freeze_feature_encoder()

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Checkpoints are written to the model directory; only the two newest are kept.
    output_dir=model_path,
    save_total_limit=2,
    # Batching and precision.
    group_by_length=True,
    per_device_train_batch_size=16,
    fp16=True,
    gradient_checkpointing=True,
    # Schedule: save, evaluate and log every 300 steps.
    num_train_epochs=10,
    evaluation_strategy="steps",
    save_steps=300,
    eval_steps=300,
    logging_steps=300,
    # Optimisation.
    learning_rate=1e-4,
    weight_decay=0.005,
    warmup_steps=1000,
)

In [None]:
# Row counts of the prepared train and eval splits, in that order.
for prepared_split in (train_dataset, eval_dataset):
    print(len(prepared_split))

In [None]:
from transformers import Trainer

# Wire the model, data, collator and metric into a Trainer.
# The feature extractor is passed through the `tokenizer` argument.
trainer_components = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)
trainer = Trainer(**trainer_components)

In [None]:
# Start training with the configuration defined above.
trainer.train()

In [None]:
# Save the trained model into the model output directory.
trainer.save_model(model_path)