In [None]:
!pip install --upgrade datasets transformers accelerate soundfile librosa evaluate jiwer tensorboard gradio

In [1]:
import jsonlines
import torchaudio
from datasets import Dataset, load_metric, DatasetDict
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, Trainer, TrainingArguments
from pathlib import Path
import torch
import librosa
import IPython.display as ipd
import jiwer

In [2]:
# Use the first CUDA device when one is visible to torch; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
device

'cuda:0'

In [3]:
# Locate the data directory two levels above the working directory.
current_directory = Path.cwd()
file_path = current_directory.joinpath('..', '..', 'novice')
data_dir = file_path.resolve()
print(data_dir, current_directory)

# Read asr.jsonl record by record and pivot it into one list per column.
# A record containing a field other than these three raises KeyError.
data = {column: [] for column in ('key', 'audio', 'transcript')}
with jsonlines.open(data_dir / "asr.jsonl") as reader:
    for record in reader:
        for column, cell in record.items():
            data[column].append(cell)

# Build a Hugging Face Dataset and shuffle it with a fixed seed so the split is reproducible.
dataset = Dataset.from_dict(data).shuffle(seed=42)

# Fixed-size subset: the first 700 shuffled rows, split 560 / 70 / 70.
train_size, val_size, test_size = 560, 70, 70
val_start = train_size
test_start = val_start + val_size
test_stop = test_start + test_size

train_dataset = dataset.select(range(val_start))
val_dataset = dataset.select(range(val_start, test_start))
test_dataset = dataset.select(range(test_start, test_stop))

# Re-bind `dataset` to the three named splits (insertion order: train, test, val).
splits = {
    'train': train_dataset,
    'test': test_dataset,
    'val': val_dataset,
}
dataset = DatasetDict(splits)

dataset

/home/jupyter/novice /home/jupyter/til-24-base/asr


DatasetDict({
    train: Dataset({
        features: ['key', 'audio', 'transcript'],
        num_rows: 560
    })
    test: Dataset({
        features: ['key', 'audio', 'transcript'],
        num_rows: 70
    })
    val: Dataset({
        features: ['key', 'audio', 'transcript'],
        num_rows: 70
    })
})

In [4]:
# Show the first two training examples as a tuple of row dicts.
tuple(dataset['train'][row] for row in (0, 1))

({'key': 518,
  'audio': 'audio_518.wav',
  'transcript': 'Heading is two one five, target is brown helicopter, tool to deploy is EMP.'},
 {'key': 1511,
  'audio': 'audio_1511.wav',
  'transcript': 'Heading is two four zero, target is silver fighter plane, tool to deploy is interceptor jets.'})

In [5]:
def replace_audio(batch):
    """Load an example's audio file and replace its filename with decoded audio.

    Args:
        batch: A single example whose ``'audio'`` entry is a filename located
            under ``data_dir / "audio"``.

    Returns:
        The same example with ``batch['audio']`` replaced by a plain dict holding
        ``'array'`` (mono 1-D float waveform at 16 kHz), ``'path'`` (the original
        filename) and ``'sampling_rate'``.
    """
    target_rate = 16000  # the Whisper feature extractor used downstream only accepts 16 kHz input
    audio_path = batch['audio']
    speech_array, sampling_rate = torchaudio.load(data_dir / "audio" / audio_path)

    # torchaudio returns (channels, samples); average the channels so every
    # example is stored as one unambiguous 1-D waveform.
    speech_array = speech_array.mean(dim=0)

    if sampling_rate != target_rate:
        speech_array = torchaudio.functional.resample(speech_array, sampling_rate, target_rate)
        sampling_rate = target_rate

    # A plain dict is the right container here: DatasetDict is meant to map
    # split names to Dataset objects, not to hold a decoded audio record.
    batch['audio'] = {
        'array': speech_array.numpy(),
        'path': audio_path,
        'sampling_rate': sampling_rate}

    return batch

dataset = dataset.map(replace_audio)

Map:   0%|          | 0/560 [00:00<?, ? examples/s]

Map:   0%|          | 0/70 [00:00<?, ? examples/s]

Map:   0%|          | 0/70 [00:00<?, ? examples/s]

In [None]:
# Decode one clip directly to eyeball the raw waveform tensor and its sample rate.
sample_clip = data_dir / "audio" / 'audio_1.wav'
speech_array, sampling_rate = torchaudio.load(sample_clip)
speech_array, sampling_rate

In [6]:
from transformers import WhisperFeatureExtractor, WhisperTokenizer

# Both preprocessing components are loaded from the same pretrained checkpoint.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-large")

# The tokenizer is configured for English transcription.
tokenizer = WhisperTokenizer.from_pretrained(
    "openai/whisper-large",
    language="English",
    task="transcribe",
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [7]:
def prepare_dataset(batch):
    """Turn one decoded example into model inputs.

    Reads ``batch["audio"]`` (its ``"array"`` and ``"sampling_rate"`` entries) and
    ``batch["transcript"]``, then adds ``"input_features"`` and ``"labels"`` to the
    example. No resampling is done here: the audio's own sampling rate is handed
    straight to the feature extractor.
    """
    waveform = batch["audio"]["array"]
    rate = batch["audio"]["sampling_rate"]

    # log-Mel features for the waveform; [0] takes the first (only) item of the result
    extracted = feature_extractor(waveform, sampling_rate=rate)
    batch["input_features"] = extracted.input_features[0]

    # token ids of the reference transcript become the training labels
    encoded = tokenizer(batch["transcript"])
    batch["labels"] = encoded.input_ids
    return batch

# Drop every original column so only the two model-facing columns remain.
source_columns = dataset.column_names["train"]
dataset = dataset.map(prepare_dataset, remove_columns=source_columns, num_proc=4)
dataset

Map (num_proc=4):   0%|          | 0/560 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/70 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/70 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['input_features', 'labels'],
        num_rows: 560
    })
    test: Dataset({
        features: ['input_features', 'labels'],
        num_rows: 70
    })
    val: Dataset({
        features: ['input_features', 'labels'],
        num_rows: 70
    })
})

In [8]:
from transformers import WhisperProcessor

# The collator below reaches the feature extractor and tokenizer through this processor.
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-large",
    language="English",
    task="transcribe",
)

import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

# put together a list of samples into a mini training batch, https://www.youtube.com/watch?v=-RPeakdlHYo
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate preprocessed speech examples into one padded training batch.

    Audio features and label ids are padded separately (they have different
    lengths and padding rules), label padding is replaced by -100 so the loss
    ignores it, and a leading start token shared by every label row is removed
    because the model prepends its decoder start token again when it shifts
    the labels right.

    Attributes are documented inline below.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    processor: Any
    # Id of the token the model prepends to decoder inputs. When left as None the
    # collator looks up "<|startoftranscript|>" in the tokenizer; the tokenizer's
    # bos_token_id is always checked as well (the original behaviour).
    decoder_start_token_id: Union[int, None] = None

    def _leading_token_ids(self) -> List[int]:
        """Return the token ids that count as a removable leading start token."""
        tokenizer = self.processor.tokenizer
        candidates: List[int] = []

        if self.decoder_start_token_id is not None:
            candidates.append(self.decoder_start_token_id)
        else:
            # Whisper's tokenizer starts every sequence with <|startoftranscript|>,
            # which is a different id from its bos token, so comparing against
            # bos_token_id alone never matches.
            lookup = getattr(tokenizer, "convert_tokens_to_ids", None)
            if callable(lookup):
                sot_id = lookup("<|startoftranscript|>")
                # Skip the lookup result when the token is unknown to this tokenizer.
                if isinstance(sot_id, int) and sot_id != getattr(tokenizer, "unk_token_id", None):
                    candidates.append(sot_id)

        bos_id = getattr(tokenizer, "bos_token_id", None)
        if bos_id is not None and bos_id not in candidates:
            candidates.append(bos_id)
        return candidates

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad a list of examples into a batch.

        Args:
            features: Examples, each with ``"input_features"`` and ``"labels"``.

        Returns:
            The padded feature batch with an added ``"labels"`` tensor in which
            padding positions are -100.
        """
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences and pad them to the longest one in the batch
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if every row begins with the start token, cut it here because it is
        # prepended again later when the labels are shifted into decoder inputs
        if labels.shape[1] > 0:
            first_column = labels[:, 0]
            if any(bool((first_column == token_id).all()) for token_id in self._leading_token_ids()):
                labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

# Collator instance built around the Whisper processor; it is passed to the trainer as `data_collator`.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [9]:
import evaluate

# Word-error-rate metric object used by compute_metrics below.
metric = evaluate.load("wer")

def compute_metrics(pred):
    """Compute the word error rate (in percent) for one evaluation pass.

    Args:
        pred: Object exposing ``predictions`` (generated token ids) and
            ``label_ids`` (reference token ids, with -100 at ignored positions).
            ``label_ids`` is modified in place.

    Returns:
        ``{"wer": value}`` where ``value`` is 100 times the metric's result.
    """
    predicted_ids, reference_ids = pred.predictions, pred.label_ids

    # -100 is not a decodable token id; overwrite those positions with the pad id
    ignored_positions = reference_ids == -100
    reference_ids[ignored_positions] = tokenizer.pad_token_id

    # decode both sides to text, dropping special tokens
    hypotheses = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
    references = tokenizer.batch_decode(reference_ids, skip_special_tokens=True)

    score = metric.compute(predictions=hypotheses, references=references)
    return {"wer": 100 * score}

In [10]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large")
model.to(device)

# Clear the checkpoint's forced decoder ids and suppressed-token list on the model config.
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

from transformers import Seq2SeqTrainingArguments

# Single source of truth for where checkpoints and the final model are written.
OUTPUT_DIR = "./whisper-large-trained-quarter"  # change to a repo name of your choice

# The recorded run of this cell stopped with a CUDA out-of-memory error at a per-device
# batch size of 8. The micro-batch is cut to 2 and gradients are accumulated over 4 steps,
# which keeps the effective batch size at 8 while holding far fewer activations at once.
# If memory is still exhausted, halve the batch size again and double the accumulation steps.
training_args = Seq2SeqTrainingArguments(
    output_dir=OUTPUT_DIR,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    warmup_steps=10,
    max_steps=4000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=2,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=1000,
    eval_steps=1000,  # kept equal to save_steps so the best checkpoint can be reloaded at the end
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,  # lower WER is better
    # use_cpu=False
)

from transformers import Seq2SeqTrainer

# NOTE(review): evaluation during training uses the 'test' split; the 'val' split is
# what the later inference cell scores. Confirm this naming is intentional.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

trainer.train()
trainer.save_model(OUTPUT_DIR)

max_steps is given, it will override any value given in num_train_epochs
`use_cache = True` is incompatible with gradient checkpointing. Setting `use_cache = False`...


Step,Training Loss,Validation Loss


OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 

In [None]:
# NOTE(review): `trainer` wraps the model loaded from "openai/whisper-large", and the training
# cell already saved it to './whisper-large-trained-quarter'. Confirm that writing a second copy
# under a directory named "whisper-small-hi" is intended.
trainer.save_model('./whisper-small-hi')

In [None]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from datasets import DatasetDict
import jiwer
from jiwer import wer
from functools import reduce
from pathlib import Path
import torchaudio
import torch
# from datasets import Dataset, load_metric, DatasetDict
# from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, Trainer, TrainingArguments
# from pathlib import Path
# import torch
# import librosa
# import IPython.display as ipd
# import jiwer

# Pick the device string; the model loaded below is not moved to it in this cell.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda:0"

# Load the processor from the hub and the fine-tuned weights from a local checkpoint directory.
processor = WhisperProcessor.from_pretrained("openai/whisper-small")
model = WhisperForConditionalGeneration.from_pretrained("./whisper-small-hi/checkpoint-1000")
model.config.forced_decoder_ids = None
# model.to(device)

# Locate the data directory two levels above the working directory.
current_directory = Path.cwd()
file_path = current_directory.joinpath('..', '..', 'novice')
data_dir = file_path.resolve()

In [None]:
def predict_audio(sample):
    """Transcribe one dataset example and score it against its reference transcript.

    Args:
        sample: An example with an ``'audio'`` dict (``"array"`` and
            ``"sampling_rate"``) and a ``'transcript'`` string.

    Returns:
        The same example with a ``'wer'`` entry added: the word error rate of the
        predicted transcript against the reference, as returned by ``wer``.
    """
    sample_audio = sample['audio']
    actual_transcript = sample['transcript']

    input_features = processor(sample_audio["array"], sampling_rate=sample_audio["sampling_rate"], return_tensors="pt").input_features
    # keep the inputs on whatever device the model weights live on, so this
    # works unchanged whether or not the model has been moved to a GPU
    input_features = input_features.to(model.device)

    # generate predicted token ids
    predicted_ids = model.generate(input_features)
    # decode predicted token ids to text
    predicted_transcript = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

    # reference first, hypothesis second
    wer_score = wer(actual_transcript, predicted_transcript)

    sample['wer'] = wer_score
    return sample
    
# Score every example of the 'val' split; each row gains a 'wer' column.
prediction = dataset['val'].map(predict_audio)
print(prediction)

# Mean of the per-example WER values (a fraction, where 0.0 means a perfect transcript).
val_wer = sum(prediction['wer']) / len(prediction)

# The label says percent, so scale the fraction by 100, matching the single-file check further down.
print(f"WER%: {100 * val_wer}")

In [None]:
from torchaudio import transforms

def predict_audio_from_file(file_path):
    """Transcribe a single audio file with the globally loaded ``processor`` and ``model``.

    Args:
        file_path: Path of an audio file that ``torchaudio.load`` can decode.

    Returns:
        The decoded transcript string for the file.
    """
    target_rate = 16000  # sampling rate required by the model's feature extractor

    speech_array, sampling_rate = torchaudio.load(file_path)

    # torchaudio returns (channels, samples). Averaging the channels yields a 1-D
    # mono waveform for both mono and stereo recordings; squeeze(0) only did so for mono.
    speech_array = speech_array.mean(dim=0)

    # resample to 16000 hz (required by model)
    if sampling_rate != target_rate:
        speech_array = transforms.Resample(sampling_rate, target_rate)(speech_array)

    # hand the processor a NumPy array, and keep the features on the model's device
    input_features = processor(speech_array.numpy(), sampling_rate=target_rate, return_tensors="pt").input_features
    input_features = input_features.to(model.device)

    # generate predicted token ids
    predicted_ids = model.generate(input_features)
    # decode predicted token ids to text
    prediction = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

    return prediction

In [None]:
import time

# Wall-clock timing of one end-to-end transcription, including the printing below.
start = time.time()

# Alternative input from the dataset: data_dir / 'audio' / 'audio_1000.wav'
audio_file = 'audio_2.m4a'
prediction = predict_audio_from_file(audio_file)
transcript = "Heading is one niner five, target is yellow missile, tool to deploy is surface-to-air missiles."

# Reference, hypothesis, and their word error rate as a percentage.
print(f"Actual: {transcript}\n")
print(f"Prediction: {prediction}\n")
wer_percent = 100* wer(transcript, prediction)
print(f"WER%: {wer_percent}\n")

end = time.time()
elapsed = end - start
print(f"Time Taken: {elapsed:.2f}s")

In [None]:
# Decode the recording and display the waveform tensor together with its sample rate.
recording = 'audio_2.m4a'
speech_array, sampling_rate = torchaudio.load(recording)
speech_array, sampling_rate

In [None]:
from torchaudio import functional




# Decode the recording, then resample it to 16 kHz.
speech_array, sampling_rate = torchaudio.load('audio_2.m4a')

# functional.resample returns the resampled tensor directly; it is not a callable
# transform object, so its result must not be invoked on the waveform again.
speech_array = functional.resample(speech_array, sampling_rate, 16000)

print(speech_array)