### You can retrieve the dataset from
- https://drive.google.com/drive/folders/1AIMtt0iXPepJsnXMarQE7gWWter2yYU7?usp=sharing
### Checkpoints
- https://drive.google.com/drive/folders/1DgCfEn8VkmT2tQwbTxSI_ETx9PWRAaP1?usp=sharing

In [1]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Tue Nov 21 02:47:04 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   69C    P8    10W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [3]:
# !pip install datasets>=2.6.1
# !pip install git+https://github.com/huggingface/transformers
# !pip install librosa
# !pip install evaluate>=0.30
# !pip install jiwer
# !pip install gradio
# !pip install typing-extensions --upgrade
# !pip install deepcut
! pip install -U accelerate
! pip install -U transformers



In [36]:
import re
import pathlib
import os
from typing import List

import pandas as pd
from deepcut import tokenize
from datasets import Dataset, DatasetDict
from datasets.features import Audio
import numpy as np
import jiwer

In [5]:
from huggingface_hub import notebook_login

# Show the Hugging Face login widget for this notebook session.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [6]:
# Dataset root on the mounted Drive, with one sub-folder per split.
data_dir = pathlib.Path("/content/drive/MyDrive/speech-to-text/dataset")
train_dir = data_dir.joinpath("train")
test_dir = data_dir.joinpath("test")

# Label CSVs (despite the *_dir suffix, these two paths point at files).
train_label_dir = train_dir.joinpath("train_label.csv")
test_label_dir = test_dir.joinpath("test_label.csv")

## Prepare Model

### Load WhisperFeatureExtractor

In [7]:
from transformers import WhisperFeatureExtractor

# Feature extractor of the pretrained Thai Whisper checkpoint; the
# preprocessing step uses it to turn raw audio into model input features.
feature_extractor = WhisperFeatureExtractor.from_pretrained(
    "biodatlab/whisper-th-medium-combined",
)

### Load WhisperTokenizer

In [8]:
from transformers import WhisperTokenizer

# Tokenizer of the same checkpoint, configured for Thai transcription; the
# preprocessing step uses it to encode target sentences into label ids.
tokenizer = WhisperTokenizer.from_pretrained(
    "biodatlab/whisper-th-medium-combined",
    language="th",
    task="transcribe",
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


### Combine to Create a WhisperProcessor


In [9]:
from transformers import WhisperProcessor

# Processor of the same checkpoint; later cells reach its `.feature_extractor`
# and `.tokenizer` attributes for padding and decoding.
processor = WhisperProcessor.from_pretrained(
    "biodatlab/whisper-th-medium-combined",
    language="Thai",
    task="transcribe",
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


## Prepare dataset

### Load train and test labels

In [11]:
# Read both label tables and normalise the transcript column name to 'sentence'.
# NOTE: `tain_label` (sic) is the name the next cell reads, so it is kept as is.
tain_label = pd.read_csv(train_label_dir)
tain_label = tain_label.rename(columns={'text': 'sentence'})

test_label = pd.read_csv(test_label_dir)
test_label = test_label.rename(columns={'Actual-transcript': 'sentence'})

In [12]:
# Keep only the columns the pipeline needs and add the full path of each clip.
# `.assign` returns a new frame, so no column is written into a slice of
# another frame (the original in-place assignment triggered pandas'
# SettingWithCopyWarning).
train_label = tain_label[['filename', 'sentence']].assign(
    audio=lambda frame: str(train_dir) + "/" + frame['filename']
)
test_label = test_label[['filename', 'sentence']].assign(
    audio=lambda frame: str(test_dir) + "/" + frame['filename']
)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_label['audio'] = str(train_dir) + "/" + train_label['filename']


In [13]:
# Sanity check: audio path of the row labelled 0, using a single .loc lookup.
train_label.loc[0, 'audio']

'/content/drive/MyDrive/speech-to-text/dataset/train/Oppday Q32023 NOBLE บมจ โนเบิล ดีเวลลอปเมนท์/Oppday Q32023 NOBLE บมจ โนเบิล ดีเวลลอปเมนท์_chunk00000.wav'

In [14]:
# Wrap each label table in a Hugging Face Dataset: tds <- train, vds <- test.
tds, vds = (Dataset.from_pandas(frame) for frame in (train_label, test_label))

In [15]:
# Bundle both splits, casting the 'audio' path column to an Audio feature
# with a 16 kHz sampling rate.
ds = DatasetDict({
    'train': tds.cast_column("audio", Audio(sampling_rate=16000)),
    'test': vds.cast_column("audio", Audio(sampling_rate=16000)),
})

### Preprocess the dataset

In [16]:
def prepare_dataset(batch):
    """Turn one example into model inputs and target label ids.

    Reads ``batch["audio"]`` (its ``"array"`` and ``"sampling_rate"`` entries)
    and ``batch["sentence"]``; writes ``batch["input_features"]`` and
    ``batch["labels"]`` and returns the same mapping.
    """
    waveform = batch["audio"]

    # Input features computed by the feature extractor from the raw samples;
    # only the first (and, for a single clip, only) entry is kept.
    extracted = feature_extractor(
        waveform["array"],
        sampling_rate=waveform["sampling_rate"],
    )
    batch["input_features"] = extracted.input_features[0]

    # Target transcript encoded as token ids.
    encoded = tokenizer(batch["sentence"])
    batch["labels"] = encoded.input_ids

    return batch

In [17]:
# Apply the preprocessing to every split with 4 worker processes and drop the
# columns the train split had before mapping.
ds = ds.map(
    prepare_dataset,
    remove_columns=ds.column_names["train"],
    num_proc=4,
)

Map (num_proc=4):   0%|          | 0/771 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/323 [00:00<?, ? examples/s]

In [18]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate preprocessed examples into one padded training batch.

    Attributes:
        processor: object exposing ``feature_extractor.pad``, ``tokenizer.pad``
            and ``tokenizer.bos_token_id``.
    """

    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad inputs and labels separately and return the combined batch.

        Each element of ``features`` must carry ``"input_features"`` and
        ``"labels"``. The returned batch is whatever ``feature_extractor.pad``
        produced, with a ``"labels"`` tensor added.
        """
        tokenizer = self.processor.tokenizer

        # Audio inputs and label ids have different lengths and are padded by
        # different components, so they are separated first.
        audio_inputs = [{"input_features": example["input_features"]} for example in features]
        text_inputs = [{"input_ids": example["labels"]} for example in features]

        batch = self.processor.feature_extractor.pad(audio_inputs, return_tensors="pt")
        padded_labels = tokenizer.pad(text_inputs, return_tensors="pt")

        # Positions that are padding (attention mask != 1) become -100 so the
        # loss ignores them.
        is_padding = padded_labels.attention_mask.ne(1)
        labels = padded_labels["input_ids"].masked_fill(is_padding, -100)

        # When every sequence starts with the BOS token, drop that first column.
        starts_with_bos = (labels[:, 0] == tokenizer.bos_token_id).all().cpu().item()
        if starts_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [19]:
# Collator instance handed to the trainer; it pads through the loaded processor.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [37]:
# Regex removed from both transcripts before they are tokenized and scored.
# NOTE(review): the alternatives look like Thai polite sentence-ending
# particles (คะ / ครับ, optionally preceded by นะ) and hesitation fillers
# (เอ่อ, อ่า) - confirm with a Thai speaker.
CLEAN_PATTERNS = "((นะ)?(คะ|ครับ)|เอ่อ|อ่า)"
# Tokens discarded after tokenization: empty strings and single spaces.
REMOVE_TOKENS = {"", " "}

def hack_wer(
    hypothesis: str,
    reference: str,
    debug=False,
  ) -> float:
  """
  Word error rate between a predicted and a reference transcript.

  Both strings have CLEAN_PATTERNS removed, are split into words with
  `tokenize`, and lose every token listed in REMOVE_TOKENS. The remaining
  words are re-joined with single spaces so the ordinary whitespace-based
  `jiwer.wer` can score them.

  Args:
    hypothesis: predicted transcript.
    reference: ground-truth transcript.
    debug: when true, print the reference and hypothesis token lists.

  Returns:
    The WER as a fraction. When the reference has no tokens left after
    cleaning, the number of hypothesis tokens is returned instead (errors
    divided by max(reference length, 1)), so 0.0 when both are empty.
  """
  refs = tokenize(re.sub(CLEAN_PATTERNS, "", reference))
  hyps = tokenize(re.sub(CLEAN_PATTERNS, "", hypothesis))

  refs = [r for r in refs if r not in REMOVE_TOKENS]
  hyps = [h for h in hyps if h not in REMOVE_TOKENS]

  if debug: print(refs); print(hyps)

  # jiwer rejects an empty reference string, and cleaning can empty a short
  # reference entirely; score that case here instead of letting one sample
  # abort a whole evaluation pass.
  if not refs:
    return float(len(hyps))

  return jiwer.wer(" ".join(refs), " ".join(hyps))


def isd_np(preds: list[str], actuals: list[str], debug=True) -> int:
  """
  Token-level edit distance between `preds` and `actuals`.

  Counts the minimum number of insertions, substitutions and deletions
  needed to turn one token list into the other, using a dynamic-programming
  table with one row per `actuals` prefix and one column per `preds` prefix.

  Args:
    preds: predicted tokens.
    actuals: reference tokens.
    debug: when true, print the table row by row.

  Returns:
    The edit distance as a plain Python int.
  """
  n_rows, n_cols = len(actuals) + 1, len(preds) + 1

  # int64 so long sequences cannot overflow the cell values.
  dp = np.zeros((n_rows, n_cols), dtype="int64")
  # Against an empty prefix the distance is simply the other prefix's length.
  dp[0, :] = np.arange(n_cols)
  dp[:, 0] = np.arange(n_rows)

  for row in range(1, n_rows):
    for col in range(1, n_cols):
      if preds[col - 1] == actuals[row - 1]:
        # Matching tokens cost nothing, but only the diagonal neighbour may be
        # carried over: taking the min of all three neighbours here would
        # under-count (e.g. ["a", "a"] vs ["a"] would come out as 0).
        dp[row, col] = dp[row - 1, col - 1]
      else:
        dp[row, col] = min(dp[row - 1, col], dp[row, col - 1], dp[row - 1, col - 1]) + 1

  if debug: print(*dp, sep="\n")

  return int(dp[-1, -1])


def wer(pred: str, actual: str, **kwargs) -> float:
  """
  Word error rate computed with the notebook's own edit distance (`isd_np`).

  Both strings have CLEAN_PATTERNS removed, are split with `tokenize`, and
  lose every token listed in REMOVE_TOKENS before the distance is computed.

  Args:
    pred: predicted transcript.
    actual: reference transcript.
    **kwargs: `debug` (optional, default False) prints the token lists and
      is forwarded to `isd_np`; any other keyword is forwarded unchanged.

  Returns:
    Edit distance divided by the number of reference tokens. An empty
    reference divides by 1 instead of raising ZeroDivisionError.
  """
  # `debug` is optional: indexing kwargs["debug"] raised KeyError whenever the
  # caller left it out.
  debug = kwargs.pop("debug", False)

  refs = tokenize(re.sub(CLEAN_PATTERNS, "", actual))
  hyps = tokenize(re.sub(CLEAN_PATTERNS, "", pred))

  actuals = [r for r in refs if r not in REMOVE_TOKENS]
  preds = [h for h in hyps if h not in REMOVE_TOKENS]
  if debug: print(f"{preds}\n{actuals}")
  err = isd_np(preds, actuals, debug=debug, **kwargs)
  return err / max(len(actuals), 1)
def compute_metrics(pred):
    """Return the mean WER, in percent, over one evaluation pass.

    Reads ``pred.predictions`` and ``pred.label_ids``; note that ``label_ids``
    is modified in place. Returns ``{"wer": <mean percentage>}``.
    """
    predicted_ids = pred.predictions
    reference_ids = pred.label_ids

    # Put the pad token back wherever the labels carry the -100 ignore marker,
    # so the ids can be decoded.
    reference_ids[reference_ids == -100] = processor.tokenizer.pad_token_id

    # Decode both sides to lists of strings, dropping special tokens.
    decode = processor.tokenizer.batch_decode
    hypotheses = decode(predicted_ids, skip_special_tokens=True)
    references = decode(reference_ids, skip_special_tokens=True)

    # One WER per (hypothesis, reference) pair, averaged as a percentage.
    scores = [hack_wer(hyp, ref) for hyp, ref in zip(hypotheses, references)]
    mean_wer_percent = sum(score * 100 for score in scores) / len(scores)

    return {"wer": mean_wer_percent}

## Load a Pre-Trained Checkpoint

In [21]:
from transformers import WhisperForConditionalGeneration

# Model weights from the same pretrained checkpoint id used for the feature
# extractor, tokenizer and processor.
model = WhisperForConditionalGeneration.from_pretrained(
    "biodatlab/whisper-th-medium-combined",
)

In [22]:
# Clear the forced decoder ids and the suppressed-token list stored on the
# model config.
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [31]:
# Tunable constants for this run.
CHUNK_LENGTH = 30
NUM_BEAMS = 2
BATCH_SIZE = 16
# N splits BATCH_SIZE into per-device micro-batches and is also used as the
# number of gradient-accumulation steps in the training arguments.
N = 2

# First CUDA device with half precision when available, else CPU with float32.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

In [32]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    # --- output / reporting -------------------------------------------------
    # Output folder on the mounted Drive; nothing is pushed to the Hub.
    output_dir="/content/drive/MyDrive/speech-to-text/checkpoints/ver0-11-21-23/fine-tune-thonburian",
    push_to_hub=False,
    report_to=["tensorboard"],
    logging_steps=25,
    # --- optimisation -------------------------------------------------------
    # Micro-batch of BATCH_SIZE // N, accumulated over N steps.
    per_device_train_batch_size=BATCH_SIZE // N,
    gradient_accumulation_steps=N,
    learning_rate=2e-5,
    warmup_steps=0,  # smoke-test value; 1000 noted for a full run
    max_steps=1,  # smoke-test value; 6000 noted for a full run
    gradient_checkpointing=True,
    fp16=True,
    # --- evaluation ---------------------------------------------------------
    evaluation_strategy="steps",
    eval_steps=1,  # smoke-test value; 1000 noted for a full run
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    # --- checkpointing / model selection -------------------------------------
    save_steps=1,  # smoke-test value; 1000 noted for a full run
    load_best_model_at_end=True,
    metric_for_best_model="wer",  # key returned by compute_metrics
    greater_is_better=False,
)

In [33]:
from transformers import Seq2SeqTrainer

# Wire model, data, collation and metric together for training and evaluation.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    # data
    train_dataset=ds["train"],
    eval_dataset=ds["test"],
    data_collator=data_collator,
    # evaluation metric
    compute_metrics=compute_metrics,
    # the processor's feature extractor is what gets passed as `tokenizer`
    tokenizer=processor.feature_extractor,
)

In [34]:
# Save the processor into the training output directory.
processor.save_pretrained(training_args.output_dir)

In [38]:
# Start fine-tuning with the arguments configured above in `training_args`.
trainer.train()



Step,Training Loss,Validation Loss,Wer
1,No log,1.408574,38.915994


Step,Training Loss,Validation Loss




There were missing keys in the checkpoint model loaded: ['proj_out.weight'].


TrainOutput(global_step=1, training_loss=3.3940653800964355, metrics={'train_runtime': 571.8224, 'train_samples_per_second': 0.028, 'train_steps_per_second': 0.002, 'total_flos': 3.265935704064e+16, 'train_loss': 3.3940653800964355, 'epoch': 0.02})