In [None]:
# Colab only: mount Google Drive at /content/drive so the dataset CSVs and
# the model checkpoint stored under MyDrive can be read and written.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install OpenAI Whisper straight from its GitHub repository (unpinned).
!pip install git+https://github.com/openai/whisper.git
# Install / upgrade jiwer and evaluate, which are imported later in this notebook.
!pip install --upgrade jiwer evaluate

In [None]:
import numpy as np
import pandas as pd
import os, sys, re, math, random, time, json, pickle, gc
from tqdm import tqdm
from collections import defaultdict

import torch
import torch.nn as nn
import torch.optim as optim # 안 될 확률 높음
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

import librosa, evaluate, jiwer
from typing import Any, Dict, List, Union
from transformers import WhisperModel, WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor

import whisper

from sklearn.metrics import f1_score, accuracy_score
from sklearn.model_selection import train_test_split

# Each CSV holds: audio_file_path (path to the clip), classId (the class
# encoded as a number) and class (the class name in Korean).
# Adjust the two paths below to wherever the CSVs live.

df1 = pd.read_csv('/content/drive/MyDrive/코딩/새싹해커톤/audio_cls_dataset.csv')
df2 = pd.read_csv('/content/drive/MyDrive/코딩/새싹해커톤/normal_audio_dataset.csv')

# Merge both sources, then shuffle the rows reproducibly.
combined_df = (
    pd.concat([df1, df2], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)
# combined_df.to_csv("/content/drive/MyDrive/코딩/새싹해커톤/combined_audio_dataset.csv", index=False, encoding='utf-8-sig')

# 70% of all rows go to training; stratify to keep the class ratio.
train_df, temp_df = train_test_split(
    combined_df,
    stratify=combined_df['classId'],
    train_size=int(len(combined_df) * 0.7),
    random_state=42,
)

# From the remainder, take 20% of the *full* dataset for validation;
# whatever is left becomes the test set. Stratified again on classId.
temp_len = len(temp_df)
valid_df, test_df = train_test_split(
    temp_df,
    stratify=temp_df['classId'],
    train_size=int(len(combined_df) * 0.2),
    random_state=42,
)

# Give every split a clean 0..n-1 index.
train_df, valid_df, test_df = (
    part.reset_index(drop=True) for part in (train_df, valid_df, test_df)
)

class args:
    """Static configuration namespace for this notebook (read as ``args.<name>``)."""

    # amp = True

    # --- hardware -----------------------------------------------------
    gpu = '0'            # value written to CUDA_VISIBLE_DEVICES

    # --- task / schedule ----------------------------------------------
    label_size = 3
    epochs = 10          # upper bound of the epoch loop
    start_epoch = 0      # epochs below this index are skipped
    batch_size = 8       # batch size for the data loaders
    weight_decay = 1e-6
    max_len = 60

    # --- learning rate --------------------------------------------------
    start_lr = 2e-5      # learning rate handed to the optimizer
    min_lr = 1e-6

    # --- misc -----------------------------------------------------------
    num_workers = 0
    seed = 2022

    # --- checkpoint location: saved to `path + save_model_path` ---------
    path = '/content/drive/MyDrive/코딩/새싹해커톤'
    save_model_path = '/whisper_tiny_cls.pt'

# Expose only the configured GPU to CUDA, then fall back to CPU when no GPU is present.
os.environ["CUDA_VISIBLE_DEVICES"] = args.gpu
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [None]:
# train_df = train_df[:100]
# valid_df = valid_df[:20]

In [None]:
# print(len(train_df), len(valid_df), len(test_df))

919 262 133


In [None]:
# Hugging Face checkpoint id; its last '-'-separated piece ("tiny") is the
# size name handed to the openai-whisper loader.
model_name = 'openai/whisper-tiny'

# Hugging Face preprocessing objects (tokenizer / processor configured for Korean).
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
tokenizer = WhisperTokenizer.from_pretrained(model_name, language='Korean')
processor = WhisperProcessor.from_pretrained(model_name, language='Korean')

# The openai-whisper model that is actually trained further down.
whisper_model = whisper.load_model(model_name.rsplit('-', 1)[-1])

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that turns one dataframe row into Whisper inputs.

    Each item is a dict with:
        * ``input_features``    - float32 log-mel spectrogram of the clip,
        * ``labels``            - the row's ``classId`` as a long scalar,
        * ``decoder_input_ids`` - the tokenizer's BOS token id as a long scalar.

    Args:
        df: DataFrame with ``audio_file_path`` and ``classId`` columns.
        feature_extractor: Stored on the instance; not used by ``__getitem__``.
        tokenizer: Object exposing ``bos_token_id``.
        processor: Stored on the instance; not used by ``__getitem__``.
        seed: Stored on the instance; not used by ``__getitem__``.
    """

    def __init__(self, df, feature_extractor, tokenizer, processor, seed):
        self.df = df
        self.feature_extractor = feature_extractor
        self.tokenizer = tokenizer
        self.processor = processor
        self.seed = seed

    def __len__(self):
        """Number of rows (audio clips) in the backing dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, pad/trim and featurize the clip at position ``idx``."""
        # Look the row up once instead of once per column.
        row = self.df.iloc[idx]

        # Resample to 16 kHz, then pad/trim to the length Whisper expects.
        audio, _ = librosa.load(row['audio_file_path'], sr=16000)
        audio = whisper.pad_or_trim(audio.flatten())
        mel = whisper.log_mel_spectrogram(audio)

        # `mel` is already a tensor: as_tensor converts the dtype without the
        # "To copy construct from a tensor..." UserWarning that
        # torch.tensor(mel) raises on every single item.
        input_features = torch.as_tensor(mel, dtype=torch.float32)
        decoder_input_ids = torch.tensor(self.tokenizer.bos_token_id, dtype=torch.long)
        labels = torch.tensor(row['classId'], dtype=torch.long)

        return {"input_features": input_features, "labels": labels, "decoder_input_ids": decoder_input_ids}

# Build the datasets and their loaders.
# Only the training loader is shuffled; validation keeps a fixed order so the
# evaluation batches (and therefore the reported loss) are repeatable.
train_dataset = CustomDataset(train_df, feature_extractor, tokenizer, processor, seed=42)
train_loader = DataLoader(
    train_dataset,
    batch_size=args.batch_size,
    shuffle=True,
    num_workers=args.num_workers,
)
valid_dataset = CustomDataset(valid_df, feature_extractor, tokenizer, processor, seed=42)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=args.batch_size,
    shuffle=False,
    num_workers=args.num_workers,
)

In [None]:
# Reload the Whisper weights here so that re-running this cell starts
# training from the original checkpoint again.
whisper_model = whisper.load_model(model_name.rsplit('-', 1)[-1])

# Cross-entropy over the decoder logits; targets equal to -100 are ignored.
loss_fn = nn.CrossEntropyLoss(ignore_index=-100)

# NOTE(review): args.weight_decay is defined but not passed here, so AdamW
# runs with its library default weight decay - confirm this is intended.
optimizer = optim.AdamW(whisper_model.parameters(), lr=args.start_lr)

In [None]:
def train(model, data_loader, loss_fn, opimizer, tokenizer):
    """Run one training epoch and report loss, accuracy and macro-F1.

    The class label is predicted as the decoder's logits for a single
    start token: the argmax over the last axis is compared with ``labels``.

    Args:
        model: Module exposing ``encoder(features)`` and
            ``decoder(token_ids, audio_features)``.
        data_loader: Sized iterable of batches; each batch is a dict with
            ``input_features``, ``labels`` and ``decoder_input_ids``.
        loss_fn: Callable ``(logits, targets) -> scalar loss tensor``.
        opimizer: Optimizer to zero and step for every batch. The parameter
            name keeps its original spelling so existing keyword callers
            keep working.
        tokenizer: Unused; kept for interface compatibility.

    Returns:
        Tuple ``(model, mean_batch_loss, accuracy, macro_f1)``.
    """
    model.to(device)
    model.train()
    pred_list = []
    target_list = []
    pbar = tqdm(data_loader)
    train_loss = 0

    for i, batch in enumerate(pbar):
        input_features = batch['input_features'].to(device)
        labels = batch['labels'].long().to(device)
        # One start token per sample: add a sequence axis of length 1.
        decoder_input_ids = batch['decoder_input_ids'].long().to(device).unsqueeze(1)

        # Use the optimizer that was passed in (previously zero_grad() was
        # called on a global `optimizer` while step() used the argument).
        opimizer.zero_grad()

        audio_features = model.encoder(input_features)
        outputs = model.decoder(decoder_input_ids, audio_features)

        loss = loss_fn(outputs.view(-1, outputs.size(-1)), labels.view(-1))
        loss.backward()

        opimizer.step()
        train_loss += loss.item()
        pbar.set_description('\033[1m[C_loss : {:>.5}]\033[0m'.format(round(train_loss / (i+1), 4)))

        # Flatten right away so both lists hold plain ints.
        pred = torch.argmax(outputs, dim=-1)
        pred_list.extend(pred.reshape(-1).cpu().tolist())
        target_list.extend(labels.reshape(-1).cpu().tolist())

    # Mean loss per batch. It is divided exactly once (the old return
    # statement divided by len(data_loader) a second time).
    train_loss = train_loss / len(data_loader)

    # Accuracy & macro-averaged F1 (macro: every class weighs the same).
    acc = accuracy_score(target_list, pred_list)
    f1 = f1_score(target_list, pred_list, average='macro')
    print("\033[1m[ACC    : {:>.5}]\033[0m".format(round(acc, 4)))
    print("\033[1m[F1     : {:>.5}]\033[0m".format(round(f1, 4)))

    torch.cuda.empty_cache()
    gc.collect()

    return model, train_loss, acc, f1

def valid(model, data_loader, loss_fn, tokenizer):
    """Evaluate the model on a loader and report loss, accuracy and macro-F1.

    Args:
        model: Module exposing ``encoder(features)`` and
            ``decoder(token_ids, audio_features)``.
        data_loader: Sized iterable of batches; each batch is a dict with
            ``input_features``, ``labels`` and ``decoder_input_ids``.
        loss_fn: Callable ``(logits, targets) -> scalar loss tensor``.
        tokenizer: Unused; kept for interface compatibility.

    Returns:
        Tuple ``(model, mean_batch_loss, accuracy, macro_f1)``.
    """
    model.to(device)
    model.eval()
    pred_list = []
    target_list = []
    pbar = tqdm(data_loader)
    valid_loss = 0

    # No gradients are needed for evaluation; skipping the autograd graph
    # saves memory and time.
    with torch.no_grad():
        for i, batch in enumerate(pbar):
            input_features = batch['input_features'].to(device)
            labels = batch['labels'].long().to(device)
            # One start token per sample: add a sequence axis of length 1.
            decoder_input_ids = batch['decoder_input_ids'].long().to(device).unsqueeze(1)

            audio_features = model.encoder(input_features)
            outputs = model.decoder(decoder_input_ids, audio_features)

            loss = loss_fn(outputs.view(-1, outputs.size(-1)), labels.view(-1))
            valid_loss += loss.item()
            pbar.set_description('\033[1m[C_loss : {:>.5}]\033[0m'.format(round(valid_loss / (i+1), 4)))

            # Flatten right away so both lists hold plain ints.
            pred = torch.argmax(outputs, dim=-1)
            pred_list.extend(pred.reshape(-1).cpu().tolist())
            target_list.extend(labels.reshape(-1).cpu().tolist())

    # Mean loss per batch. It is divided exactly once (the old return
    # statement divided by len(data_loader) a second time).
    valid_loss = valid_loss / len(data_loader)

    # Accuracy & macro-averaged F1 (macro: every class weighs the same).
    acc = accuracy_score(target_list, pred_list)
    f1 = f1_score(target_list, pred_list, average='macro')
    print("\033[1m[ACC    : {:>.5}]\033[0m".format(round(acc, 4)))
    print("\033[1m[F1     : {:>.5}]\033[0m".format(round(f1, 4)))

    torch.cuda.empty_cache()
    gc.collect()

    return model, valid_loss, acc, f1

In [None]:
# Best validation loss seen so far. Start at +inf so the first epoch always
# produces a checkpoint, however large its loss is.
save_loss = float('inf')

# Epochs before args.start_epoch are skipped.
for epoch in range(args.start_epoch, args.epochs):
    model, train_loss, train_acc, train_f1 = train(whisper_model, train_loader, loss_fn, optimizer, tokenizer)
    model, valid_loss, valid_acc, valid_f1 = valid(whisper_model, valid_loader, loss_fn, tokenizer)

    # Keep only the checkpoint with the lowest validation loss.
    if valid_loss < save_loss:
        save_loss = valid_loss
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        }, args.path + args.save_model_path)  # plain concatenation: save_model_path starts with '/'

    torch.cuda.empty_cache()
    gc.collect()

# Results after 4 epochs (training log pasted below)
'''
  0%|          | 0/115 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.6985]: 100%|██████████| 115/115 [40:55<00:00, 21.35s/it]
[ACC    : 0.7334]
[F1     : 0.4079]
  0%|          | 0/33 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.3778]: 100%|██████████| 33/33 [12:44<00:00, 23.16s/it]
[ACC    : 0.8015]
[F1     : 0.663]
  0%|          | 0/115 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.3788]: 100%|██████████| 115/115 [33:23<00:00, 17.42s/it]
[ACC    : 0.8313]
[F1     : 0.79]
  0%|          | 0/33 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.3072]: 100%|██████████| 33/33 [10:16<00:00, 18.68s/it]
[ACC    : 0.8511]
[F1     : 0.8323]
  0%|          | 0/115 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.1996]: 100%|██████████| 115/115 [32:07<00:00, 16.76s/it]
[ACC    : 0.8966]
[F1     : 0.8756]
  0%|          | 0/33 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.3297]: 100%|██████████| 33/33 [09:20<00:00, 16.97s/it]
[ACC    : 0.8473]
[F1     : 0.835]
  0%|          | 0/115 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.1186]: 100%|██████████| 115/115 [31:29<00:00, 16.43s/it]
[ACC    : 0.9402]
[F1     : 0.9296]
  0%|          | 0/33 [00:00<?, ?it/s]/tmp/ipython-input-1114513740.py:18: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.detach().clone() or sourceTensor.detach().clone().requires_grad_(True), rather than torch.tensor(sourceTensor).
  input_features = torch.tensor(mel, dtype=torch.float32)
[C_loss : 0.2733]: 100%|██████████| 33/33 [09:56<00:00, 18.07s/it]
[ACC    : 0.8893]
[F1     : 0.8598]
'''

In [None]:
# Inference setup: rebuild the tiny Whisper model and restore the fine-tuned weights.
device = "cuda" if torch.cuda.is_available() else "cpu"

ckpt = torch.load("/content/drive/MyDrive/코딩/새싹해커톤/whisper_tiny_cls.pt", map_location=device)

model = whisper.load_model("tiny").to(device)
# NOTE(review): strict=False silently ignores missing / unexpected keys -
# confirm the checkpoint really matches this architecture.
model.load_state_dict(ckpt["model_state_dict"], strict=False)

# Switch to evaluation mode (as the cell's last expression this also shows the model).
model.eval()

In [None]:
def classify_audio(model, audio_path, num_labels=3):
    """Return class probabilities for a single audio file.

    The clip is loaded at 16 kHz, padded/trimmed and converted to a log-mel
    spectrogram. The model then decodes a single start token, and the logits
    of the first ``num_labels`` vocabulary entries are read as class scores.

    Args:
        model: Module exposing ``encoder(features)`` and
            ``decoder(token_ids, audio_features)``.
        audio_path: Path of the audio file to classify.
        num_labels: How many leading vocabulary entries act as class ids.
            Defaults to 3, the value that used to be hard-coded.

    Returns:
        1-D NumPy array of ``num_labels`` probabilities summing to 1.
    """
    audio, _ = librosa.load(audio_path, sr=16000)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio).to(device)

    # A single start token for a batch of one.
    decoder_input_ids = torch.tensor([[tokenizer.bos_token_id]], device=device)

    # Encoder and decoder both run without autograd (the encoder used to
    # run outside the no_grad block and built a graph for nothing).
    with torch.no_grad():
        audio_features = model.encoder(mel.unsqueeze(0))
        logits = model.decoder(decoder_input_ids, audio_features)

    # Keep only the first `num_labels` vocabulary logits of the last position.
    class_logits = logits[:, -1, :num_labels]
    probs = F.softmax(class_logits, dim=-1)

    return probs.detach().cpu().numpy().flatten()

In [None]:
# Classify every clip of the test split and tabulate the per-class
# percentages together with the predicted class id.
results = []

for path in test_df["audio_file_path"]:
    probs = classify_audio(model, path)

    # Probabilities rounded to 4 decimals, then expressed as percentages.
    formatted_probs = [round(float(p), 4) for p in probs]
    percent_probs = [round(q * 100, 2) for q in formatted_probs]
    print(formatted_probs)
    print(percent_probs)

    pred = int(np.argmax(probs))
    record = {
        "audio_file_path": path,
        "p0": percent_probs[0],
        "p1": percent_probs[1],
        "p2": percent_probs[2],
        "pred": pred
    }
    results.append(record)
    print(pred)

test_results = pd.DataFrame(results)
test_results