In [14]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [None]:
import os
import json
import torch
import numpy as np
import gc
from dataclasses import dataclass
from typing import Any, Dict, List, Union
from tqdm import tqdm
from datasets import Dataset, DatasetDict, Audio
from torch.utils.data import DataLoader

# 데이터셋 불러오기 함수 (wav 파일과 대응하는 txt 또는 json 파일에서 transcript 읽기)
def load_my_dataset(folder_path):
    audio_paths = []
    transcripts = []

    for fname in sorted(os.listdir(folder_path)):
        if fname.endswith(".wav"):
            wav_path = os.path.join(folder_path, fname)
            base_name = fname.replace(".wav", "")
            txt_path = os.path.join(folder_path, base_name + ".txt")
            json_path = os.path.join(folder_path, base_name + ".json")

            # txt 파일이 있으면 우선 읽고, 없으면 json에서 읽음
            if os.path.exists(txt_path):
                with open(txt_path, "r", encoding="utf-8") as f:
                    text = f.read().strip()
            elif os.path.exists(json_path):
                with open(json_path, "r", encoding="utf-8") as f:
                    j = json.load(f)
                    text = j["06_transcription"]["1_text"]
            else:
                continue

            audio_paths.append(wav_path)
            transcripts.append(text)

    return Dataset.from_dict({
        "audio": audio_paths,
        "sentence": transcripts,
    })

# 데이터셋 로드 및 전처리
# test 데이터셋 경로 
common_voice = DatasetDict()
# common_voice["test"] = load_my_dataset("/data/seungmin/dataset/kr_univ_validation_processed/number_and_english/")
common_voice["test"] = load_my_dataset("/data/seungmin/dataset/k12_validation_processed/number_and_english/")

# 샘플링레이트 16kHz로 강제 변환
common_voice["test"] = common_voice["test"].cast_column("audio", Audio(sampling_rate=16000))

# Whisper 관련 모듈 불러오기 (원본 base 모델 경로와 언어, task 설정)
from transformers import WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor

model_name_or_path = "openai/whisper-large-v2"  # base 모델 명시
language = "korean"
task = "transcribe"

# feature extractor, tokenizer, processor 로드
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path)
tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)
processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)

# 데이터 전처리 함수: 입력 오디오로부터 input_features 생성 및 텍스트를 label id로 인코딩
def prepare_dataset(batch):
    audio = batch["audio"]
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]
    batch["labels"] = tokenizer(batch["sentence"]).input_ids
    return batch

common_voice["test"] = common_voice["test"].map(
    prepare_dataset,
    remove_columns=common_voice["test"].column_names,
    num_proc=1
)

# Data Collator: 입력과 라벨에 대해 각각 적절한 패딩 수행
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # 오디오 입력 패딩
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")
        
        # 라벨(tokenized text) 패딩
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
        
        # 만약 bos 토큰이 앞에 추가된 경우 제거 (모델 generate 단계에서 다시 추가됨)
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]
        batch["labels"] = labels

        return batch

data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

# 평가를 위한 DataLoader 구성 (배치 사이즈 등 필요에 따라 조정)
eval_dataloader = DataLoader(common_voice["test"], batch_size=8, collate_fn=data_collator)

# 허깅페이스에 업로드한 PEFT 모델 로드
from transformers import WhisperForConditionalGeneration, BitsAndBytesConfig
from peft import PeftModel, PeftConfig

# 업로드 시 사용한 모델 아이디 (예: handsomemin/openai-whisper-large-v2-k12-0416-LORA)
peft_model_id = "handsomemin/openai-whisper-large-v2-k12-0416-LORA-LORA"
peft_config = PeftConfig.from_pretrained(peft_model_id)

# base 모델을 8bit 로딩하고, device_map="auto"를 사용하여 GPU에 자동 매핑
model = WhisperForConditionalGeneration.from_pretrained(
    peft_config.base_model_name_or_path,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto"
)

# 저장된 adapter를 로드하여 PEFT 모델로 완성
model = PeftModel.from_pretrained(model, peft_model_id)
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []
model.eval()

# 평가 지표 준비 (wer, cer)
from evaluate import load
wer_metric = load("wer")
cer_metric = load("cer")

# 평가 루프: 생성된 결과와 정답 텍스트를 디코딩하여 평가 지표에 추가
for step, batch in enumerate(tqdm(eval_dataloader)):
    # GPU 메모리 절약과 속도를 위해 autocast 사용 (FP16)
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            # decoder_input_ids는 간단히 토큰의 앞부분을 사용
            generated_tokens = model.generate(
                input_features=batch["input_features"].to("cuda"),
                decoder_input_ids=batch["labels"][:, :4].to("cuda"),
                attention_mask=torch.ones_like(batch["input_features"]).to("cuda"),
                max_new_tokens=255,
            ).cpu().numpy()

    # 라벨 처리: -100을 tokenizer.pad_token_id로 바꾸어 디코딩
    labels = batch["labels"].cpu().numpy()
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_preds = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    wer_metric.add_batch(predictions=decoded_preds, references=decoded_labels)
    cer_metric.add_batch(predictions=decoded_preds, references=decoded_labels)

    # 메모리 정리
    del generated_tokens, labels, batch
    gc.collect()

# 평가 지표 계산 및 출력 (백분율 단위)
wer = 100 * wer_metric.compute()
cer = 100 * cer_metric.compute()
print(f"WER: {wer:.2f}%")
print(f"CER: {cer:.2f}%")


Map:   0%|          | 0/1382 [00:00<?, ? examples/s]

adapter_model.safetensors:   0%|          | 0.00/63.0M [00:00<?, ?B/s]

  with torch.cuda.amp.autocast():
100%|██████████| 173/173 [40:13<00:00, 13.95s/it]


✅ WER: 30.72%
✅ CER: 10.03%


: 