## Connect to Google Drive

In [None]:

# Mount Google Drive into the Colab filesystem; later cells load the dataset
# from, and write checkpoints to, paths under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

## Install dependencies

In [None]:

# Install Hugging Face transformers (source of the Wav2Vec2 classes, Trainer
# and TrainingArguments imported further down).
! pip install transformers

In [None]:

# `datasets` is imported below for Dataset / load_from_disk / load_metric.
# NOTE(review): jiwer is presumably the backend of the "wer" metric loaded later — confirm.
! pip install jiwer
! pip install datasets

## Import libraries


In [None]:
import os,librosa,re
import pandas as pd
import numpy as np
import datasets
import torch
from datasets import Dataset, DatasetDict,load_from_disk
from transformers import AutoProcessor
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
import pandas as pd
import datasets
from datasets import Dataset, DatasetDict
from datasets import load_dataset, Audio
from transformers import AutoProcessor
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML
import json
from transformers import Wav2Vec2CTCTokenizer
from transformers import TrainingArguments
from transformers import Trainer
from transformers import AutoModelForCTC, Wav2Vec2Processor
from transformers import Wav2Vec2ForCTC
from transformers import Wav2Vec2FeatureExtractor
from transformers import Wav2Vec2Processor
import IPython.display as ipd
import numpy as np
import random
import torch
from datasets import load_dataset, load_metric
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

## Load the Hugging Face dataset from Google Drive

In [None]:

# Load a dataset previously saved to Drive; requires the Drive mount cell to have run.
# NOTE(review): later cells assume "audio", "file" and "text" columns — confirm against the saved data.
dataset = load_from_disk('/content/drive/My Drive/call_recording_data_set_small')


## Hugging Face dataset structure

In [None]:

# Bare expression: the notebook renders the dataset's repr as the cell output.
dataset

## Split the Hugging Face dataset into train (85%) and test (15%) sets

In [None]:

# Hold out 15% of the examples for evaluation. The fixed seed makes the
# train/test membership identical on every Restart & Run All, so WER numbers
# from different runs are comparable.
minds2 = dataset.train_test_split(test_size=0.15, seed=42)
# Later cells refer to the split as `timit`; both names point at the same DatasetDict.
timit = minds2


## Display sample dataset text

In [None]:

def show_random_elements(dataset, num_examples=100):
    """Render a random sample of rows from ``dataset`` as an HTML table.

    Args:
        dataset: Object supporting ``len()`` and indexing with a list of row
            positions (the result is passed to ``pd.DataFrame``).
        num_examples: Number of distinct rows to show. Must not exceed
            ``len(dataset)``.

    Raises:
        AssertionError: If ``num_examples`` is larger than the dataset.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws distinct indices directly, replacing the previous
    # retry-until-unique loop (quadratic membership checks on a list).
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:

# Preview transcripts only: the "audio" and "file" columns are dropped before display.
# Uses the function's default sample size, so the train split must have at least that many rows.
show_random_elements(timit["train"].remove_columns(["audio", "file"]),)

## Apply text preprocessing to the Hugging Face dataset

In [None]:

# Punctuation stripped from every transcript. Written as a raw string so the
# backslashes are regex escapes rather than (invalid) Python string escapes.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:\"]'

# Literal substitutions applied, in order, after punctuation removal and lower-casing.
_TEXT_REPLACEMENTS = (
    ("/", " "),
    ("(", " "),
    (")", " "),
    ("$", "dollar"),
    ("_", " "),
)

def remove_special_characters(batch):
    """Normalise the transcript stored in ``batch["text"]``.

    Removes the punctuation matched by ``chars_to_ignore_regex``, lower-cases
    the text, appends a single trailing space, then applies the literal
    substitutions in ``_TEXT_REPLACEMENTS``.

    Args:
        batch: Mapping with a string under the ``"text"`` key.

    Returns:
        The same ``batch`` object with ``"text"`` replaced by the cleaned string.
    """
    text = re.sub(chars_to_ignore_regex, '', batch["text"]).lower() + " "
    for old, new in _TEXT_REPLACEMENTS:
        text = text.replace(old, new)
    batch["text"] = text
    return batch

In [None]:
# Apply the transcript clean-up to every example of both splits.
# Rebinds `timit`, so re-running this cell cleans already-cleaned text again.
timit = timit.map(remove_special_characters)

## Create vocab from training and testing dataset

In [None]:

def extract_all_chars(batch):
  """Collect the distinct characters appearing in a batch of transcripts.

  All strings in ``batch["text"]`` are joined with single spaces. The result
  holds one-element lists: ``"vocab"`` wraps the list of unique characters and
  ``"all_text"`` wraps the joined string.
  """
  joined = " ".join(batch["text"])
  unique_chars = [*set(joined)]
  return dict(vocab=[unique_chars], all_text=[joined])

In [None]:
# batch_size=-1 hands each split to extract_all_chars as a single batch, so
# every split ends up with exactly one "vocab" row; the original columns are dropped.
vocabs = timit.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=timit.column_names["train"])

## Create vocab list

In [None]:

# Union of the characters seen in the train and test splits. Sorting gives a
# stable order: plain set iteration order can differ between kernel sessions,
# which would assign different ids to the same character on each run and make
# a regenerated vocab.json incompatible with previously saved checkpoints.
vocab_list = sorted(set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))

## Create vocab dictionary

In [None]:

# Give every character its position in vocab_list as integer id, then show the mapping.
vocab_dict = dict(zip(vocab_list, range(len(vocab_list))))
vocab_dict

## Add the word-delimiter, unknown and padding tokens to the vocab dict


In [None]:
# Re-key the space character as the visible word delimiter "|". The entry is
# moved (not copied): leaving " " in place would make two tokens share one id
# and push the ids of [UNK]/[PAD] one position too far.
# The membership checks keep this cell safe to re-run.
if " " in vocab_dict:
    vocab_dict["|"] = vocab_dict.pop(" ")
else:
    # No space in the vocabulary (or cell already executed): make sure the
    # delimiter the tokenizer is configured with still has an id.
    vocab_dict.setdefault("|", len(vocab_dict))
# Unknown-character token and the padding token (used as the CTC blank).
vocab_dict.setdefault("[UNK]", len(vocab_dict))
vocab_dict.setdefault("[PAD]", len(vocab_dict))
len(vocab_dict)

## Save vocab into a json file


In [None]:
# Serialise the character->id mapping to vocab.json in the working directory.
with open('vocab.json', 'w') as vocab_file:
    vocab_file.write(json.dumps(vocab_dict))

## Build the tokenizer from the vocab file


In [None]:
# Character-level CTC tokenizer built from the vocab.json written by the previous cell;
# the special tokens named here must exist as keys in that file.
tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

## Create the Wav2Vec2 feature extractor

In [None]:
# Raw 16 kHz mono waveform in, zero-mean/unit-variance normalised values out.
# return_attention_mask is False because the checkpoint fine-tuned below
# ("facebook/wav2vec2-base-960h") is a group-norm base model that was trained
# without attention masks; its inputs should simply be zero-padded.
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=False)

## Create processor where we add tokenizer and feature extractor


In [None]:
# Bundle the feature extractor (audio side) and the tokenizer (text side) into one object;
# the bare expression below displays its configuration.
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)
processor

## Prepare the dataset in the input format expected by the Hugging Face model

In [None]:

def prepare_dataset(batch):
    """Convert one example into model inputs and CTC labels.

    Reads ``batch["audio"]`` (a mapping with ``"array"`` and
    ``"sampling_rate"``) and ``batch["text"]``, and adds:

    * ``"input_values"`` - the waveform processed by the global ``processor``;
    * ``"input_length"`` - number of values in ``"input_values"``;
    * ``"labels"`` - token ids of the transcript.

    Returns:
        The same ``batch`` mapping with the three keys added.
    """
    audio = batch["audio"]

    # batched output is "un-batched" to ensure mapping is correct
    batch["input_values"] = processor(audio["array"], sampling_rate=audio["sampling_rate"]).input_values[0]
    batch["input_length"] = len(batch["input_values"])

    # Encode the transcript with the tokenizer directly. This is what the
    # deprecated `processor.as_target_processor()` context manager routed to,
    # without depending on that API.
    batch["labels"] = processor.tokenizer(batch["text"]).input_ids
    return batch

In [None]:
# Replace the raw columns with input_values / input_length / labels for both splits.
# The column list is taken from the train split and applied to every split.
timit = timit.map(prepare_dataset, remove_columns=timit.column_names["train"], )

## Create a data collator that pads each batch of Hugging Face examples during fine-tuning and evaluation

In [None]:



@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that dynamically pads the audio inputs and the label
    sequences of a batch, each with its own padding method.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data. Its feature extractor
            pads the inputs and its tokenizer pads the labels.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'` (default): Pad to the longest sequence in the batch (or no padding
              if only a single sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad a list of examples into one batch of PyTorch tensors.

        Args:
            features: Examples, each holding ``"input_values"`` and ``"labels"``.

        Returns:
            The padded input batch with an added ``"labels"`` tensor in which
            padded positions are set to -100.
        """
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # Pad audio with the feature extractor and labels with the tokenizer,
        # calling each component directly instead of going through the
        # deprecated `processor.as_target_processor()` switch.
        batch = self.processor.feature_extractor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        labels_batch = self.processor.tokenizer.pad(
            label_features,
            padding=self.padding,
            return_tensors="pt",
        )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
# Collator instance handed to the Trainer; pads every batch to its longest member.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

## Load the word error rate (WER) metric


In [None]:
# WER metric object used by compute_metrics during training and by the final test cell.
# NOTE(review): load_metric is reportedly deprecated in recent `datasets` releases — confirm
# against the installed version before relying on this call.
wer_metric = load_metric("wer")

## Compute the word error rate during fine-tuning

In [None]:

def compute_metrics(pred):
    """Compute the word error rate for one evaluation pass.

    Args:
        pred: Object exposing ``predictions`` (logits; the arg-max over the
            last axis is taken as the predicted token id) and ``label_ids``
            (reference ids in which -100 marks ignored positions).

    Returns:
        ``{"wer": value}`` as produced by the global ``wer_metric``.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Swap the -100 loss-masking value back to the pad id so labels can be
    # decoded. np.where builds a new array, leaving the caller's label_ids
    # untouched (the previous version overwrote them in place).
    pad_id = processor.tokenizer.pad_token_id
    label_ids = np.where(pred.label_ids == -100, pad_id, pred.label_ids)

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

## Load the pretrained model and set the CTC loss reduction for fine-tuning


In [None]:
# Start from the pretrained checkpoint but size the CTC output layer for the
# vocabulary built in this notebook. The checkpoint ships a head for its own
# character set, so labels and pad/blank ids taken from the new tokenizer
# could otherwise fall outside the head's output range.
# ignore_mismatched_sizes lets the differently-shaped head be re-initialised
# instead of raising on load.
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base-960h",
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
    ignore_mismatched_sizes=True,
)

## Freeze the feature encoder (its pretrained weights are kept fixed during fine-tuning) and move the model to the GPU if one is available


In [None]:
# Pick the first CUDA device when present, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Exclude the feature encoder from gradient updates, then place the model on
# the chosen device (the returned model is shown as the cell output).
model.freeze_feature_encoder()
model.to(device)

## Provide training arguments


In [None]:
# Configuration of the fine-tuning run; checkpoints are written to Google Drive.
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/test_response2",
    # batching / memory
    group_by_length=True,
    per_device_train_batch_size=8,
    gradient_checkpointing=True,
    # optimisation schedule
    num_train_epochs=8,
    learning_rate=1e-4,
    weight_decay=0.005,
    warmup_steps=1000,
    # evaluation, logging and checkpoint cadence
    evaluation_strategy="steps",
    eval_steps=10,
    logging_steps=4,
    save_steps=50,
    save_total_limit=2,
    push_to_hub=False,
)

## Create the Trainer from the model, data collator, metric function and training arguments


In [None]:
# Wire everything together: the model, the padding collator, the WER callback and both splits.
# Only the feature extractor (not the full processor) is passed as `tokenizer`.
# NOTE(review): checkpoints therefore presumably lack the tokenizer files — confirm
# before loading a processor from a checkpoint directory.
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=timit["train"],
    eval_dataset=timit["test"],
    tokenizer=processor.feature_extractor,
)

## Start sample training with 10-15 sec dataset and check word error rate


In [None]:
# Run fine-tuning with the configured TrainingArguments; this is the long-running cell.
trainer.train()

## Save the fine-tuned model and processor for later testing

In [None]:
# Save the fine-tuned weights and the processor (feature extractor + tokenizer)
# into the SAME directory so it can be reloaded as one unit with
# from_pretrained(). Previously the processor was written to a different
# folder ("test_response") than the model ("test_response2").
FINAL_MODEL_DIR = "/content/drive/MyDrive/test_response2/"
model.save_pretrained(FINAL_MODEL_DIR)
processor.save_pretrained(FINAL_MODEL_DIR)

## Load the saved model

In [None]:
from transformers import AutoModelForCTC, Wav2Vec2Processor

# NOTE(review): the checkpoint number is hard-coded and only the two most
# recent checkpoints are kept (save_total_limit=2); adjust the path to a
# checkpoint that actually exists, and confirm it contains the tokenizer files
# the processor needs.
model = AutoModelForCTC.from_pretrained("/content/drive/MyDrive/test_response2/checkpoint-1050")
processor = Wav2Vec2Processor.from_pretrained("/content/drive/MyDrive/test_response2/checkpoint-1050")
# A freshly loaded model lives on the CPU; move it to the device chosen
# earlier so the inference cell's GPU tensors match.
model = model.to(device)

## Create code to check the Word Error rate on test data and show results

In [None]:
def map_to_result(batch):
  """Transcribe one prepared example with the global ``model``.

  Args:
    batch: Mapping holding ``"input_values"`` (the processed waveform) and
      ``"labels"`` (reference token ids).

  Returns:
    The same mapping with ``"pred_str"`` (predicted transcript, a single
    string) and ``"text"`` (decoded reference transcript) added.
  """
  with torch.no_grad():
    # Build the input on whatever device the model is on instead of assuming
    # CUDA, so the cell also works on a CPU-only runtime.
    input_values = torch.tensor(batch["input_values"], device=model.device).unsqueeze(0)
    logits = model(input_values).logits

  pred_ids = torch.argmax(logits, dim=-1)
  # batch_decode returns one string per batch row; the batch has exactly one
  # row here, so unwrap it. Without [0] each prediction is a list and the WER
  # computation receives a list of lists.
  batch["pred_str"] = processor.batch_decode(pred_ids)[0]
  batch["text"] = processor.decode(batch["labels"], group_tokens=False)

  return batch

In [None]:
# Run inference over the test split one example at a time; only the columns
# added by map_to_result ("pred_str", "text") are kept.
results = timit["test"].map(map_to_result, remove_columns=timit["test"].column_names)

In [None]:
# Overall word error rate of the predictions against the decoded references, to three decimals.
print("Test WER: {:.3f}".format(wer_metric.compute(predictions=results["pred_str"], references=results["text"])))

In [None]:
# Side-by-side view of random predictions and references.
# Uses the function's default sample size, so the test split must have at least that many rows.
show_random_elements(results)

# New section

# New section