In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import shutil

In [3]:
try:
  !mkdir /content/data
except:
  print('already exists')
shutil.unpack_archive("/content/drive/MyDrive/datasets/svarah.tar", "/content/data")

In [4]:
import os

# Define the directory containing your text files
text_files_directory = '/content/data/svarah/audio/'

# Define the output file paths for scp_entries and txt_entries
scp_entries_file = 'scp_entries.txt'
txt_entries_file = 'txt_entries.txt'

# Initialize lists to store entries
scp_entries = []
txt_entries = []

# Set the limit for the number of instances you want to add
limit_instances = 3000

# Iterate through text files in the directory
for text_file_name in os.listdir(text_files_directory):
    if text_file_name.endswith('.txt'):
        text_file_path = os.path.join(text_files_directory, text_file_name)
        with open(text_file_path, 'r') as text_content:
            for line in text_content:
                parts = line.strip().split('\t')
                if len(parts) == 3:
                    audio_path, audio_id, transcription = parts
                    audio_path = os.path.join(audio_path)  # You may need to adjust this based on your folder structure

                    # Create an entry for scp_entries
                    scp_entry = f"{audio_id} /content/data/svarah/{audio_path}\n"
                    scp_entries.append(scp_entry)

                    # Create an entry for txt_entries
                    txt_entry = f"{audio_id} {transcription}\n"
                    txt_entries.append(txt_entry)

                if len(scp_entries) >= limit_instances:
                    # If the limit is reached, break out of the loop
                    break

    if len(scp_entries) >= limit_instances:
        # If the limit is reached, break out of the loop
        break

# Write the scp_entries and txt_entries to their respective files
with open(scp_entries_file, 'w') as scp_file:
    scp_file.writelines(scp_entries)

with open(txt_entries_file, 'w') as txt_file:
    txt_file.writelines(txt_entries)

print(f'SCP entries have been written to {scp_entries_file}')
print(f'Text entries have been written to {txt_entries_file}')


SCP entries have been written to scp_entries.txt
Text entries have been written to txt_entries.txt


In [None]:
!pip install datasets transformers accelerate evaluate

In [1]:
from datasets import Dataset, Audio, Value

In [2]:
scp_entries = open('/content/scp_entries.txt', 'r').readlines()
txt_entries = open("/content/txt_entries.txt", 'r').readlines()

if len(scp_entries) == len(txt_entries):
    audio_dataset = Dataset.from_dict({"audio": [audio_path.split()[1].strip() for audio_path in scp_entries],
                    "sentence": [' '.join(text_line.split()[1:]).strip() for text_line in txt_entries]})

    audio_dataset = audio_dataset.cast_column("audio", Audio(sampling_rate=16000))
    audio_dataset = audio_dataset.cast_column("sentence", Value("string"))
    audio_dataset.save_to_disk('/content/data/svarah/audio')
    print(audio_dataset)
    print('Data preparation done')

else:
    print('Please re-check the audio_paths and text files. They seem to have a mismatch in terms of the number of entries. Both these files should be carrying the same number of lines.')

Casting the dataset:   0%|          | 0/3000 [00:00<?, ? examples/s]

Saving the dataset (0/2 shards):   0%|          | 0/3000 [00:00<?, ? examples/s]

Dataset({
    features: ['audio', 'sentence'],
    num_rows: 3000
})
Data preparation done


In [4]:
import torch
import evaluate
from dataclasses import dataclass
from typing import Any, Dict, List, Union
from datasets import DatasetDict, Audio, load_from_disk, concatenate_datasets
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from transformers import WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor, WhisperForConditionalGeneration, Seq2SeqTrainingArguments, Seq2SeqTrainer

In [5]:
from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

In [6]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="English", task="transcribe")

In [7]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="English", task="transcribe")

In [8]:
def prepare_dataset(batch):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids
    batch["labels"] = tokenizer(batch["sentence"]).input_ids
    return batch

In [9]:
audio_dataset = audio_dataset.map(prepare_dataset)

Map:   0%|          | 0/3000 [00:00<?, ? examples/s]

In [10]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [11]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
!pip install jiwer

In [14]:
import evaluate

metric = evaluate.load("wer")

In [15]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [16]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

Downloading (…)lve/main/config.json:   0%|          | 0.00/1.97k [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/967M [00:00<?, ?B/s]

Downloading (…)neration_config.json:   0%|          | 0.00/3.84k [00:00<?, ?B/s]

In [17]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [18]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="/content/whisper-small-hi",  # change to a repo name of your choice
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    warmup_steps=500,
    max_steps=4000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=True,
)

In [22]:
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [23]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=audio_dataset,
    eval_dataset=audio_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

In [None]:
trainer.train()

`use_cache = True` is incompatible with gradient checkpointing. Setting `use_cache = False`...


Step,Training Loss,Validation Loss,Wer
1000,0.0168,0.010501,7.411641


