In [None]:
# Install the third-party packages for this notebook (no versions are pinned on this line).
!pip install datasets transformers accelerate soundfile librosa evaluate jiwer tensorboard gradio



In [None]:
import os
import glob
import numpy as np
import torch
import evaluate
import librosa

from datasets import load_dataset, DatasetDict, Audio
from transformers import WhisperFeatureExtractor, WhisperTokenizer
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer
from dataclasses import dataclass
from typing import Any, Dict, List, Union

In [None]:
# Mount Google Drive at /content/gdrive; the data paths used later in the notebook live under it.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
# Show the Hugging Face Hub login widget so the session can authenticate.
# NOTE(review): the training arguments below set push_to_hub=False, so this login may only
# be needed for private/gated resources — confirm whether it is required.
from huggingface_hub import notebook_login

notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
import tqdm
def read_txt(main_file, folder_path):
    """Load the text files named in an index file.

    Args:
        main_file: Path to a UTF-8 text file that lists one file name per line.
        folder_path: Directory against which each listed file name is resolved.

    Returns:
        dict mapping every listed file name (surrounding whitespace removed) to
        the full content of that file. Listed names whose file does not exist
        are reported with ``print`` and left out of the result.
    """
    file_content_dict = {}

    with open(main_file, 'r', encoding='utf-8') as file_list:
        for raw_line in tqdm.tqdm(file_list):
            file_name = raw_line.strip()
            if not file_name:
                # A blank line would resolve to the folder itself, and opening a
                # directory raises IsADirectoryError (not caught below).
                continue

            full_path = os.path.join(folder_path, file_name)
            try:
                with open(full_path, 'r', encoding='utf-8') as file:
                    file_content_dict[file_name] = file.read()
            except FileNotFoundError:
                print(f"File {full_path} not found.")

    return file_content_dict


# Index files listing the transcript file names that belong to each split.
train_file = '/content/gdrive/MyDrive/train_files.txt'
test_file = '/content/gdrive/MyDrive/test_files.txt'
# Folder against which the listed transcript file names are resolved.
txt_segments_folder = '/content/gdrive/MyDrive/text'

# Load each split's transcripts into a dict keyed by file name.
train_dict_txt = read_txt(train_file, txt_segments_folder)
test_dict_txt = read_txt(test_file, txt_segments_folder)

1910it [00:01, 1088.17it/s]
480it [00:00, 1122.11it/s]


In [None]:
# Display the loaded training transcripts (file name -> sentence); this prints the whole dict.
train_dict_txt

{'segment_263890_271130.txt': 'jméno jeho Ezau Potom pak vyšel bratr jeho a rukou',
 'segment_68020_75380.txt': 'Refaimské v Astarotu Karnaimských a Zuzimské v Cham a Eminské',
 'segment_367670_375380.txt': 'mluviti ku Pánu svému Snad se nalezne tam dvadceti Odpověděl Nezahladím',
 'segment_133660_140790.txt': 'jeho Ti tři révové jsou tři dnové Po třech dnech povýší Farao',
 'segment_332470_339670.txt': 'prvé než kraloval král nad syny Izraelskými Kraloval tedy v Edom Béla',
 'segment_548360_555900.txt': 'tvého na tebe a zapomenul by na to což jsi mu učinil Potom já pošli',
 'segment_248930_257820.txt': 'Synové Lotanovi byli Hori a Hemam a sestra Lotanova Tamna Synové',
 'segment_164150_172030.txt': 'umíš jej vyložiti Odpověděl Jozef Faraonovi řka Není to má věc Bůh',
 'segment_179610_186610.txt': 'Přišel pak jeden kterýž byl utekl a zvěstoval Abramovi',
 'segment_52890_61370.txt': 'čeledi své v národech svých Synové pak Chamovi Chus',
 'segment_110590_117590.txt': 'Hlas krve bratra tv

In [None]:
def read_wav(folder_path, sampling_rate=16000):
    """Decode every ``*.wav`` file directly inside a folder.

    Args:
        folder_path: Directory to scan (not recursive) for ``*.wav`` files.
        sampling_rate: Value passed to ``librosa.load`` as ``sr``. Defaults to
            16000, the rate the rest of this notebook passes to the feature
            extractor.

    Returns:
        dict mapping each file's base name (e.g. ``"clip.wav"``) to the audio
        data returned by ``librosa.load`` for that file.
    """
    file_content_dict = {}

    # sorted() makes the insertion order reproducible; glob gives no ordering guarantee.
    for file_path in sorted(glob.glob(os.path.join(folder_path, '*.wav'))):
        file_name = os.path.basename(file_path)

        # librosa.load returns (audio_data, sample_rate); only the samples are kept
        # because the rate is the one requested here.
        audio_data, _ = librosa.load(file_path, sr=sampling_rate)

        file_content_dict[file_name] = audio_data

    return file_content_dict

# Folder holding the WAV segments to load.
# NOTE: "/content/gdrive/MyDrive/asr/wav_segments" was previously assigned to this variable
# and immediately overwritten by the path below, so it never took effect.
wav_segments_folder = "/content/gdrive/MyDrive/uniform_output"

# Decode every WAV file in the folder into memory, keyed by file name.
dict_wav = read_wav(wav_segments_folder)

In [None]:
def combine_dictionaries(text_dict, audio_dict, sampling_rate=16000):
    """Pair each transcript with the audio clip that shares its base file name.

    Args:
        text_dict: Mapping of transcript file name (e.g. ``"seg.txt"``) to sentence.
        audio_dict: Mapping of audio file name (e.g. ``"seg.wav"``) to audio data.
        sampling_rate: Accepted for interface compatibility; not used by this function.

    Returns:
        list of ``{'audio': ..., 'sentence': ...}`` dicts, in ``text_dict`` order.
        Transcripts with no matching ``.wav`` entry in ``audio_dict`` are left out.
    """
    def _audio_name(text_name):
        # Swap the transcript's extension for '.wav' to get the expected audio key.
        stem, _ext = os.path.splitext(text_name)
        return stem + '.wav'

    return [
        {'audio': audio_dict[_audio_name(text_name)], 'sentence': sentence}
        for text_name, sentence in text_dict.items()
        if _audio_name(text_name) in audio_dict
    ]


# Build the train/test example lists by pairing each split's transcripts with the loaded audio.
train_set = combine_dictionaries(train_dict_txt, dict_wav)
test_set = combine_dictionaries(test_dict_txt, dict_wav)

In [None]:
# Inspect the first paired training example (raw audio samples plus its sentence).
train_set[0]

{'audio': array([ 0.02685547,  0.03381348,  0.03799438, ..., -0.00036621,
        -0.00036621, -0.00085449], dtype=float32),
 'sentence': 'jméno jeho Ezau Potom pak vyšel bratr jeho a rukou'}

In [None]:
# Checkpoint identifier shared by the processor, feature extractor, tokenizer and model loads below.
model_name = "openai/whisper-small"

In [None]:
# Combined processor configured for Czech transcription; the data collator and the trainer
# below use its .feature_extractor and .tokenizer.
processor = WhisperProcessor.from_pretrained(model_name, language="czech", task="transcribe")

In [None]:
# Standalone feature extractor and tokenizer; prepare_dataset and compute_metrics read these
# module-level names directly.
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
tokenizer = WhisperTokenizer.from_pretrained(model_name, language="czech", task="transcribe")

In [None]:
def prepare_dataset(data, sampling_rate=16000):
    """Convert raw examples to model inputs, modifying ``data`` in place.

    Each example gains ``"input_features"`` (first entry of the feature
    extractor's ``input_features`` for its audio) and ``"labels"`` (the
    tokenizer's ``input_ids`` for its sentence); its ``"audio"`` and
    ``"sentence"`` keys are then removed.

    Uses the notebook-level ``feature_extractor`` and ``tokenizer``.

    Args:
        data: Iterable of dicts with ``"audio"`` and ``"sentence"`` keys.
        sampling_rate: Sampling rate passed to the feature extractor.

    Returns:
        None. The examples are updated in place.
    """
    for example in data:
        if "audio" not in example and "input_features" in example and "labels" in example:
            # Already converted by an earlier call (e.g. the cell was re-run);
            # without this guard the lookup of "audio" raises KeyError.
            continue

        # Compute both values before touching the example so a failure leaves it unchanged.
        features = feature_extractor(example["audio"], sampling_rate=sampling_rate).input_features[0]
        labels = tokenizer(example["sentence"]).input_ids

        example["input_features"] = features
        example["labels"] = labels
        del example["audio"]
        del example["sentence"]

# Convert both splits in place: each example's audio/sentence become input_features/labels.
prepare_dataset(train_set)
prepare_dataset(test_set)

In [None]:
# Inspect the first training example after preparation (input features and label ids).
train_set[0]

{'input_features': array([[ 0.5140126 ,  0.0890159 ,  0.1645338 , ..., -0.65262187,
         -0.65262187, -0.65262187],
        [ 0.52374864, -0.05902326,  0.04201525, ..., -0.65262187,
         -0.65262187, -0.65262187],
        [ 0.5416091 ,  0.35897875,  0.21340752, ..., -0.65262187,
         -0.65262187, -0.65262187],
        ...,
        [-0.5880704 , -0.65262187, -0.65262187, ..., -0.65262187,
         -0.65262187, -0.65262187],
        [-0.593704  , -0.65262187, -0.65262187, ..., -0.65262187,
         -0.65262187, -0.65262187],
        [-0.5965189 , -0.65262187, -0.65262187, ..., -0.65262187,
         -0.65262187, -0.65262187]], dtype=float32),
 'labels': [50258,
  50283,
  50359,
  50363,
  35195,
  526,
  1771,
  1506,
  1289,
  27211,
  1459,
  9145,
  298,
  20843,
  44766,
  7891,
  338,
  47869,
  81,
  1506,
  1289,
  257,
  367,
  2034,
  263,
  50257]}

In [None]:
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate prepared examples into one padded batch for seq2seq speech training.

    Input features and label ids are padded separately. Padded label positions
    are set to -100 so the loss ignores them, and a leading start token shared
    by every label sequence is removed. That token is removed because the model
    prepends its decoder start token when it builds decoder inputs from the
    labels, so leaving it in would duplicate it.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    processor: Any
    # Id of the token the model prepends to the decoder inputs. When None it is
    # looked up from the tokenizer as "<|startoftranscript|>".
    decoder_start_token_id: Union[int, None] = None

    def _leading_token_ids(self):
        """Return the token ids that should be stripped when they lead every label row."""
        tokenizer = self.processor.tokenizer
        start_id = self.decoder_start_token_id
        if start_id is None:
            lookup = getattr(tokenizer, "convert_tokens_to_ids", None)
            if lookup is not None:
                start_id = lookup("<|startoftranscript|>")
        # bos_token_id is kept for backward compatibility with the original check.
        candidates = (start_id, getattr(tokenizer, "bos_token_id", None))
        return [token_id for token_id in candidates if token_id is not None]

    def __call__(self, data):
        """Build a batch from a list of examples.

        Args:
            data: List of dicts with ``"input_features"`` and ``"labels"`` keys.

        Returns:
            The padded feature batch returned by the feature extractor, with a
            ``"labels"`` tensor added.
        """
        # Pad the audio features and convert them to tensors.
        input_features = [{"input_features": example["input_features"]} for example in data]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # Pad the label ids to the longest sequence in the batch.
        label_features = [{"input_ids": example["labels"]} for example in data]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # Replace padding with -100 so those positions are ignored by the loss.
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # Drop the leading start token only when every row begins with the same one.
        # The original check compared against bos_token_id alone, which does not
        # match a tokenizer whose sequences start with a different start token.
        if labels.shape[1] > 0:
            first_column = labels[:, 0]
            if any(
                (first_column == token_id).all().cpu().item()
                for token_id in self._leading_token_ids()
            ):
                labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

# Collator instance handed to the trainer to batch and pad the prepared examples.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
# Word-error-rate metric used by compute_metrics.
metric = evaluate.load("wer")
def compute_metrics(pred):
    """Compute the word error rate (in percent) for a batch of predictions.

    Uses the notebook-level ``tokenizer`` and ``metric``.

    Args:
        pred: Object with ``predictions`` (generated token ids) and
            ``label_ids`` (reference token ids, with -100 at ignored positions).

    Returns:
        dict ``{"wer": value}`` where ``value`` is 100 times the metric result.
    """
    pred_ids = pred.predictions

    # Swap the -100 loss-masking value for the pad token so the labels can be decoded.
    # np.where builds a new array, leaving pred.label_ids unmodified for the caller.
    label_ids = np.where(pred.label_ids == -100, tokenizer.pad_token_id, pred.label_ids)

    # Decode to text, dropping special tokens (including the padding inserted above).
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
# Load the pretrained checkpoint and move it to the selected device.
model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)

config.json:   0%|          | 0.00/1.97k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/967M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/3.84k [00:00<?, ?B/s]

In [None]:
# Clear the forced decoder ids and the suppressed-token list on the model config.
# NOTE(review): no language/task is set for generation on the model itself; confirm that
# evaluation with predict_with_generate=True produces Czech transcriptions as intended.
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [None]:
# Training configuration: 2000 optimisation steps, with evaluation and checkpointing every
# 500 steps; the checkpoint with the lowest WER is reloaded at the end.
# NOTE(review): output_dir is named "whisper-base-hi" although this notebook loads
# "openai/whisper-small" with language="czech" — consider renaming to avoid confusion.
# NOTE(review): pip installs are unpinned; confirm the installed transformers release still
# accepts `evaluation_strategy` (newer releases use `eval_strategy`).
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-base-hi",  # change to a repo name of your choice
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    warmup_steps=500,
    max_steps=2000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,  # evaluate on generated token ids, as compute_metrics decodes them
    generation_max_length=225,
    save_steps=500,
    eval_steps=500,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",  # key returned by compute_metrics
    greater_is_better=False,  # lower WER is better
    push_to_hub=False,
)

In [None]:
# Wire the model, the prepared example lists, the collator and the WER metric into the trainer.
# The test split doubles as the evaluation set used for best-checkpoint selection.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=train_set,
    eval_dataset=test_set,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # NOTE(review): the feature extractor (not the text tokenizer) is passed in the
    # `tokenizer` slot — presumably so it is saved alongside checkpoints; confirm.
    tokenizer=processor.feature_extractor,
)

In [None]:
# Run fine-tuning; the table below reports training loss, validation loss and WER per evaluation step.
# NOTE(review): in the recorded run validation loss rises after step 500 while training loss
# approaches zero (possible overfitting), although WER keeps improving.
trainer.train()



Step,Training Loss,Validation Loss,Wer
500,0.1151,0.389549,34.943559
1000,0.0102,0.41664,34.063745
1500,0.0014,0.445524,29.963479
2000,0.0011,0.455895,29.897078


There were missing keys in the checkpoint model loaded: ['proj_out.weight'].


TrainOutput(global_step=2000, training_loss=0.18493119423370807, metrics={'train_runtime': 9994.2372, 'train_samples_per_second': 3.202, 'train_steps_per_second': 0.2, 'total_flos': 9.1885591461888e+18, 'train_loss': 0.18493119423370807, 'epoch': 16.67})