# Hugging face Blog

## Preparing Dataset

In [1]:
from datasets import Dataset, DatasetDict
import os

# Define paths to your data folders
audio_folder = "../Dataset/audio"
transcripts_folder = "../Dataset/text"
summaries_folder = "../Dataset/summary"

# Create a list of audio, transcript, and summary file paths
audio_files = sorted([os.path.join(audio_folder, filename) for filename in os.listdir(audio_folder)])
transcript_files = sorted([os.path.join(transcripts_folder, filename) for filename in os.listdir(transcripts_folder)])
summary_files = sorted([os.path.join(summaries_folder, filename) for filename in os.listdir(summaries_folder)])

# Check if the number of audio, transcript, and summary files match
if len(audio_files) != len(transcript_files) != len(summary_files):
    raise ValueError("Number of files in each folder must match")

# Create a dictionary containing your dataset
dataset_dict = {
    "audio": audio_files,
    "transcript": transcript_files,
    "summary": summary_files,
}

# Create a Hugging Face Dataset
custom_dataset = Dataset.from_dict(dataset_dict)

# Split the dataset into training, validation, and test sets
train_percentage = 0.8
validation_percentage = 0.1
test_percentage = 0.1

train_dataset = custom_dataset.select(range(int(len(custom_dataset) * train_percentage)))

validation_dataset = custom_dataset.select(range(int(len(custom_dataset) * train_percentage), 
                                                 int(len(custom_dataset) * (train_percentage + validation_percentage))))

test_dataset = custom_dataset.select(range(int(len(custom_dataset) * (train_percentage + validation_percentage)), 
                                           len(custom_dataset)))

# Create a DatasetDict
custom_dataset_dict = DatasetDict({
    "train": train_dataset,
    "validation": validation_dataset,
    "test": test_dataset,
})

# Print the dataset
print(custom_dataset_dict)



  from .autonotebook import tqdm as notebook_tqdm


DatasetDict({
    train: Dataset({
        features: ['audio', 'transcript', 'summary'],
        num_rows: 400
    })
    validation: Dataset({
        features: ['audio', 'transcript', 'summary'],
        num_rows: 50
    })
    test: Dataset({
        features: ['audio', 'transcript', 'summary'],
        num_rows: 50
    })
})


## Feature extraction

In [2]:
whisper="openai/whisper-small.en"

In [3]:
from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained(whisper)


In [4]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained(whisper, 
                                             language="english")

In [5]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained(whisper, language="English", task="transcribe")

In [6]:
print(custom_dataset_dict["train"][0])

{'audio': '../Dataset/audio\\000000.mp3', 'transcript': '../Dataset/text\\000000.txt', 'summary': '../Dataset/summary\\000000.txt'}


In [7]:
from datasets import Audio

custom_dataset_dict = custom_dataset_dict.cast_column("audio", Audio(sampling_rate=16000))

In [8]:
def prepare_dataset(batch):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array 
    batch["input_features"] = feature_extractor(audio["array"], 
                                sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids 
    batch["labels"] = tokenizer(batch["transcript"]).input_ids

    return batch

In [9]:
from functools import partial

# Define the prepare_dataset function with the feature_extractor argument
def prepare_dataset(batch, feature_extractor,tokenizer):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array 
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids 
    batch["labels"] = tokenizer(batch["transcript"]).input_ids
    return batch

# Use partial to create a function with feature_extractor as a fixed argument
prepare_with_feature_extractor = partial(prepare_dataset, feature_extractor=feature_extractor,tokenizer=tokenizer)

# Pass the prepared function when using .map()
custom_dataset_dict = custom_dataset_dict.map(
    prepare_with_feature_extractor,
    remove_columns=custom_dataset_dict.column_names["train"],
    num_proc=14
)


Map (num_proc=14): 100%|██████████| 400/400 [06:39<00:00,  1.00 examples/s]
Map (num_proc=14): 100%|██████████| 50/50 [00:52<00:00,  1.06s/ examples]
Map (num_proc=14): 100%|██████████| 50/50 [01:07<00:00,  1.36s/ examples]


In [10]:
import numpy as np
print(np.array(custom_dataset_dict["train"]['input_features']).shape)

(400, 80, 3000)


## Train the model

### Data Colloctor

In [11]:
import torch
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [12]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

### Evaluation metric

In [13]:
import evaluate

metric = evaluate.load("wer")
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}



### Load Pre-trained Checkpoint

In [23]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained(whisper)

In [24]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [25]:

# Check if a GPU is available
if torch.cuda.is_available():
    # If available, set the device to GPU
    device = torch.device("cuda")
    print(device)
    model.to(device)
else:
    device = torch.device("cpu")

# The rest of your code remains the same


cuda


In [27]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-small-en-sumerizer",  
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,  
    learning_rate=1e-5,
    #warmup_steps=500,
    max_steps=1000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=5,
    predict_with_generate=True,
    generation_max_length=20,
    save_steps=250,
    eval_steps=125,
    logging_steps=2,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=False,
)

In [28]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=custom_dataset_dict["train"],
    eval_dataset=custom_dataset_dict["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

In [29]:
processor.save_pretrained(training_args.output_dir)

In [30]:
trainer.train()


 88%|████████▊ | 882/1000 [23:33:23<3:18:24, 100.88s/it]

{'loss': 0.0001, 'learning_rate': 1.21e-06, 'epoch': 35.28}


 88%|████████▊ | 884/1000 [23:36:36<3:11:14, 98.92s/it] 

{'loss': 0.0001, 'learning_rate': 1.19e-06, 'epoch': 35.36}


 89%|████████▊ | 886/1000 [23:39:51<3:06:37, 98.22s/it]

{'loss': 0.0001, 'learning_rate': 1.1700000000000002e-06, 'epoch': 35.44}


 89%|████████▉ | 888/1000 [23:42:58<2:58:52, 95.83s/it]

{'loss': 0.0001, 'learning_rate': 1.1500000000000002e-06, 'epoch': 35.52}


 89%|████████▉ | 890/1000 [23:46:05<2:53:27, 94.61s/it]

{'loss': 0.0001, 'learning_rate': 1.1300000000000002e-06, 'epoch': 35.6}


 89%|████████▉ | 892/1000 [23:49:12<2:49:15, 94.03s/it]

{'loss': 0.0001, 'learning_rate': 1.1100000000000002e-06, 'epoch': 35.68}


 89%|████████▉ | 894/1000 [23:52:25<2:48:50, 95.57s/it]

{'loss': 0.0001, 'learning_rate': 1.0900000000000002e-06, 'epoch': 35.76}


 90%|████████▉ | 896/1000 [23:55:45<2:49:38, 97.87s/it]

{'loss': 0.0001, 'learning_rate': 1.0700000000000001e-06, 'epoch': 35.84}


 90%|████████▉ | 898/1000 [23:59:05<2:48:21, 99.03s/it]

{'loss': 0.0001, 'learning_rate': 1.0500000000000001e-06, 'epoch': 35.92}


 90%|█████████ | 900/1000 [24:02:24<2:45:20, 99.21s/it]

{'loss': 0.0001, 'learning_rate': 1.03e-06, 'epoch': 36.0}


 90%|█████████ | 901/1000 [24:04:05<2:44:48, 99.88s/it]

KeyboardInterrupt: 

In [34]:
custom_dataset_dict["test"]["labels"]

[[50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  48957,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  16,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  17,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  18,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  19,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  20,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,
  25111,
  59,
  1360,
  8465,
  21,
  13,
  83,
  734,
  50257],
 [50258,
  50259,
  50363,
  353,
  14,
  35,
  37892,
  302,
  14,

In [36]:
from datasets import  load_metric
# Generate predictions on the test dataset
predictions = trainer.predict(custom_dataset_dict["test"])

# Calculate the WER for each prediction compared to the ground truth
metric = load_metric("wer")  # Load the WER metric
predictions = predictions.predictions
references = custom_dataset_dict["test"]["labels"]  # Replace with the correct column name

wer = metric.compute(predictions=predictions, references=references)

# Calculate the loss on the test dataset
loss_fn = torch.nn.CrossEntropyLoss()
logits = torch.tensor(predictions)  # Convert predictions to a PyTorch tensor
references = torch.tensor(references)  # Convert references to a PyTorch tensor
loss = loss_fn(logits.view(-1, model.config.vocab_size), references.view(-1))

# Print the results
print(f"Word Error Rate (WER): {wer}")
print(f"Loss: {loss.item()}")

# Plot the results (you can use your preferred plotting library)
import matplotlib.pyplot as plt

# Plot WER
plt.figure(figsize=(8, 4))
plt.plot(wer, label="WER")
plt.xlabel("Example Index")
plt.ylabel("WER")
plt.title("Word Error Rate (WER) on Test Dataset")
plt.legend()
plt.show()



ValueError: expected sequence of length 17 at dim 1 (got 18)

In [None]:
import torch
from datasets import load_dataset, load_metric
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

# Load the test dataset
test_dataset = load_dataset("your_dataset_name", "test")

# Initialize a data collator for evaluation
data_collator = DataCollatorForSeq2Seq(
    tokenizer=processor.feature_extractor,
    model=model,
)

# Create a trainer for evaluation
training_args = Seq2SeqTrainingArguments(
    output_dir="./results",  # Change to your desired output directory
    per_device_eval_batch_size=4,
    remove_unused_columns=False,
)
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
)

# Generate predictions on the test dataset
predictions = trainer.predict(test_dataset)

# Calculate the WER for each prediction compared to the ground truth
metric = load_metric("wer")  # Load the WER metric
predictions = predictions.predictions
references = test_dataset["your_target_column_name"]  # Replace with the correct column name

wer = metric.compute(predictions=predictions, references=references)

# Calculate the loss on the test dataset
loss_fn = torch.nn.CrossEntropyLoss()
logits = torch.tensor(predictions)  # Convert predictions to a PyTorch tensor
references = torch.tensor(references)  # Convert references to a PyTorch tensor
loss = loss_fn(logits.view(-1, model.config.vocab_size), references.view(-1))

# Print the results
print(f"Word Error Rate (WER): {wer}")
print(f"Loss: {loss.item()}")

# Plot the results (you can use your preferred plotting library)
import matplotlib.pyplot as plt

# Plot WER
plt.figure(figsize=(8, 4))
plt.plot(wer, label="WER")
plt.xlabel("Example Index")
plt.ylabel("WER")
plt.title("Word Error Rate (WER) on Test Dataset")
plt.legend()
plt.show()
