In [None]:
!pip install datasets==2.20.0
!pip install git+https://github.com/huggingface/transformers
!pip install librosa
!pip install evaluate>=0.30
!pip install jiwer
!pip install --upgrade pip
!pip install --upgrade git+https://github.com/huggingface/transformers.git accelerate datasets[audio]
!pip install git+https://github.com/haven-jeon/PyKoSpacing.git

In [None]:
# Log in to the Hugging Face Hub from inside the notebook.
# NOTE(review): presumably required so the checkpoint loaded further down can
# be downloaded — confirm whether that model repository is private.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
import glob
import os
import pandas as pd

# Folders holding the utterance recordings (*.wav), one entry per dataset.
audio_folders = [
    "/YOUR_CEREBRAL_PALSY_UTTERANCE_AUDIO_DATASET",
    "/YOUR_PERIPHERAL_NEUROPATHY_UTTERANCE_AUDIO_DATASET",
    "/YOUR_STROKE_UTTERANCE_AUDIO_DATASET"
]

# Folders holding the reference transcripts (*.txt), listed in the same order.
text_folders = [
    "/YOUR_CEREBRAL_PALSY_UTTERANCE_TEXT_DATASET",
    "/YOUR_PERIPHERAL_NEUROPATHY_UTTERANCE_TEXT_DATASET",
    "/YOUR_STROKE_UTTERANCE_TEXT_DATASET"
]

# Walk the folders in the order given; paths are sorted within each folder and
# then concatenated into one flat list per file type.
all_audio_files = [
    wav_path
    for folder in audio_folders
    for wav_path in sorted(glob.glob(os.path.join(folder, "*.wav")))
]
all_text_files = [
    txt_path
    for folder in text_folders
    for txt_path in sorted(glob.glob(os.path.join(folder, "*.txt")))
]

print(f"Audio File: {len(all_audio_files)}")
print(f"Text File: {len(all_text_files)}")

In [None]:
import pandas as pd
import os

# Fluency analysis table: rows without a file id or a speaker id are dropped,
# and each remaining row gets the extension-less file name as `basename`.
csv_path = '/YOUR_ROOT/fluency_analysis.csv'
df = (
    pd.read_csv(csv_path)
    .dropna(subset=['File_ids', 'ID_num'])
    .assign(
        basename=lambda table: table['File_ids'].apply(
            lambda x: os.path.splitext(os.path.basename(x))[0]
        )
    )
)

def filter_included_files(file_list, include_basenames):
    """Keep only the paths whose extension-less file name is in `include_basenames`.

    Args:
        file_list: Iterable of file paths.
        include_basenames: Iterable of basenames (file names without directory
            and without extension) to keep. Any iterable of hashable values is
            accepted, including one-shot iterators.

    Returns:
        A new list with the matching paths, in their original order.
    """
    # Build the lookup once: set membership is O(1), whereas testing against a
    # list rescans it for every file (and exhausts a one-shot iterator).
    wanted = set(include_basenames)
    return [
        path
        for path in file_list
        if os.path.splitext(os.path.basename(path))[0] in wanted
    ]

# Basenames listed in the fluency table decide which files are evaluated.
include_basenames = df['basename'].tolist()

# The same basename filter is applied to the recordings and to the transcripts.
filtered_audio_list = filter_included_files(all_audio_files, include_basenames)
filtered_text_list = filter_included_files(all_text_files, include_basenames)

# Before/after counts as a quick sanity check.
for count_label, file_group in (
    ("Audio File:", all_audio_files),
    ("Filtered Audio File:", filtered_audio_list),
    ("Text File:", all_text_files),
    ("Filtered Text File:", filtered_text_list),
):
    print(count_label, len(file_group))

In [None]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration

# Checkpoint identifier handed to both `from_pretrained` calls below; replace
# the placeholder before running.
model_path = "YOUR_HUGGINGFACE_MODEL"
# To evaluate the provided model instead, uncomment the next line:
# model_path = "yoona-J/ASR_Whisper_Disease_General"
# Processor and model come from the same checkpoint; `transcribe` (defined in a
# later cell) reads both as notebook-level globals.
processor = WhisperProcessor.from_pretrained(model_path)
model = WhisperForConditionalGeneration.from_pretrained(model_path)

# Decoder prompt ids for language "ko" and task "transcribe"; `transcribe`
# passes them to `model.generate` as `forced_decoder_ids`.
forced_decoder_ids = processor.get_decoder_prompt_ids(language="ko", task="transcribe")

In [None]:
pip install nlptutti

In [None]:
import os
import re
import pandas as pd
import torchaudio
from tqdm import tqdm
from nlptutti.asr_metrics import get_wer, get_cer, _measure_cer

def transcribe(audio_path):
    """Transcribe one audio file with the notebook-level Whisper model.

    The recording is loaded with torchaudio, averaged down to a single channel
    when it has several, resampled to 16 kHz when recorded at another rate,
    and decoded with the global `processor`, `model` and `forced_decoder_ids`.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The decoded text of the first (only) sequence, special tokens removed.
    """
    target_rate = 16000
    waveform, source_rate = torchaudio.load(audio_path)

    # More than one channel: average them into a single channel.
    has_several_channels = waveform.shape[0] > 1
    if has_several_channels:
        waveform = waveform.mean(dim=0, keepdim=True)

    if source_rate != target_rate:
        waveform = torchaudio.functional.resample(
            waveform, orig_freq=source_rate, new_freq=target_rate
        )

    encoded = processor(waveform.squeeze(), sampling_rate=target_rate, return_tensors="pt")
    generated_ids = model.generate(
        encoded.input_features,
        forced_decoder_ids=forced_decoder_ids,
    )
    decoded = processor.batch_decode(generated_ids, skip_special_tokens=True)
    return decoded[0]

# Index the reference transcripts by basename so each recording is scored
# against its own transcript. Pairing the two lists by position (zip) silently
# shifts every later pair as soon as one audio or text file is missing.
ref_path_by_basename = {}
for text_path in filtered_text_list:
    text_basename = os.path.splitext(os.path.basename(text_path))[0]
    # First occurrence wins when the same basename appears in several folders.
    ref_path_by_basename.setdefault(text_basename, text_path)

# One dictionary lookup per file instead of scanning the whole table each time;
# for duplicated basenames the first row wins, as with `.values[0]` before.
speaker_id_by_basename = (
    df.drop_duplicates(subset='basename')
    .set_index('basename')['ID_num']
    .to_dict()
)

records = []
audio_without_reference = []
for audio_path in tqdm(filtered_audio_list):
    basename = os.path.splitext(os.path.basename(audio_path))[0]
    if basename not in speaker_id_by_basename:
        continue
    ref_path = ref_path_by_basename.get(basename)
    if ref_path is None:
        # No transcript to score against: skip, but remember it for the report.
        audio_without_reference.append(audio_path)
        continue
    id_num = speaker_id_by_basename[basename]

    pred_text = transcribe(audio_path)
    with open(ref_path, 'r', encoding='utf-8') as f:
        ref_text = f.read().strip()

    # The metric helpers may return either a dict or a bare number.
    cer_result = get_cer(ref_text, pred_text, rm_punctuation=True)
    cer = cer_result["cer"] if isinstance(cer_result, dict) else cer_result

    wer_result = get_wer(ref_text, pred_text, rm_punctuation=True)
    wer = wer_result["wer"] if isinstance(wer_result, dict) else wer_result

    records.append({
        "speaker_id": id_num,
        "basename": basename,
        "label": ref_text,
        "prediction": pred_text,
        "cer": cer,
        "wer": wer,
    })

if audio_without_reference:
    print(f"Skipped {len(audio_without_reference)} audio file(s) without a matching transcript.")

# Explicit columns keep the schema intact even when nothing was evaluated.
df_pred = pd.DataFrame(
    records,
    columns=["speaker_id", "basename", "label", "prediction", "cer", "wer"],
)

def remove_punctuation(text):
    """Return `text` without punctuation and symbols.

    Every character that is not a word character, whitespace, or a Hangul
    syllable (가-힣) is deleted; underscores count as word characters and stay.
    """
    not_word_or_space = re.compile(r'[^\w\s가-힣]')
    return not_word_or_space.sub('', text)

def extract_error_metrics(row):
    """Compute character-level edit counts for one prediction row.

    Spaces and punctuation are stripped from `row['label']` and
    `row['prediction']` before the comparison.

    Args:
        row: Mapping-like row with string fields 'label' and 'prediction'.

    Returns:
        pd.Series with 'substitutions', 'deletions', 'insertions' and
        'ref_length' (length of the cleaned reference).
    """
    ref, hyp = (
        remove_punctuation(row[field].replace(" ", ""))
        for field in ("label", "prediction")
    )

    # NOTE(review): `_measure_cer` is a private nlptutti helper — confirm that
    # it returns exactly (substitutions, deletions, insertions) in this order.
    substitutions, deletions, insertions = _measure_cer(ref, hyp)

    metrics = {
        "substitutions": substitutions,
        "deletions": deletions,
        "insertions": insertions,
        "ref_length": len(ref)
    }
    return pd.Series(metrics)

# Per-utterance edit counts, appended column-wise to the predictions.
df_error = df_pred.apply(extract_error_metrics, axis=1)
df_pred = pd.concat([df_pred, df_error], axis=1)

# Per speaker: average the utterance-level CER/WER, total the edit counts and
# the reference lengths.
speaker_aggregations = {
    "cer": "mean",
    "wer": "mean",
    "substitutions": "sum",
    "deletions": "sum",
    "insertions": "sum",
    "ref_length": "sum"
}
person_summary = df_pred.groupby("speaker_id").agg(speaker_aggregations).reset_index()

# Each edit type as a fraction of the speaker's total reference length.
total_ref_length = person_summary["ref_length"]
person_summary["sub_rate"] = person_summary["substitutions"].div(total_ref_length)
person_summary["del_rate"] = person_summary["deletions"].div(total_ref_length)
person_summary["ins_rate"] = person_summary["insertions"].div(total_ref_length)

# Weights of the three error rates in the combined `per` score.
w_s = 0.4  # Substitution
w_d = 0.4  # Deletion
w_i = 0.2  # Insertion

weighted_sub = w_s * person_summary["sub_rate"]
weighted_del = w_d * person_summary["del_rate"]
weighted_ins = w_i * person_summary["ins_rate"]
person_summary["per"] = weighted_sub + weighted_del + weighted_ins

In [None]:
# Numeric sort key taken from the first run of digits in each speaker ID.
# `astype(str)` keeps the `.str` accessor usable when pandas parsed the ID
# column as numbers, and `expand=False` yields a Series instead of a
# one-column DataFrame. IDs without any digit still raise in `astype(int)`.
speaker_digits = person_summary["speaker_id"].astype(str).str.extract(r'(\d+)', expand=False)
person_summary["speaker_id_num"] = speaker_digits.astype(int)

person_summary_sorted = person_summary.sort_values(by="speaker_id_num")

person_summary_sorted[["speaker_id", "per", "ref_length", "sub_rate", "del_rate", "ins_rate", "cer", "wer"]].head()

In [None]:
# Express the error rates as percentages rounded to two decimals.
# The sorted frame is rebuilt from `person_summary` (still holding the raw
# fractions) on every run, so re-executing this cell never multiplies an
# already converted value by 100 a second time.
percent_columns = ["per", "cer", "wer"]
person_summary_sorted = person_summary.sort_values(by="speaker_id_num")
person_summary_sorted[percent_columns] = (person_summary_sorted[percent_columns] * 100).round(2)

person_summary_sorted

In [None]:
# Write the per-utterance predictions and the per-speaker summary to CSV,
# without the index column.
for frame, csv_name in (
    (df_pred, "GeneralModel_results.csv"),
    (person_summary_sorted, "GeneralModel_error.csv"),
):
    frame.to_csv(csv_name, index=False)