In [None]:
# reference - dataset
# @inproceedings{aishell_2017,
# title={AIShell-1: An Open-Source Mandarin Speech Corpus and A Speech Recognition Baseline},
# author={Hui Bu, Jiayu Du, Xingyu Na, Bengu Wu, Hao Zheng},
# booktitle={Oriental COCOSDA 2017},
# pages={Submitted},
# year={2017}
# }

# fine tuning methods

# @Misc{peft,
#   title =        {PEFT: State-of-the-art Parameter-Efficient Fine-Tuning methods},
#   author =       {Sourab Mangrulkar and Sylvain Gugger and Lysandre Debut and Younes Belkada and Sayak Paul and Benjamin Bossan},
#   howpublished = {\url{https://github.com/huggingface/peft}},
#   year =         {2022}
# }

In [None]:
! pip install git+https://github.com/openai/whisper.git
! pip install jiwer
! pip install -U bitsandbytes

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-iev13p7b
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-iev13p7b
  Resolved https://github.com/openai/whisper.git to commit 517a43ecd132a2089d85f4ebc044728a71d49f6e
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [None]:
!pip install --upgrade pip
!pip install --upgrade datasets[audio] transformers accelerate evaluate jiwer tensorboard gradio



In [None]:
import torch

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
# Hub identifiers: the speech corpus and the base Whisper checkpoint to fine-tune.
dataset_name, model_name_or_path = "AISHELL/AISHELL-1", "openai/whisper-small"

# Language / task settings handed to the Whisper tokenizer and processor.
language, language_abbr, task = "Chinese", "zh", "transcribe"

# Output locations for the fine-tuned model.
# NOTE(review): `model_label` is relative while `model_save` is absolute -- both
# presumably point into the same mounted Drive folder; confirm the working directory.
model_label = "drive/MyDrive/fyp/AISHELL-1/model"
model_save = "/content/drive/MyDrive/fyp/AISHELL-1/model/final_small"

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pandas as pd
from sklearn.model_selection import train_test_split

import torchaudio
import IPython.display as ipd

import whisper
import torchaudio
import torch


In [None]:
from datasets import load_dataset

# Load the corpus through the `dataset_name` constant from the configuration cell
# instead of repeating the repository id as a second string literal.
dataset = load_dataset(dataset_name)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Resolving data files:   0%|          | 0/102 [00:00<?, ?it/s]

In [None]:
from google.colab import drive

# Mount Google Drive into the Colab filesystem; the transcript file and the model
# output paths used by this notebook live under this mount point.
drive.mount("/content/drive")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Sanity check: list the AISHELL-1 transcript file to confirm it exists.
# The path is relative, so this assumes the working directory is the parent of `drive/`.
ls drive/MyDrive/fyp/AISHELL-1/data_aishell_transcript_aishell_transcript_v0.8.txt

drive/MyDrive/fyp/AISHELL-1/data_aishell_transcript_aishell_transcript_v0.8.txt


In [None]:
# Location of the AISHELL-1 transcript file (relative to the working directory).
label_path = (
    "drive/MyDrive/fyp/AISHELL-1/data_aishell_transcript_aishell_transcript_v0.8.txt"
)

## Insert labels

In [None]:
# Maps an utterance id (e.g. "BAC009S0002W0122") to its transcript text.
labels = {}

# Each transcript line is "<utterance id> <space-separated text>".
# The file contains Chinese text, so the encoding is pinned to UTF-8 instead of
# depending on the platform's default encoding.
with open(label_path, 'r', encoding='utf-8') as f:
    for line in f:
        audio_file, _, transcript = line.strip().partition(' ')
        if not audio_file:
            continue  # skip blank lines rather than storing an empty key
        labels[audio_file] = transcript


def add_labels_to_sample(sample):
    """Attach the matching transcript to one dataset sample.

    The utterance id is the last path component of ``sample['__key__']``
    (no file extension); it is looked up in the module-level ``labels`` dict.

    Args:
        sample: A dataset row containing at least the ``'__key__'`` field.

    Returns:
        The same ``sample`` with a ``'transcript'`` field: the transcript with
        all spaces removed, or ``None`` (after printing a warning) when the id
        has no non-empty transcript.
    """
    audio_file_name = sample['__key__'].split('/')[-1]

    transcript = labels.get(audio_file_name)

    if transcript:
        # The transcript file separates words with spaces; drop them all.
        sample['transcript'] = transcript.replace(' ', '')
    else:
        print(f"Warning: No transcript found for {audio_file_name}")
        sample['transcript'] = None

    return sample


# Add the labels to the training split only.
dataset['train'] = dataset['train'].map(add_labels_to_sample)

# Print the first sample to verify that the transcript was added.
print(dataset['train'][0])


{'wav': {'path': 'train/S0002/BAC009S0002W0122.wav', 'array': array([-0.00033569, -0.00054932, -0.00048828, ..., -0.00338745,
       -0.00314331, -0.00350952]), 'sampling_rate': 16000}, '__key__': 'train/S0002/BAC009S0002W0122', '__url__': '/root/.cache/huggingface/hub/datasets--AISHELL--AISHELL-1/snapshots/bbe295d530192a4cd41644b711c9aecd087df653/data_aishell/wav/S0002.tar.gz', 'transcript': '而对楼市成交抑制作用最大的限购'}


## Check Missing Transcript


In [None]:
# Count unlabeled samples by reading only the `transcript` column. Iterating over
# whole rows would also load and decode every audio clip just to inspect one text field.
missing_transcripts_count = sum(
    transcript is None for transcript in dataset['train']['transcript']
)

# Print the number of samples with missing transcripts
print(f"Number of samples with missing transcripts: {missing_transcripts_count}")

# Remove samples with missing transcripts; `input_columns` hands the predicate just
# the transcript value, so the audio column is not touched while filtering.
dataset['train'] = dataset['train'].filter(
    lambda transcript: transcript is not None,
    input_columns=['transcript'],
)

# Check the result by printing the first sample of the cleaned dataset
print(dataset['train'][0])

Number of samples with missing transcripts: 37
{'wav': {'path': 'train/S0002/BAC009S0002W0122.wav', 'array': array([-0.00033569, -0.00054932, -0.00048828, ..., -0.00338745,
       -0.00314331, -0.00350952]), 'sampling_rate': 16000}, '__key__': 'train/S0002/BAC009S0002W0122', '__url__': '/root/.cache/huggingface/hub/datasets--AISHELL--AISHELL-1/snapshots/bbe295d530192a4cd41644b711c9aecd087df653/data_aishell/wav/S0002.tar.gz', 'transcript': '而对楼市成交抑制作用最大的限购'}


## Initialize the Whisper feature extractor, tokenizer, and processor

In [None]:
from transformers import WhisperFeatureExtractor

# Feature extractor belonging to the base checkpoint; `prepare_dataset` uses it
# to turn raw audio arrays into model input features.
feature_extractor = WhisperFeatureExtractor.from_pretrained(
    model_name_or_path,
)

In [None]:
from transformers import WhisperTokenizer

# Tokenizer for the base checkpoint, configured with the target language and task;
# `prepare_dataset` uses it to encode transcripts into label ids.
tokenizer = WhisperTokenizer.from_pretrained(
    model_name_or_path,
    task=task,
    language=language,
)

In [None]:
from transformers import WhisperProcessor

# Processor for the base checkpoint (exposes `.feature_extractor` and `.tokenizer`);
# it is handed to the data collator and saved next to the fine-tuned model.
processor = WhisperProcessor.from_pretrained(
    model_name_or_path,
    task=task,
    language=language,
)

## Split Data

In [None]:
from datasets import load_dataset

# Example: Load your dataset as a DatasetDict
# dataset = load_dataset('some_dataset')

# Work with a fixed fraction of the labelled training split.
subset_fraction = 0.25
subset_size = int(subset_fraction * len(dataset["train"]))

# Shuffle deterministically, then keep the first `subset_size` rows.
shuffled_train = dataset["train"].shuffle(seed=42)
train_subset = shuffled_train.select(range(subset_size))

# 70% of the subset is used for training; the remaining 30% is halved into
# validation and test portions.
train_temp = train_subset.train_test_split(test_size=0.3, seed=42)
test_valid = train_temp["test"].train_test_split(test_size=0.5, seed=42)

train_dataset, val_dataset, test_dataset = (
    train_temp["train"],
    test_valid["train"],
    test_valid["test"],
)

print(f"Training: {len(train_dataset)}, Validation: {len(val_dataset)}, Testing: {len(test_dataset)}")


In [None]:
from datasets import Audio, DatasetDict

# Re-bundle the three splits under the names used by the rest of the notebook.
# Note that this rebinds `dataset`, replacing the originally loaded corpus.
dataset = DatasetDict(
    {
        "train": train_dataset,
        "validation": val_dataset,
        "test": test_dataset,
    }
)

## Pre-process the data for Whisper

In [None]:
def prepare_dataset(batch):
    """Convert one sample into Whisper model inputs and label ids.

    Reads the audio from ``batch["wav"]`` (its ``"array"`` and
    ``"sampling_rate"`` entries) and the text from ``batch["transcript"]``,
    using the module-level ``feature_extractor`` and ``tokenizer``.

    Args:
        batch: A single dataset row (despite the name, not a batch of rows).

    Returns:
        The same row with ``"input_features"`` and ``"labels"`` added. If
        processing fails, the error is printed and BOTH fields are set to
        ``None`` so every row keeps the same columns and the failed row can
        be filtered out afterwards.
    """
    try:
        audio = batch["wav"]
        features = feature_extractor(
            audio["array"],
            sampling_rate=audio["sampling_rate"]
        )
        input_features = features.input_features[0]
        label_ids = tokenizer(batch["transcript"]).input_ids
    except Exception as e:
        print(f"Error processing sample: {e}")
        # Mark the whole sample as invalid; a half-filled row (features without
        # labels) would break the schema or the collator later on.
        input_features, label_ids = None, None

    batch["input_features"] = input_features
    batch["labels"] = label_ids
    return batch

tokenized_dataset = dataset.map(prepare_dataset)

# Drop the rows that failed above. Both fields are None together, so checking the
# small `labels` column is enough and avoids reading the large feature arrays.
tokenized_dataset = tokenized_dataset.filter(
    lambda labels: labels is not None,
    input_columns=["labels"],
)

Map:   0%|          | 0/6068 [00:00<?, ? examples/s]

Map:   0%|          | 0/1300 [00:00<?, ? examples/s]

Map:   0%|          | 0/1301 [00:00<?, ? examples/s]

In [None]:
tokenized_dataset

DatasetDict({
    train: Dataset({
        features: ['wav', '__key__', '__url__', 'transcript', 'input_features', 'labels'],
        num_rows: 6068
    })
    validation: Dataset({
        features: ['wav', '__key__', '__url__', 'transcript', 'input_features', 'labels'],
        num_rows: 1300
    })
    test: Dataset({
        features: ['wav', '__key__', '__url__', 'transcript', 'input_features', 'labels'],
        num_rows: 1301
    })
})

## Create Data Collator Function class


In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

# Data collator for the speech-to-text task.
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Turn a list of prepared samples into one padded training batch.

    Each sample must provide ``"input_features"`` and ``"labels"``. The audio
    features are padded by ``processor.feature_extractor`` and the label ids by
    ``processor.tokenizer``; padded label positions are replaced with -100.
    """

    processor: Any  # Object exposing `.feature_extractor` and `.tokenizer`.

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Collate ``features`` and return the batch with a ``"labels"`` entry added."""
        extractor = self.processor.feature_extractor
        tok = self.processor.tokenizer

        # Separate the audio inputs from the label ids: each is padded by its own component.
        audio_inputs = []
        label_inputs = []
        for example in features:
            audio_inputs.append({"input_features": example["input_features"]})
            label_inputs.append({"input_ids": example["labels"]})

        batch = extractor.pad(audio_inputs, return_tensors="pt")
        padded_labels = tok.pad(label_inputs, return_tensors="pt")

        # Positions where the attention mask is not 1 are padding; -100 marks them
        # so that they are ignored when the loss is computed.
        padding_positions = padded_labels.attention_mask.ne(1)
        labels = padded_labels["input_ids"].masked_fill(padding_positions, -100)

        # If every sequence in the batch begins with the tokenizer's BOS id, strip it.
        # NOTE(review): confirm that Whisper label sequences actually start with
        # `bos_token_id`; if they start with a different special token this never fires.
        starts_with_bos = labels[:, 0] == tok.bos_token_id
        if starts_with_bos.all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

## Instantiate the data collator

In [None]:
# Build the collator around the Whisper processor created earlier.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor)

## Load the Whisper conditional-generation model


In [None]:
from transformers import BitsAndBytesConfig, WhisperForConditionalGeneration

# Load the base checkpoint with 8-bit weights. The quantization settings go through
# `quantization_config`; passing `load_in_8bit=True` directly to `from_pretrained`
# is deprecated in Transformers.
model = WhisperForConditionalGeneration.from_pretrained(
    model_name_or_path,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto",
)

## Prepare the quantized model for training with PEFT

In [None]:
from peft import prepare_model_for_kbit_training

# Run PEFT's k-bit preparation step on the 8-bit model before attaching LoRA adapters.
model = prepare_model_for_kbit_training(model)

## Create the LoRA config

In [None]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model

# LoRA (Low-Rank Adaptation) settings used for fine-tuning.
config = LoraConfig(
    # Modules that receive LoRA adapters: the query and value projections.
    target_modules=["q_proj", "v_proj"],
    r=4,  # LoRA rank; determines the size of the low-rank matrices
    lora_alpha=64,  # scaling factor applied to the LoRA update
    lora_dropout=0.05,  # dropout rate used inside the LoRA modules
    bias="none",  # bias handling mode; "none" is selected here
)

## Wrap the model with the LoRA config to create the PEFT model


In [None]:
# Wrap the prepared base model with the LoRA configuration defined above.
peft_model = get_peft_model(model=model, peft_config=config)

## Print the LoRA trainable parameters

In [None]:
# Print the trainable-parameter summary of the LoRA fine-tuning model.
peft_model.print_trainable_parameters()

In [None]:
from transformers import Seq2SeqTrainingArguments

# `model_dir` and `batch_size` were used below without being defined in any earlier
# cell, so a fresh "Restart & Run All" stopped here with a NameError.
# Training output goes to the folder configured as `model_label`; the batch size is a
# starting value -- adjust it to the available GPU memory.
model_dir = model_label
batch_size = 8

# Training arguments for the sequence-to-sequence model.
training_args = Seq2SeqTrainingArguments(
    output_dir=model_dir,  # directory for training outputs and checkpoints
    per_device_train_batch_size=batch_size,  # training batch size per device
    learning_rate=1e-3,  # learning rate
    num_train_epochs=1,  # total number of training epochs
    eval_strategy="epoch",  # evaluate at the end of every epoch
    # warmup_steps=50,  # ramp the learning rate up over the first steps to stabilise training
    # fp16=True,  # mixed-precision training: faster and uses less memory
    per_device_eval_batch_size=batch_size,  # evaluation batch size per device
    generation_max_length=128,  # maximum length for generation
    logging_steps=10,  # log every N steps to track progress
    remove_unused_columns=False,  # keep every dataset column; the collator picks the fields it needs
    label_names=["labels"],  # name of the label field
    # Alternative: evaluate every N steps instead of per epoch:
    # eval_strategy="steps",
    # eval_steps=25,
)

In [None]:
from transformers import Seq2SeqTrainer

# Turn off the generation cache on the model config for training.
# NOTE(review): re-enable it before running inference/generation if needed.
peft_model.config.use_cache = False

trainer = Seq2SeqTrainer(
    model=peft_model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    data_collator=data_collator,
    # `processing_class` replaces the deprecated `tokenizer` argument of Trainer.
    processing_class=processor.feature_extractor,
)

In [None]:
# Run the fine-tuning loop with the trainer configured above.
trainer.train()

In [None]:
# Save the trained model to the `model_save` path on Drive.
trainer.save_model(model_save)

In [None]:
# Save the processor into the same `model_save` folder as the model.
processor.save_pretrained(model_save)

In [None]:
# Switch the PEFT model to evaluation mode now that training is finished.
peft_model.eval()