<a href="https://colab.research.google.com/github/jhyeon-kim/ai_study/blob/main/241107/%EC%8B%AC%ED%99%94%EA%B3%BC%EC%A0%9C/cnn_music_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 01. wandb, google drive 연결

In [1]:
# wandb 로그인
import wandb
wandb.login()  # 로그인 코드 추가

[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

 ··········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


True

In [2]:
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 02. 데이터 가져오기

In [3]:
import pandas as pd
import torchaudio
import os
from sklearn.model_selection import train_test_split


# 첫 번째 CSV 파일 경로 (MER_audio_taffc_dataset)
csv_file_path_1 = '/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/MER_audio_taffc_dataset/songs_moods_labeled.csv'
# 두 번째 CSV 파일 경로 (OSF_IO)
csv_file_path_2 = '/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/OSF_IO/set1_labeled.csv'

# 첫 번째 데이터셋 로드
data_1 = pd.read_csv(csv_file_path_1)
# 두 번째 데이터셋 로드
data_2 = pd.read_csv(csv_file_path_2)

# 첫 번째 데이터셋의 오디오 파일 경로 설정 함수 (MER_audio_taffc_dataset)
def get_audio_path_taffc(song, quadrant):
    return f'/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/MER_audio_taffc_dataset/{quadrant}/{song}.mp3'

# 두 번째 데이터셋의 오디오 파일 경로 설정 함수 (OSF_IO)
def get_audio_path_osf(set, number):
    song_num_str = str(number).zfill(3)
    return f'/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/OSF_IO/{set}/{song_num_str}.mp3'

# 첫 번째 데이터셋에 오디오 경로 추가
data_1['audio_path'] = data_1.apply(lambda row: get_audio_path_taffc(row['Song'], row['Quadrant']), axis=1)

# 두 번째 데이터셋에 오디오 경로 추가
data_2['audio_path'] = data_2.apply(lambda row: get_audio_path_osf("Set1", row['Nro']), axis=1)

# 두 데이터셋 병합
data_combined = pd.concat([data_1, data_2], ignore_index=True)


## 03. MelSpectrogram 전처리 및 데이터셋 준비

오디오 파일을 불러와서 CNN이 처리할 수 있도록 스펙트로그램으로 변환합니다.


In [4]:
import torchaudio.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch

import random

class AudioSpectrogramDataset(Dataset):
    def __init__(self, dataframe, target_sample_rate=16000, segment_length=80000):  # 5초 = 80000 샘플
        self.data = dataframe
        self.labels = dataframe['label'].values
        self.audio_paths = dataframe['audio_path'].values
        self.mel_spectrogram = transforms.MelSpectrogram(sample_rate=target_sample_rate, n_mels=64)
        self.segment_length = segment_length  # 고정된 길이 5초
        self.segments = []
        self._prepare_segments()

    def _prepare_segments(self):
        # 각 파일을 5초 단위로 나누고 세그먼트 리스트를 만든다
        for idx, audio_path in enumerate(self.audio_paths):
            waveform, sample_rate = torchaudio.load(audio_path)
            label = self.labels[idx]
            num_segments = waveform.shape[1] // self.segment_length

            for segment_idx in range(num_segments):
                start = segment_idx * self.segment_length
                end = start + self.segment_length
                segment = waveform[:, start:end]

                # 스테레오 데이터를 모노로 변환
                if segment.shape[0] > 1:
                    segment = segment.mean(dim=0, keepdim=True)

                # 세그먼트와 레이블을 저장
                self.segments.append((segment, label))

    def __len__(self):
        return len(self.segments)

    def __getitem__(self, idx):
        segment, label = self.segments[idx]
        # MelSpectrogram 변환
        spec = self.mel_spectrogram(segment)
        return spec, label



In [5]:
import torch

def pad_spectrogram(spec, max_length):
    # spec: [1, n_mels, time]
    pad_size = max_length - spec.shape[-1]
    return torch.nn.functional.pad(spec, (0, pad_size), "constant", 0)  # 오른쪽으로 패딩

def collate_fn(batch):
    specs, labels = zip(*batch)

    # 각 배치의 가장 긴 스펙트로그램 길이 찾기
    max_length = max([spec.shape[-1] for spec in specs])

    # 패딩 적용하여 모든 스펙트로그램의 길이 맞추기
    padded_specs = [pad_spectrogram(spec, max_length) for spec in specs]

    # 텐서로 변환
    padded_specs = torch.stack(padded_specs)
    labels = torch.tensor(labels)

    return padded_specs, labels


In [6]:
# 병합된 데이터셋으로 AudioSpectrogramDataset 준비 (5초씩 조각내기)
dataset = AudioSpectrogramDataset(data_combined)

# 세그먼트를 섞기
random.shuffle(dataset.segments)

# Train/Validation Split (섞인 세그먼트로부터 분리)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

# DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)


데이터셋 만드는 것이 지나치게 오래 걸려서 저장해두기

In [7]:
dataset_save_path = "/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/audio_spectrogram_dataset.pt"

# 데이터셋 저장
torch.save(dataset.segments, dataset_save_path)
print(f"Dataset segments saved to {dataset_save_path}")

Dataset segments saved to /content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/audio_spectrogram_dataset.pt


In [8]:
from collections import Counter

# Train/Validation Dataset 크기 출력
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")

# Train 데이터의 레이블 분포 출력
train_labels = [label for _, label in train_dataset]
train_label_count = Counter(train_labels)
print(f"Train label distribution: {train_label_count}")

# Validation 데이터의 레이블 분포 출력
val_labels = [label for _, label in val_dataset]
val_label_count = Counter(val_labels)
print(f"Validation label distribution: {val_label_count}")


Train dataset size: 8920
Validation dataset size: 2231
Train label distribution: Counter({2: 3395, 0: 2478, 1: 1150, 4: 958, 3: 939})
Validation label distribution: Counter({2: 866, 0: 612, 1: 277, 4: 243, 3: 233})


## 04. CNN 모델 초기화

In [9]:
import torch.nn as nn
import torch.nn.functional as F
class CNNEmotionClassifier(nn.Module):
    def __init__(self, num_classes=5):  # 감정 분류 클래스 수
        super(CNNEmotionClassifier, self).__init__()

        # CNN Layer
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.dropout = nn.Dropout(0.5)

        # Fully connected layer
        self.fc1 = nn.Linear(128 * 8 * 50, 256)  # 128 * 8 * 50 = 51200
        self.fc2 = nn.Linear(256, num_classes)  # 감정 분류 레이블 수

    def forward(self, x):
        # 입력 데이터 크기: [batch_size, 1, 64, time]
        x = self.pool(F.relu(self.conv1(x)))  # (N, 32, 64, time/2)
        x = self.pool(F.relu(self.conv2(x)))  # (N, 64, 32, time/4)
        x = self.pool(F.relu(self.conv3(x)))  # (N, 128, 16, time/8)

        # CNN 출력 후 크기 출력
        # print(f"After conv3: {x.shape}")

        # Flatten: Conv 레이어의 출력을 1D로 변환
        x = x.view(x.size(0), -1)  # Flatten (N, 128 * 8 * 50 = 51200)
        # print(f"After flatten: {x.shape}")

        # Fully connected layer
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


## 05. 학습

In [10]:
import os
import torch.optim as optim

wandb.init(project="music-sentiment-classification", name="cnn_model")

# 모델 초기화
model = CNNEmotionClassifier(num_classes=5)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# GPU가 사용 가능하면 GPU로 모델 이동
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 모델을 저장할 경로 설정
model_save_path = "/content/drive/MyDrive/sentiment_classification_projects_241014/music_sentiments_classification/sentiment_classification_cnn_model_wandb.pth"

best_val_accuracy = 0.0
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    wandb.log({"train/loss": avg_train_loss})

    # 검증 단계
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * correct / total
    wandb.log({"eval/loss": avg_val_loss, "val_accuracy": val_accuracy})

    # 현재 에폭에서의 validation accuracy가 최고일 경우 모델 저장
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), model_save_path)

print(f"Training completed. Best validation accuracy: {best_val_accuracy}")
wandb.finish()


[34m[1mwandb[0m: Currently logged in as: [33mskjh0807s[0m ([33mskjh0807s-hanghae99[0m). Use [1m`wandb login --relogin`[0m to force relogin


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.01111256232222028, max=1.0)…

Training completed. Best validation accuracy: 46.122814881219185


VBox(children=(Label(value='0.012 MB of 0.012 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

0,1
eval/loss,▁▁▁▂▂▃▅▆▇█
train/loss,█▅▅▄▃▂▂▂▁▁
val_accuracy,▆█▇▇▇▁▄▅██

0,1
eval/loss,3.36971
train/loss,0.32493
val_accuracy,46.07799
