In [None]:
import os
import IPython
import matplotlib
import matplotlib.pyplot as plt
import requests
import torch
import torchaudio
import ssl
import pythainlp
from pythainlp import tokenize
ssl._create_default_https_context = ssl._create_unverified_context
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from pythainlp.tokenize import word_tokenize

In [None]:
from datasets import load_dataset
data=load_dataset("librispeech_asr", "clean", "train.100")
print(data)

In [None]:
speech = data.remove_columns(['speaker_id', 'chapter_id', 'id'])
speech

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset)-1)
        while pick in picks:
            pick = random.randint(0, len(dataset)-1)
        picks.append(pick)
    
    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

show_random_elements(speech["train.100"].remove_columns([ "audio","file"]))


In [None]:
import re
chars_to_ignore_regex = '[\,\?\.\!\-\;\:\"]'

def remove_special_characters(batch):
    batch["text"] = re.sub(chars_to_ignore_regex, '', batch["text"]).lower()
    return batch

speech = speech.map(remove_special_characters)


In [None]:
show_random_elements(speech["train.100"].remove_columns(["file", "audio"]))


In [None]:
def extract_all_chars(batch):
  all_text = " ".join(batch["text"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

vocabs = speech.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=speech.column_names["train.100"])


In [None]:
vocab_list = list(set(vocabs["train.100"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))

vocab_dict = {v: k for k, v in enumerate(vocab_list)}
vocab_dict


In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

In [None]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
print(len(vocab_dict))


In [None]:
import json
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)


In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")


In [None]:
from transformers import Wav2Vec2FeatureExtractor

feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=False)


In [None]:
from transformers import Wav2Vec2Processor

processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)


In [None]:
speech['train.100'][0]['file']

In [None]:
speech['train.100'][0]['audio']

In [None]:
import IPython.display as ipd
import numpy as np
import random

rand_int = random.randint(0, len(speech["train.100"]))

print(speech["train.100"][rand_int]["text"])
ipd.Audio(data=np.asarray(speech["train.100"][rand_int]["audio"]["array"]), autoplay=True, rate=16000)


In [None]:
rand_int = random.randint(0, len(speech["train.100"]))

print("Target text:", speech["train.100"][rand_int]["text"])
print("Input array shape:", np.asarray(speech["train.100"][rand_int]["audio"]["array"]).shape)
print("Sampling rate:", speech["train.100"][rand_int]["audio"]["sampling_rate"])


In [None]:
def prepare_dataset(batch):
    audio = batch["audio"]

    # batched output is "un-batched" to ensure mapping is correct
    batch["input_values"] = processor(audio["array"], sampling_rate=audio["sampling_rate"]).input_values[0]
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["text"]).input_ids
    return batch


In [None]:
speech

In [None]:
speech = speech.map(prepare_dataset, remove_columns=speech.column_names["train.100","train.360"], num_proc=8)

In [None]:
max_input_length_in_sec = 4.0
timit["train"] = timit["train"].filter(lambda x: x < max_input_length_in_sec * processor.feature_extractor.sampling_rate, input_columns=["input_length"])

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                return_tensors="pt",
            )

        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)


In [None]:
from datasets import load_metric
wer_metric = load_metric("wer")

In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


In [None]:
from transformers import Wav2Vec2ForCTC
model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h",ctc_loss_reduction="mean", pad_token_id=processor.tokenizer.pad_token_id,)

In [None]:
model.freeze_feature_extractor()


In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir='LibriSpeech/train-clean-100/103/1240/Checkpoints2',
  group_by_length=True,
  per_device_train_batch_size=32,
  evaluation_strategy="steps",
  num_train_epochs=30,
  fp16=False,
  gradient_checkpointing=True, 
  save_steps=500,
  eval_steps=500,
  logging_steps=500,
  learning_rate=1e-4,
  weight_decay=0.005,
  warmup_steps=1000,
  save_total_limit=2)


In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=speech["train.100"],
    eval_dataset=speech["validation"],
    tokenizer=processor.feature_extractor,
)


In [None]:
trainer.train()
