# Dependencies configuration

In [None]:
# Report free disk space before packages are installed and data is cached locally.
!df -h

In [None]:
# Use a UTF-8 locale for this kernel and the shell commands it starts.
%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8
# Cache locations read by the Hugging Face libraries.
# NOTE(review): /content/cache is an absolute path — confirm it exists on the target machine.
%env TRANSFORMERS_CACHE=/content/cache
%env HF_DATASETS_CACHE=/content/cache
# Synchronous CUDA launches: errors surface at the failing call, at the cost of speed.
%env CUDA_LAUNCH_BLOCKING=1

In [None]:
%%capture
!pip install git+https://github.com/huggingface/datasets.git
!pip install git+https://github.com/huggingface/transformers.git
!pip install torchaudio
!pip install librosa
!pip install jiwer
!pip install wandb

In [None]:
#Needed to store training metrics in the WandB environment
# `{project-name}` and `{Token}` are placeholders: replace them with your own W&B project
# name and API key before running, and never commit the real key with the notebook.
%env WANDB_PROJECT={project-name}
!wandb login {Token} --relogin

In [None]:
#import dependencies for using in the code
from datasets import load_dataset, load_metric

import pandas as pd
import numpy as np

from tqdm import tqdm

import os
import string
import six
import re

In [None]:
# Locations used throughout the notebook. Despite the "abs_" prefix they are relative
# to the working directory.
# Folder that contains metadata_split_by_song.json
abs_path_to_metadata = "../input/metadata/"
# Root folder of the per-utterance audio files
abs_path_to_data = "../input/utterance-level"
# Folder that receives the generated TSV files
abs_output_path = "./"

# Pre-processing and Tokenizer vocab creation

In [None]:
import json

# Read the song metadata and turn every entry into a (path, sentence) record, routed to
# the train or test split according to the entry's "split" field.
with open(f"{abs_path_to_metadata}/metadata_split_by_song.json", "r", encoding="utf-8") as file:
    data = json.load(file)

# Collect plain dicts and build each frame once: DataFrame.append copied the whole frame
# on every call and no longer exists in pandas 2.x.
train_records = []
test_records = []
for value in data.values():
    res = {
        "path": f"{abs_path_to_data}/{value['path']}/audio.wav",
        "sentence": value["lyrics"],
    }
    # Every split other than "train" is treated as test data.
    if value["split"] == "train":
        train_records.append(res)
    else:
        test_records.append(res)

train_df = pd.DataFrame(train_records, columns=["path", "sentence"])
test_df = pd.DataFrame(test_records, columns=["path", "sentence"])

# The index is written on purpose: a later cell drops the resulting "Unnamed: 0" column.
train_df.to_csv(f"{abs_output_path}/train.tsv", sep="\t")
test_df.to_csv(f"{abs_output_path}/test.tsv", sep="\t")

In [None]:
# Custom normalizer for processing of annotations

def normalizer(text):
    """Normalise one lyrics annotation.

    Steps, in order:
      1. turn the literal two-character sequence backslash-n into a real newline,
      2. replace the separators "/", "-" and "_" with a space,
      3. collapse every run of whitespace (newlines included) into a single space and
         trim both ends.

    The separators are replaced *before* whitespace is collapsed; doing it the other way
    round left runs of spaces behind (e.g. "a / b" became "a   b").

    Args:
        text: The raw annotation string.

    Returns:
        The normalised string.
    """
    text = text.replace("\\n", "\n")
    text = re.sub(r"[/\-_]", " ", text)
    # split() with no argument splits on any whitespace and drops empty pieces, so the
    # join both collapses inner runs and strips the ends.
    return " ".join(text.split())

In [None]:
train_df = pd.read_csv(f"{abs_output_path}/train.tsv", sep="\t")
_train_df = train_df.copy()  # untouched copy, used below to look up the paths of dropped rows
total_records = len(train_df)
train_df["id"] = range(0, total_records)  # row id that survives the filtering below
print(f"Step 0: {len(train_df)}")

# Step 1: keep only rows whose audio file exists on disk. The earlier version computed a
# "status" column but ran dropna() on "path", so missing files were never filtered out.
file_exists = train_df["path"].apply(os.path.exists).astype(bool)
train_df = train_df.loc[file_exists].copy()
print(f"Step 1: {len(train_df)}")

# Step 2: drop rows without lyrics *before* normalising, because normalizer() only
# accepts strings and would fail on a missing value.
train_df = train_df.dropna(subset=["sentence"])
train_df["sentence"] = train_df["sentence"].apply(normalizer)
print(f"Step 2: {len(train_df)}")

# Paths of every row removed above, looked up by id in the untouched copy.
removed_ids = sorted(set(range(total_records)) - set(train_df["id"].tolist()))
removed_items_train = [_train_df.iloc[index]["path"] for index in removed_ids]
train_df = train_df.reset_index(drop=True)
train_df.head()

In [None]:
# Number of training rows dropped during cleaning; a later cell deletes their audio files.
print(f"Items to be removed {len(removed_items_train)}")

In [None]:
test_df = pd.read_csv(f"{abs_output_path}/test.tsv", sep="\t")

_test_df = test_df.copy()  # untouched copy, used below to look up the paths of dropped rows
total_records = len(test_df)
test_df["id"] = range(0, total_records)  # row id that survives the filtering below
print(f"Step 0: {len(test_df)}")

# Step 1: keep only rows whose audio file exists on disk. The earlier version computed a
# "status" column but ran dropna() on "path", so missing files were never filtered out.
file_exists = test_df["path"].apply(os.path.exists).astype(bool)
test_df = test_df.loc[file_exists].copy()
print(f"Step 1: {len(test_df)}")

# Step 2: drop rows without lyrics *before* normalising, because normalizer() only
# accepts strings and would fail on a missing value.
test_df = test_df.dropna(subset=["sentence"])
test_df["sentence"] = test_df["sentence"].apply(normalizer)
print(f"Step 2: {len(test_df)}")

# Paths of every row removed above, looked up by id in the untouched copy.
removed_ids = sorted(set(range(total_records)) - set(test_df["id"].tolist()))
removed_items_test = [_test_df.iloc[index]["path"] for index in removed_ids]
test_df = test_df.reset_index(drop=True)
test_df.head()

In [None]:
# Number of test rows dropped during cleaning; a later cell deletes their audio files.
print(f"Items to be removed {len(removed_items_test)}")

In [None]:
# Delete the audio files that belong to rows dropped from either split.
# Destructive: the files are removed from the input folder itself.
removed_items = removed_items_train + removed_items_test

# The generator is lazy, so each existence check happens right before its removal.
still_on_disk = (candidate for candidate in removed_items if os.path.exists(candidate))
for path in still_on_disk:
    os.remove(path)

In [None]:
# Character inventory of the normalised lyrics of both splits, printed for inspection.
all_sentences = train_df["sentence"].values.tolist() + test_df["sentence"].values.tolist()
text = " ".join(all_sentences)
vocab = sorted(set(text))

print(len(vocab), vocab)

In [None]:
# Remove the "Unnamed: 0" column, then store both cleaned splits as tab-separated files
# without an index column.
train_df = train_df.drop(columns=["Unnamed: 0"])
test_df = test_df.drop(columns=["Unnamed: 0"])

csv_options = {"sep": "\t", "encoding": "utf-8", "index": False}
train_df.to_csv("./train.csv", **csv_options)
test_df.to_csv("./test.csv", **csv_options)

print(train_df.shape)
print(test_df.shape)

In [None]:
# Load the cleaned splits as Hugging Face datasets. The "common_voice_*" names are
# historical; the data are the lyrics utterances written to ./train.csv and ./test.csv.
train_files = {"train": "./train.csv"}
test_files = {"test": "./test.csv"}

common_voice_train = load_dataset("csv", data_files=train_files, delimiter="\t")["train"]
common_voice_test = load_dataset("csv", data_files=test_files, delimiter="\t")["test"]

print(common_voice_train)
print(common_voice_test)

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Display `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: Object supporting `len()` and indexing with a list of integer positions.
            The indexed result is passed to `pd.DataFrame`.
        num_examples: Number of distinct rows to show.

    Raises:
        AssertionError: If `num_examples` exceeds the number of rows in `dataset`.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws without replacement, replacing the manual retry-until-unique loop.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
# Eyeball 20 random training rows; the "path" column is removed from the view first.
show_random_elements(common_voice_train.remove_columns(["path"]), num_examples=20)

In [None]:
import re
# Punctuation removed from the lyrics. Written as a raw string so the backslashes reach
# the regex engine instead of being parsed as (invalid) Python string escapes; it matches
# the same characters as before.
chars_to_ignore_regex = r'[,?.!\-_;:"“%‘”।’\']'

def remove_special_characters(batch):
    """Create the training target for one example.

    Removes every character matched by `chars_to_ignore_regex` from `batch["sentence"]`,
    lower-cases the result, passes it through `normalizer` and stores it in
    `batch["text"]`.

    Args:
        batch: A single example (mapping) with a "sentence" string.

    Returns:
        The same mapping with the additional "text" entry.
    """
    text = re.sub(chars_to_ignore_regex, '', batch["sentence"]).lower() + " "
    text = normalizer(text)
    batch["text"] = text
    return batch

In [None]:
# Add the cleaned "text" column to both splits and drop the raw "sentence" column.
common_voice_train = common_voice_train.map(remove_special_characters, remove_columns=["sentence"])
common_voice_test = common_voice_test.map(remove_special_characters, remove_columns=["sentence"])

In [None]:
# Eyeball random training rows again, now showing the cleaned "text" column.
show_random_elements(common_voice_train.remove_columns(["path"]))

In [None]:
def extract_all_chars(batch):
    """Collect the characters that occur in a batch of transcripts.

    Args:
        batch: Mapping whose "text" entry is a list of strings.

    Returns:
        A dict with two single-element lists: "vocab" holds the list of distinct
        characters followed by an extra "q", and "all_text" holds all transcripts joined
        by single spaces.
    """
    joined_text = " ".join(batch["text"])
    # "q" is appended unconditionally — presumably so the letter is in the vocabulary even
    # when no transcript contains it; TODO confirm.
    characters = [*set(joined_text), "q"]
    return {"vocab": [characters], "all_text": [joined_text]}

In [None]:
# batch_size=-1 hands each whole split to extract_all_chars in a single call.
vocab_train = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_train.column_names)
# The test vocabulary is now computed from the test split; it used to map over the
# training split a second time, so test-only characters were never collected.
vocab_test = common_voice_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_test.column_names)

In [None]:
# Union of the characters of both splits. Sorted so that the character -> id mapping
# built from it is the same on every run (set iteration order is not stable across runs).
vocab_list = sorted(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))

In [None]:
# Map every character to its position in vocab_list; that position is the token id.
vocab_dict = dict(zip(vocab_list, range(len(vocab_list))))
vocab_dict

In [None]:
print(len(vocab_dict))
# Re-key the space character as "|", the word delimiter token passed to the tokenizer
# below; the id stays the same.
vocab_dict["|"] = vocab_dict.pop(" ")

In [None]:
# Append the unknown and padding tokens, each taking the next free id.
for special_token in ("[UNK]", "[PAD]"):
    vocab_dict[special_token] = len(vocab_dict)
len(vocab_dict)

In [None]:
import json
# Persist the character -> id mapping; the tokenizer is built from this file later on.
serialized_vocab = json.dumps(vocab_dict)
with open('vocab.json', 'w') as vocab_file:
    vocab_file.write(serialized_vocab)

## Loading an audio sample

In [None]:
import torchaudio
import librosa
import IPython.display as ipd
import numpy as np

# Pick a random training row, print its lyrics and play the audio.
sample = train_df.iloc[np.random.randint(0, len(train_df))]

path = sample["path"]
print(sample["sentence"], "\n")
# torchaudio.load returns (waveform, sample_rate); keep the real rate instead of
# assuming that every file is 48 kHz.
waveform, source_rate = torchaudio.load(path)
speech = waveform.numpy().squeeze()  # Wav2Vec2 expects a 1-D array; squeeze drops the channel axis of mono audio

# Wav2Vec2 works on 16 kHz audio, so resample from the file's own rate.
speech = librosa.resample(np.asarray(speech), orig_sr=source_rate, target_sr=16_000)
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=16000)

# Training Procedure

In [None]:
# Folder for the CSV files read by the torch dataset below; -p makes re-runs harmless.
!mkdir -p ./dataset

In [None]:
# Materialise both Hugging Face datasets as pandas frames (one row per example, one
# column per dataset feature) and store them as tab-separated files under ./dataset.
trainset = pd.DataFrame(
    [
        {key: item[key] for key in common_voice_train.features}
        for item in tqdm(common_voice_train, position=0, total=len(common_voice_train))
    ]
)
trainset.to_csv("./dataset/train.csv", sep="\t")


testset = pd.DataFrame(
    [
        {key: item[key] for key in common_voice_test.features}
        for item in tqdm(common_voice_test, position=0, total=len(common_voice_test))
    ]
)
testset.to_csv("./dataset/test.csv", sep="\t")

In [None]:
# First rows of the exported training frame.
trainset.head()

In [None]:
# First rows of the exported test frame.
testset.head()

In [None]:
#Defining directory to save model training results
# The cells below also use the existence of this folder to decide whether to load a
# saved processor or build a new one.
save_dir = "./wav2vec2-base"
# Show what is already stored there, if anything.
!ls {save_dir}

In [None]:
import os
from transformers.trainer_utils import get_last_checkpoint

# Path of the latest checkpoint under save_dir, or None when the folder does not exist
# yet or get_last_checkpoint finds nothing.
last_checkpoint = get_last_checkpoint(save_dir) if os.path.exists(save_dir) else None

# Prints 0 when there is nothing to resume from.
print(last_checkpoint or 0)

In [None]:
#Loading tokenizer from Wav2Vec2 saved directory if it exists
from transformers import Wav2Vec2CTCTokenizer

# Reuse the tokenizer stored in save_dir when that folder exists; otherwise build a new
# one from the vocab.json written earlier.
if os.path.exists(save_dir):
    tokenizer = Wav2Vec2CTCTokenizer.from_pretrained(save_dir)
else:
    print("NotExist")
    tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
#Loading feature extractor from Wav2Vec2 saved directory if it exists
from transformers import Wav2Vec2FeatureExtractor

# Reuse the feature extractor stored in save_dir when that folder exists; otherwise
# configure a new one for single-channel 16 kHz input with normalisation and attention masks.
if os.path.exists(save_dir):
    feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(save_dir)
else:
    print("NotExist")
    feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)

In [None]:
#Creating a processor with the feature extractor and tokenizer defined above
from transformers import Wav2Vec2Processor

# Reuse the processor stored in save_dir when that folder exists; otherwise combine the
# feature extractor and tokenizer created above.
if os.path.exists(save_dir):
    processor = Wav2Vec2Processor.from_pretrained(save_dir)
else:
    print("NotExist")
    processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
# First run only: store the freshly built processor in save_dir, so that later runs
# (which find the folder) load this exact tokenizer and feature extractor.
if not os.path.exists(save_dir):
    print("NotExist")
    processor.save_pretrained(save_dir)
    print("Saved!")

In [None]:
# Inspect the first training example.
common_voice_train[0]

### Function to convert speech to a numpy array

In [None]:
import torchaudio
import librosa
import numpy as np


def speech_file_to_array_fn(file_path):
    """Load an audio file and return its first channel resampled to 16 kHz.

    The source rate is taken from the file itself rather than assumed to be 48 kHz, so
    files recorded at other rates are converted correctly. A file that is already at
    16 kHz is returned without resampling.

    Args:
        file_path: Path of the audio file, as accepted by `torchaudio.load`.

    Returns:
        A tuple `(speech_array, sampling_rate)`: the 1-D numpy array of the first channel
        and the integer 16000.
    """
    target_rate = 16_000
    speech_array, source_rate = torchaudio.load(file_path)

    # Index 0 selects the first channel of the loaded waveform.
    speech_array = speech_array[0].numpy()
    if source_rate != target_rate:
        speech_array = librosa.resample(np.asarray(speech_array), orig_sr=source_rate, target_sr=target_rate)

    return speech_array, target_rate

### Testing an audio sample after it is converted into a suitable format

In [None]:
import IPython.display as ipd
import numpy as np
import random

# random.randint includes its upper bound, so randint(0, len(trainset)) could produce an
# index one past the last row; randrange excludes it.
rand_int = random.randrange(len(trainset))
sample = trainset.iloc[rand_int]

text = sample["text"]
path = sample["path"]

speech_array, sampling_rate = speech_file_to_array_fn(path)

print("Target text:", text)
print("Input array shape:", np.asarray(speech_array).shape)
print("Sampling rate:", sampling_rate)
print()

ipd.Audio(data=np.asarray(speech_array), autoplay=True, rate=16000)

In [None]:
import torchaudio
import librosa

import numpy as np
import pandas as pd

from torch.utils.data import Dataset, DataLoader
import os


class CommonVoiceDataset(Dataset):
    """Map-style torch dataset that prepares one utterance at a time.

    The rows come from a delimited text file with at least a "path" column (audio file)
    and a "text" column (transcript). Nothing is pre-computed: each `__getitem__` call
    loads the audio, resamples it to 16 kHz, runs it through the processor and tokenises
    the transcript.

    Args:
        csv_file: File name of the table, relative to `root_dir`.
        root_dir: Folder that contains `csv_file`.
        processor: Callable processor used for both audio and text; it must also provide
            `as_target_processor()`.
        column_names: Optional list of keys to keep in each returned item. With `None`
            every key is kept.
        sep: Column separator of `csv_file`.
    """

    # Sampling rate that every item is converted to before it reaches the processor.
    TARGET_SAMPLING_RATE = 16_000

    def __init__(self, csv_file, root_dir, processor, column_names=None, sep="\t"):
        self.data = pd.read_csv(os.path.join(root_dir, csv_file), sep=sep)
        self.processor = processor
        self.column_names = column_names

    def __len__(self):
        """Return the number of rows in the table."""
        return len(self.data)


    def speech_file_to_array_fn(self, batch):
        """Load the audio of one row.

        Adds "speech" (first channel as a numpy array), "sampling_rate" (the rate reported
        by `torchaudio.load`) and "target_text" (copy of "text") to `batch`.
        """
        speech_array, sampling_rate = torchaudio.load(batch["path"])
        batch["speech"] = speech_array[0].numpy()
        batch["sampling_rate"] = sampling_rate
        batch["target_text"] = batch["text"]
        return batch

    
    def resample(self, batch):
        """Convert `batch["speech"]` to `TARGET_SAMPLING_RATE`.

        The source rate is read from `batch["sampling_rate"]` instead of being assumed to
        be 48 kHz. Audio that is already at the target rate is left untouched.
        """
        source_rate = batch["sampling_rate"]
        if source_rate != self.TARGET_SAMPLING_RATE:
            batch["speech"] = librosa.resample(
                np.asarray(batch["speech"]),
                orig_sr=source_rate,
                target_sr=self.TARGET_SAMPLING_RATE,
            )
        batch["sampling_rate"] = self.TARGET_SAMPLING_RATE
        return batch

    
    def prepare_dataset(self, batch, column_names=None):
        """Turn audio and transcript into model inputs.

        Adds "input_values" (processor output for the audio, as a plain list) and "labels"
        (token ids of "target_text"). When `column_names` is a non-empty list, only those
        keys are returned.
        """
        batch["input_values"] = self.processor(batch["speech"], sampling_rate=batch["sampling_rate"]).input_values[0].tolist()

        # Inside this context the processor call is routed to the tokenizer.
        with self.processor.as_target_processor():
            batch["labels"] = self.processor(batch["target_text"]).input_ids

        if column_names and isinstance(column_names, list):
            batch = {name: batch[name] for name in column_names}
        
        return batch


    def __getitem__(self, idx):
        """Return the fully prepared item at position `idx` as a dict."""
        # `torch` is resolved when this method runs; the notebook imports it in a later
        # cell, which must have been executed before the first item is requested.
        if torch.is_tensor(idx):
            idx = idx.tolist()

        batch = self.data.iloc[idx].copy()
        batch = batch.to_dict()
        batch = self.speech_file_to_array_fn(batch)
        batch = self.resample(batch)
        batch = self.prepare_dataset(batch, self.column_names)

        return batch 

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Collate function that pads audio inputs and label sequences independently.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data; its ``pad`` method does the padding.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Padding strategy, forwarded unchanged to ``processor.pad`` for both inputs and labels:
            * :obj:`True` or :obj:`'longest'`: pad to the longest sequence in the batch (no padding for a
              single sequence).
            * :obj:`'max_length'`: pad to the length given by :obj:`max_length`, or to the maximum
              acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: no padding (sequences may differ in length).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length.
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length.
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set, pad the ``input_values`` to a multiple of the provided value.
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            If set, pad the ``labels`` to a multiple of the provided value.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad a list of items (each with "input_values" and "labels") into one batch."""
        # Inputs and labels differ in length and in padding method, so they are separated
        # before padding.
        audio_entries = []
        label_entries = []
        for feature in features:
            audio_entries.append({"input_values": feature["input_values"]})
            label_entries.append({"input_ids": feature["labels"]})

        batch = self.processor.pad(
            audio_entries,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_entries,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Padded label positions become -100 so that the loss ignores them.
        is_padding = labels_batch.attention_mask.ne(1)
        batch["labels"] = labels_batch["input_ids"].masked_fill(is_padding, -100)

        return batch

In [None]:
# Collator that pads each batch to its longest input and its longest label sequence.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# Lazily prepared datasets over the exported CSV files; each item keeps only the two
# keys the collator reads.
train_dataset = CommonVoiceDataset("train.csv", "./dataset/", processor=processor, column_names=["input_values", "labels"])
test_dataset = CommonVoiceDataset("test.csv", "./dataset/", processor=processor, column_names=["input_values", "labels"])

In [None]:
# Number of utterances in each split.
print(len(train_dataset))
print(len(test_dataset))

In [None]:
# Sanity check: prepare the first training item only and show its keys and container type.
for batch in train_dataset:
    print(batch.keys())
    print(type(batch))
    # print(batch)
    break

In [None]:
# Word error rate metric used by compute_metrics during evaluation.
# NOTE(review): `load_metric` may be missing from recent `datasets` releases (metrics moved
# to the separate `evaluate` package) — confirm against the installed version.
wer_metric = load_metric("wer")

In [None]:
def compute_metrics(pred):
    """Compute the word error rate for one evaluation pass.

    Args:
        pred: Prediction object with `predictions` (logits) and `label_ids`.
            `label_ids` is modified in place: every -100 is replaced by the tokenizer's
            pad token id so the labels can be decoded.

    Returns:
        A dict with the single key "wer".
    """
    ignored_positions = pred.label_ids == -100
    pred.label_ids[ignored_positions] = processor.tokenizer.pad_token_id

    predicted_ids = np.argmax(pred.predictions, axis=-1)
    hypotheses = processor.batch_decode(predicted_ids)
    # Labels are decoded without grouping repeated tokens.
    references = processor.batch_decode(pred.label_ids, group_tokens=False)

    return {"wer": wer_metric.compute(predictions=hypotheses, references=references)}

### Defining the model parameters

In [None]:
from transformers import Wav2Vec2ForCTC

# Start from the public base checkpoint, or from the latest local checkpoint when one was
# found above. The keyword arguments override the corresponding configuration values; the
# pad token id and vocabulary size are taken from the tokenizer built earlier.
# NOTE(review): confirm that `gradient_checkpointing` is still accepted here by the
# installed transformers version.
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base" if not last_checkpoint else last_checkpoint, 
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.1,
    mask_time_prob=0.05,
    layerdrop=0.1,
    gradient_checkpointing=True, 
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=processor.tokenizer.vocab_size
)
# Set on the config after loading rather than passed to from_pretrained.
model.config.ctc_zero_infinity = True

In [None]:
# Compare the two tokenizer size figures; the model above was sized with `vocab_size`.
print(len(processor.tokenizer))
print(processor.tokenizer.vocab_size)

In [None]:
# Freeze the model's feature extractor so it is not updated during fine-tuning.
# NOTE(review): newer transformers versions may expect `freeze_feature_encoder()` — confirm.
model.freeze_feature_extractor()

In [None]:
from transformers import TrainingArguments

# Training configuration. Checkpoints go to save_dir (at most two are kept); evaluation,
# saving and logging all happen every 100 steps. With a per-device batch of 16 and 2
# accumulation steps, each optimizer step sees 32 examples per device.
# NOTE(review): `group_by_length=True` probably has no effect here, because the
# CommonVoiceTrainer defined below replaces the train sampler with a plain RandomSampler.
training_args = TrainingArguments(
    output_dir=save_dir,
    group_by_length=True,
    per_device_train_batch_size=16,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=30,
    fp16=False,
    save_steps=100, 
    eval_steps=100, 
    logging_steps=100,
    learning_rate=5e-5,
    weight_decay=0.1,
    warmup_steps=500,
    save_total_limit=2,
    dataloader_num_workers=4
)

In [None]:
import collections

from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

from torch.utils.data.sampler import RandomSampler, SequentialSampler
from transformers import Trainer
from transformers.trainer import (
    SequentialDistributedSampler, 
    SequentialSampler,
    DistributedSamplerWithLoop
)
from transformers.trainer import is_datasets_available


class CommonVoiceTrainer(Trainer):
    """Trainer whose dataloaders are built directly on torch `DataLoader`.

    Training uses a random sampler (or a distributed sampler when several processes are
    involved); evaluation uses a sequential sampler. `args.group_by_length` is not
    consulted by these overrides.
    """

    def _get_train_sampler(self):
        """Return the sampler for the training set, or None for unsized/iterable datasets."""
        if isinstance(self.train_dataset, torch.utils.data.IterableDataset) or not isinstance(
            self.train_dataset, collections.abc.Sized
        ):
            return None 
        
        if self.args.world_size <= 1:
            return RandomSampler(self.train_dataset)

        # Neither name is imported anywhere else in the notebook, so the multi-process
        # branches below used to fail with NameError. Imported here, where they are needed.
        from torch.utils.data.distributed import DistributedSampler
        from transformers.training_args import ParallelMode

        if self.args.parallel_mode == ParallelMode.TPU and not self.args.dataloader_drop_last:
            # Use a loop for TPUs when drop_last is False to have all batches have the same size.
            return DistributedSamplerWithLoop(
                self.train_dataset,
                batch_size=self.args.per_device_train_batch_size,
                num_replicas=self.args.world_size,
                rank=self.args.process_index,
            )
        return DistributedSampler(
            self.train_dataset, num_replicas=self.args.world_size, rank=self.args.process_index
        )
    
    def get_train_dataloader(self):
        """Build the training DataLoader.

        Raises:
            ValueError: If the trainer was created without a training dataset.
        """
        if self.train_dataset is None:
            raise ValueError("Trainer: training requires a train_dataset.")
        train_sampler = self._get_train_sampler()

        return DataLoader(
            self.train_dataset,
            batch_size=self.args.train_batch_size,
            sampler=train_sampler,
            collate_fn=self.data_collator,
            drop_last=self.args.dataloader_drop_last,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
        )
    
    def _get_eval_sampler(self, eval_dataset):
        """Return a sequential sampler; the distributed variant when local_rank is set."""
        if self.args.local_rank != -1:
            return SequentialDistributedSampler(eval_dataset)
        else:
            return SequentialSampler(eval_dataset)


    def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None):
        """Build the evaluation DataLoader for `eval_dataset`, defaulting to `self.eval_dataset`."""
        eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset
        eval_sampler = self._get_eval_sampler(eval_dataset)

        return DataLoader(
            eval_dataset,
            sampler=eval_sampler,
            batch_size=self.args.eval_batch_size,
            collate_fn=self.data_collator,
            drop_last=self.args.dataloader_drop_last,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
        )

In [None]:
# Wire model, data, collator, metrics and arguments together. The feature extractor is
# passed in the `tokenizer` slot, so it is the object the trainer holds under that name.
trainer = CommonVoiceTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=processor.feature_extractor
)

In [None]:
# Resume from the stored checkpoint when one was found, otherwise train from the start.
if last_checkpoint:
    print(f"last_checkpoint: {last_checkpoint}")

# A falsy value is passed on as None, which makes this a plain trainer.train() call.
train_result = trainer.train(resume_from_checkpoint=last_checkpoint or None)

# Saving the Trained Model

In [None]:
# Store the model in the trainer's output directory, then log and save the training
# metrics (extended with the number of training samples) and the trainer state.
trainer.save_model()

metrics = train_result.metrics
max_train_samples = len(train_dataset)
# Both operands of the former min() were len(train_dataset), so the value is just that.
metrics["train_samples"] = max_train_samples

trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()

### Saving the trained model and processor to local directory

In [None]:
# Write the trained model to the working directory.
trainer.save_model('./')
# The trainer only holds the feature extractor (passed as its `tokenizer`), so the
# processor is saved explicitly to keep the tokenizer files next to the model.
processor.save_pretrained('./')

### Saving the trained model to HuggingFace repository

In [None]:
from huggingface_hub import notebook_login

# Interactive login to Hugging Face, needed before the model can be pushed.
notebook_login()

In [None]:
#Install git-lfs to help push the model to a repository in HuggingFace
%%capture
!apt install git-lfs

!git config --global credential.helper store

In [None]:
# Authenticate with the credentials stored by notebook_login() above instead of a token
# written into the notebook.
# SECURITY: the access token that used to be hard-coded here is exposed and must be revoked.
model.push_to_hub("songs_transcription_wav2vec_base2", use_auth_token=True)
processor.push_to_hub("songs_transcription_wav2vec_base2", use_auth_token=True)

# Inference on the Validation dataset using the saved model on HuggingFace

In [None]:
# Rebuild the test dataset with column_names=None so that every key (not only
# "input_values" and "labels") is kept in each item; print the keys of one item to check.
test_dataset = CommonVoiceDataset("test.csv", "./dataset/", processor=processor, column_names=None)
print(test_dataset[10].keys())

In [None]:
# Run the whole test set through the collator in batches of 10 and keep every padded
# input row and label row in memory, one list entry per utterance.
input_values = []
labels = []

test_loader = DataLoader(test_dataset, batch_size=10, collate_fn=data_collator)
for data in tqdm(test_loader, total=len(test_loader)):
    # Iterating a batch tensor yields its rows along the first dimension.
    input_values.extend(data["input_values"])
    labels.extend(data["labels"])

itest_loader = {"input_values": input_values, "labels": labels}

In [None]:
# Every collected input must have a matching label row.
assert len(itest_loader["input_values"]) == len(itest_loader["labels"])

In [None]:
# No utterance may have been lost while batching.
assert len(itest_loader["input_values"]) == len(test_dataset)

In [None]:
# Choose a random test utterance for a manual prediction check.
idx = np.random.randint(0, len(test_dataset))
print(f"idx {idx}")

print(f"TEXT: {test_dataset[idx]['text']}")
# Show the first values of the *chosen* utterance; index 0 was printed here before.
print(f"INPUT: {itest_loader['input_values'][idx][:5]}")

In [None]:
# Inference mode: switch off the training-time behaviour (dropout etc.) left on by training.
model.eval()

# The sampling rate is stated explicitly; 16 kHz is what the feature extractor was set up with.
input_dict = processor(itest_loader["input_values"][idx], sampling_rate=16_000, return_tensors="pt", padding=True)

# No gradients are needed for a prediction, and the input follows the model to whatever
# device it lives on instead of assuming "cuda".
with torch.no_grad():
    logits = model(input_dict.input_values.to(model.device)).logits

pred_ids = torch.argmax(logits, dim=-1)[0]

In [None]:
# Print prediction and reference side by side and play the audio of the chosen utterance.
sample = test_dataset[idx]

print("Prediction:")
print(processor.decode(pred_ids))

print("\nReference:")
print(sample["text"].lower())


# Keep the file's real sampling rate instead of assuming 48 kHz.
waveform, source_rate = torchaudio.load(sample["path"])
speech = waveform.numpy().squeeze()

# Keyword arguments, as in the other resample calls in this notebook; the rates can no
# longer be passed positionally in newer librosa versions.
speech = librosa.resample(np.asarray(speech), orig_sr=source_rate, target_sr=16_000)
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=16000)

In [None]:
import os
from transformers.trainer_utils import get_last_checkpoint

# Look up the newest checkpoint again now that training has written to save_dir; None
# when the folder is missing or holds no checkpoint.
last_checkpoint = get_last_checkpoint(save_dir) if os.path.exists(save_dir) else None

# Prints 0 when no checkpoint is available.
print(last_checkpoint or 0)

In [None]:
# Load the published processor and model from the Hub, authenticating with the stored
# login (notebook_login above) instead of a token written into the notebook.
# SECURITY: the access token that used to be hard-coded here is exposed and must be revoked.
processor_hf = Wav2Vec2Processor.from_pretrained("akanksha-b14/songs_transcription_wav2vec_base2", use_auth_token=True)
model_hf = Wav2Vec2ForCTC.from_pretrained("akanksha-b14/songs_transcription_wav2vec_base2", use_auth_token=True)
model_hf.to("cuda")

In [None]:
import torch
import torchaudio
from datasets import Dataset, load_metric
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import re
from sklearn.model_selection import train_test_split
import pandas as pd

# Rebuild the evaluation split from the raw TSV as a Hugging Face dataset.
# `Dataset` is datasets.Dataset here (imported just above), which shadows the
# torch.utils.data.Dataset imported earlier, and `test_dataset` is rebound from the
# CommonVoiceDataset instance to this new object.
df_test = pd.read_csv("./test.tsv", sep="\t")
test_dataset = Dataset.from_pandas(df_test)
wer = load_metric("wer")

In [None]:
# Punctuation removed from the reference lyrics (the training set plus "&"). Written as a
# raw string so the backslashes reach the regex engine instead of being parsed as
# (invalid) Python string escapes; it matches the same characters as before.
chars_to_ignore_regex = r'[,?.!\-_;:"“%‘”।’\'&]'
# Fixed 48 kHz -> 16 kHz converter applied to every evaluation file.
resampler = torchaudio.transforms.Resample(48_000, 16_000)

def normalizer(text):
    """Whitespace-only normaliser used in the evaluation section.

    Converts the literal two-character sequence backslash-n into a newline, then
    collapses every run of whitespace into one space and trims both ends. This
    redefinition replaces the normaliser from the pre-processing section and, unlike
    it, leaves "/", "-" and "_" untouched.

    Args:
        text: The raw annotation string.

    Returns:
        The normalised string.
    """
    with_newlines = text.replace("\\n", "\n")
    words = with_newlines.split()
    return " ".join(words).strip()

def speech_file_to_array_fn(batch):
    """Prepare one evaluation example.

    Replaces `batch["sentence"]` with its lower-cased form, stripped of the characters in
    `chars_to_ignore_regex` and followed by one space, and stores the audio passed through
    the module-level `resampler` in `batch["speech"]` as a numpy array.

    Args:
        batch: A single example (mapping) with "sentence" and "path".

    Returns:
        The same mapping, updated.
    """
    cleaned = re.sub(chars_to_ignore_regex, '', batch["sentence"])
    batch["sentence"] = cleaned.lower() + " "

    # The file's own sampling rate is not used; `resampler` has fixed rates.
    waveform, _ = torchaudio.load(batch["path"])
    batch["speech"] = resampler(waveform).squeeze().numpy()
    return batch

# Clean the reference text and attach the resampled audio ("speech") to every test example.
test_dataset_new = test_dataset.map(speech_file_to_array_fn)

# Batched inference with the model downloaded from the Hub.
def evaluate(batch):
    """Transcribe a batch of examples with the published model.

    Args:
        batch: Mapping whose "speech" entry is a list of 16 kHz audio arrays.

    Returns:
        The same mapping with "pred_strings", the decoded transcription of each example.
    """
    inputs = processor_hf(batch["speech"], sampling_rate=16_000, return_tensors="pt", padding=True)
    # Send the inputs to whichever device the model is on.
    device = model_hf.device
    with torch.no_grad():
        logits = model_hf(inputs.input_values.to(device), attention_mask=inputs.attention_mask.to(device)).logits
    pred_ids = torch.argmax(logits, dim=-1)
    # Decode with the processor that belongs to model_hf; the training-time `processor`
    # was used here before.
    batch["pred_strings"] = processor_hf.batch_decode(pred_ids)
    return batch

# Evaluate on the pre-processed dataset: only test_dataset_new has the "speech" column
# that evaluate() reads and the cleaned reference sentences.
result = test_dataset_new.map(evaluate, batched=True, batch_size=8)
# Word error rate in percent, with two decimals.
print("WER: {:.2f}".format(100 * wer.compute(predictions=result["pred_strings"], references=result["sentence"])))