<a href="https://colab.research.google.com/github/jc020230/gc4-sand/blob/main/1103%20wavLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install openpyxl

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd

file_path = '/content/drive/MyDrive/SAND_Challenge_task1_dataset/task1/sand_task_1.xlsx'


try:
    df_all = pd.read_excel(file_path, sheet_name='SAND - TRAINING set - Task 1')

    df_trn = pd.read_excel(file_path, sheet_name='Training Baseline - Task 1')

    df_val = pd.read_excel(file_path, sheet_name='Validation Baseline - Task 1')

    print("파일 읽기 성공!")
    print(df_all.head())

except FileNotFoundError:
    print(f"오류: 파일을 찾을 수 없습니다.")
    print(f"경로를 다시 확인해주세요: {file_path}")
except Exception as e:
    print(f"파일을 읽는 중 오류가 발생했습니다: {e}")

In [None]:
trn_ids = df_trn['ID'].tolist()
val_ids = df_val['ID'].tolist()
len(trn_ids), len(val_ids)

In [None]:
trn_folder = '/content/drive/MyDrive/SAND_Challenge_task1_dataset/task1/training'

import os
import glob
import re
from pathlib import Path

# training 폴더의 모든 wav 파일 찾기
wav_files = []
wav_info_dict = {}
wav_files_trn = []
wav_files_val = []

# rhythmPA와 rhythmTA 폴더에서 wav 파일 찾기
for subfolder in ['phonationA', 'phonationE','phonationI', 'phonationO', 'phonationU','rhythmKA','rhythmPA', 'rhythmTA']:
    folder_path = os.path.join(trn_folder, subfolder)
    if os.path.exists(folder_path):
        # glob을 사용하여 wav 파일 찾기
        pattern = os.path.join(folder_path, '*.wav')
        files = glob.glob(pattern)
        wav_files.extend(files)

print(f"총 {len(wav_files)}개의 wav 파일을 찾았습니다.")

# 파일 경로에서 ID 추출하고 라벨 정보 매칭
for file_path in wav_files:
    # 파일명에서 ID 추출 (IDxxx 형태)
    filename = os.path.basename(file_path)
    id_match = re.search(r'(ID\d+)', filename)

    if id_match:
        id_num = id_match.group(1)  # ID 번호를 정수로 변환

        # 트레인/밸리데이션 데이터셋에 따라 파일 경로 분류
        if id_num in trn_ids:
            wav_files_trn.append(file_path)
        elif id_num in val_ids:
            wav_files_val.append(file_path)

        # df_all에서 해당 ID의 정보 찾기
        matching_row = df_all[df_all['ID'] == id_num]

        ## task_type 정보 추가
        if not matching_row.empty:
            # 라벨 정보 추출
            label_info = {
                'file_path': file_path,
                'id': id_num,
                'class': matching_row['Class'].values[0],
                'age': matching_row['Age'].values[0],
                'sex': matching_row['Sex'].values[0]
            }
            wav_info_dict[file_path] = label_info
        else:
            print(f"Warning: ID {id_num}에 대한 라벨 정보를 찾을 수 없습니다. 파일: {filename}")
    else:
        print(f"Warning: 파일명에서 ID를 추출할 수 없습니다: {filename}")

print(f"\n라벨 정보가 매칭된 파일: {len(wav_info_dict)}개")



In [None]:
len(wav_files), len(wav_files_trn), len(wav_files_val)

In [None]:
wav_info_dict[wav_files[0]]

In [None]:
!pip install -q transformers scikit-learn

In [None]:
import torch
from transformers import WavLMModel, AutoFeatureExtractor
import librosa
import numpy as np
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, classification_report
from tqdm.notebook import tqdm
import os
import warnings

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# WavLM 모델과 피처 추출기 로드 (16kHz로 학습됨)
MODEL_NAME = "microsoft/wavlm-base-plus"
TARGET_SR = 16000 # WavLM은 16kHz로 사전 학습되었습니다.

print(f"{MODEL_NAME} 모델 로드 중...")
feature_extractor = AutoFeatureExtractor.from_pretrained(MODEL_NAME)
model = WavLMModel.from_pretrained(MODEL_NAME).to(device)

# 모델을 "동결" (학습되지 않도록 평가 모드로 설정)
model.eval()
print("모델 로드 완료 및 동결.")

In [None]:
def extract_features(file_list, label_dict, target_sr):
    features = []
    labels = []

    # file_list (wav_files_trn 또는 wav_files_val)를 순회합니다.
    for file_path in tqdm(file_list, desc="특징 추출 중"):
        try:
            # 1. 오디오 로드 및 16kHz로 리샘플링
            waveform, sr = librosa.load(file_path, sr=target_sr, mono=True)

            # 2. 피처 추출기로 전처리
            inputs = feature_extractor(waveform, sampling_rate=target_sr, return_tensors="pt", padding=True)
            inputs = inputs.to(device)

            # 3. 모델 통과 (그래디언트 계산 안 함)
            with torch.no_grad():
                outputs = model(**inputs)

            # 4. 특징 벡터 집계 (평균 풀링)
            # outputs.last_hidden_state shape: (1, seq_len, 768)
            # 시간 축(dim=1)에 대해 평균을 내어 (1, 768) 형태로 만듦
            embedding = torch.mean(outputs.last_hidden_state, dim=1).squeeze().cpu().numpy()

            features.append(embedding)

            # 5. 레이블 가져오기 (기존 wav_info_dict 활용)
            label = label_dict[file_path]['class'] - 1  # 0부터 시작하도록 조정
            labels.append(label)

        except Exception as e:
            print(f"파일 처리 오류 {file_path}: {e}")

    return np.array(features), np.array(labels)

# --- 특징 추출 실행 ---
# (노트북 7번 셀까지 실행해서 wav_files_trn, wav_files_val, wav_info_dict가 메모리에 있어야 합니다)

print("Train 데이터 특징 추출 시작...")
X_train, y_train = extract_features(wav_files_trn, wav_info_dict, target_sr=TARGET_SR)

print("Validation 데이터 특징 추출 시작...")
X_val, y_val = extract_features(wav_files_val, wav_info_dict, target_sr=TARGET_SR)

print(f"\nTrain 특징 형태: {X_train.shape}, Train 레이블 형태: {y_train.shape}")
print(f"Val 특징 형태: {X_val.shape}, Val 레이블 형태: {y_val.shape}")

In [None]:
# SVM과 같은 모델을 위해 특징 스케일링
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)

print("특징 스케일링 완료.")

In [None]:
# --- 1. SVM (Support Vector Machine) 분류기 ---
print("\n--- SVM 분류기 학습 및 평가 ---")
# class_weight='balanced'는 기존 코드의 WeightedRandomSampler와 유사한 역할(불균형 처리)을 합니다.
svm_classifier = SVC(kernel='rbf', C=1.0, class_weight='balanced', random_state=42)

# 스케일링된 데이터로 학습
svm_classifier.fit(X_train_scaled, y_train)

# 검증 데이터로 평가
y_pred_svm = svm_classifier.predict(X_val_scaled)
f1_svm = f1_score(y_val, y_pred_svm, average='macro') # average='macro'가 average f1 score입니다.

print(f"SVM Macro F1 Score: {f1_svm:.4f}")
print(classification_report(y_val, y_pred_svm))


# --- 2. RandomForest 분류기 ---
print("\n--- RandomForest 분류기 학습 및 평가 ---")
rf_classifier = RandomForestClassifier(n_estimators=200, class_weight='balanced', random_state=42, n_jobs=-1)

# RandomForest는 스케일링이 필수는 아닙니다 (원본 X_train 사용)
rf_classifier.fit(X_train, y_train)

# 검증 데이터로 평가
y_pred_rf = rf_classifier.predict(X_val)
f1_rf = f1_score(y_val, y_pred_rf, average='macro')

print(f"RandomForest Macro F1 Score: {f1_rf:.4f}")
print(classification_report(y_val, y_pred_rf))

Augmentation