In [45]:
import torchaudio
from torchaudio.pipelines import HUBERT_BASE
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
import os
import json
import soundfile as sf

In [46]:
# 1. HuBERT 특성 추출기 정의
class HuBERTFeatureExtractor:
    def __init__(self):
        # torchaudio에서 HuBERT BASE 모델 로드
        self.hubert = HUBERT_BASE.get_model()

    def extract_features(self, audio_file):
        # 오디오 파일 로드
        waveform, sample_rate = sf.read(audio_file)
        waveform = torch.tensor(waveform, dtype=torch.float32)
        
        MAX_DURATION = 20.0  # 최대 길이(초)
        
        # waveform이 1D인 경우 2D로 변환
        if waveform.ndimension() == 1:
            waveform = waveform.unsqueeze(0) # 배치 차원 추가 (1, time)
        
        # stereo to mono 변환 (채널 수가 2일 경우)
        if waveform.shape[0] == 2:
            waveform = waveform.mean(dim=0)
        
        target_length = int(MAX_DURATION * sample_rate)
        # 길이가 너무 길면 자르기
        if waveform.shape[1] > target_length:
            waveform = waveform[:, :target_length]
        # 길이가 짧을 경우 0으로 패딩
        elif waveform.shape[1] < target_length:
            padding = int(target_length - waveform.shape[1])
            waveform = torch.nn.functional.pad(waveform, (0, padding))
        
        # HuBERT 모델을 사용하여 특성 벡터 추출
        with torch.no_grad():
            features, _ = self.hubert(waveform)
        return features

In [47]:
# 2. Transformer 모델 정의 (감정 인식용)
class EmotionTransformer(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(EmotionTransformer, self).__init__()
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=8),
            num_layers=6
        )
        self.fc = nn.Linear(input_dim, num_classes)

    def forward(self, x):
        # Transformer를 통해 특성 추출
        x = self.transformer(x)
        x = x.mean(dim=1)  # Sequence의 평균을 사용
        output = self.fc(x)
        return output

In [48]:
# 3. 데이터셋 클래스 정의
class EmotionDataset(Dataset):
    def __init__(self, audio_files, labels, feature_extractor):
        self.audio_files = audio_files
        self.labels = labels
        self.feature_extractor = feature_extractor
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(labels)

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, idx):
        audio_file = self.audio_files[idx]
        label = self.labels[idx]
        
        # HuBERT로 음성 특성 추출
        features = self.feature_extractor.extract_features(audio_file)
        
        # 라벨 인코딩
        label = self.label_encoder.transform([label])[0]
        
        return features.squeeze(0), label

In [58]:
# 4. 모델 학습 함수
def train_model(train_dataloader, model, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        for features, labels in train_dataloader:
            # 모델에 입력
            features = features.permute(1, 0, 2)  # (seq_len, batch, features), Transformer의 입력 차원에 맞게 차원 변환
            labels = labels.long()
            
            optimizer.zero_grad() # 경사도 초기화
            
            # 예측
            outputs = model(features)
            
            print(f"outputs.shape: {outputs.shape}")
            
            outputs = outputs[-1, :]
            
            # 손실 계산
            loss = criterion(outputs, labels)
            
            # 역전파
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

In [50]:
# Collate function을 사용하여 padding 적용
def collate_fn(batch):
    try:
        features, labels = zip(*batch)

        # 각 시퀀스의 길이를 맞추기 위해 padding
        features_padded = pad_sequence(features, batch_first=True, padding_value=0)

        # 라벨은 그대로 반환
        labels = torch.tensor(labels, dtype=torch.long)

        # 배치 크기가 일치하는지 확인
        if features_padded.size(0) != labels.size(0):
            raise ValueError("오류")

        return features_padded, labels
    except Exception as e:
        print(f"Error in collate_fn: {e}")
        return None, None

In [59]:
# 5. 학습 데이터 준비
audio_dir = "./dataset/New_Sample/원천데이터/0018_G2A3E4S0C0_JBR_" # 오디오 파일이 저장된 폴더 경로
json_dir = "./dataset/New_Sample/라벨링데이터/0018_G2A3E4S0C0_JBR_"  # 각 오디오의 감정 레이블 폴더 경로

audio_files = []
labels = []

for i in range(1, 2001):
    audio_path = audio_dir + str(i).zfill(6) + ".wav"
    json_path = json_dir + str(i).zfill(6) + ".json"
    
    if os.path.exists(json_path):
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

            emotion = data['화자정보']['Emotion']
            
            audio_files.append(audio_path)
            labels.append(emotion)

# 6. 모델 학습 준비
feature_extractor = HuBERTFeatureExtractor()
dataset = EmotionDataset(audio_files, labels, feature_extractor)
train_dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

# for batch_idx, (features, labels) in enumerate(train_dataloader):
#     print(f"Batch {batch_idx + 1}:")
#     print(f"Features shape: {features.shape}")
#     print(f"Labels shape: {labels.shape}")
#     print(f"Labels: {labels}")

# 모델, 손실 함수, 최적화 함수 정의
model = EmotionTransformer(input_dim=768, num_classes=len(set(labels)))  # HuBERT의 출력 차원은 768
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# 모델 학습
train_model(train_dataloader, model, criterion, optimizer)



outputs.shape: torch.Size([2999, 1])


RuntimeError: size mismatch (got input: [1], target: [2])