In [None]:
# download
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partaa
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partab
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partac
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partad

# merge into complete file
!cat Dataset.tar.gz.part* > Dataset.tar.gz

# !rm Dataset.tar.gz
!rm Dataset.tar.gz.partaa
!rm Dataset.tar.gz.partab
!rm Dataset.tar.gz.partac
!rm Dataset.tar.gz.partad

# Extract the merged tar.gz archive
!tar zxf Dataset.tar.gz

In [None]:
!tar zxf Dataset.tar.gz

In [None]:
import numpy as np
import torch
import random

def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs and pin cuDNN to deterministic mode.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    # CPU-side generators; these three calls are independent of each other.
    for seeder in (np.random.seed, random.seed, torch.manual_seed):
        seeder(seed)

    # The CUDA generators are only seeded when a GPU is visible.
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Turn off cuDNN autotuning and request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Seed every RNG once, up front, so the cells below start from a fixed random state.
set_seed(7895)

# Dataset

In [None]:
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence

class myDataset(Dataset):
    """Training dataset yielding ``(mel, speaker_id)`` pairs.

    The directory must contain ``mapping.json`` (with a ``speaker2id`` table)
    and ``metadata.json`` (with a ``speakers`` table that lists, per speaker,
    utterance records carrying a ``feature_path``). Feature files are loaded
    lazily with ``torch.load`` when an item is requested.

    Args:
        data_dir: Directory holding the JSON files and the feature files.
        segment_len: Maximum number of leading-axis entries kept per sample.
            Longer features are randomly cropped to this length; shorter ones
            are returned whole and are expected to be padded by the collate
            function.
    """

    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len

        root = Path(data_dir)

        # Speaker name -> integer class id. The file is opened in a `with`
        # block so the handle is closed deterministically.
        with (root / 'mapping.json').open() as mapping_file:
            mapping = json.load(mapping_file)
        self.speaker2id = mapping['speaker2id']

        with (root / 'metadata.json').open() as metadata_file:
            metadata = json.load(metadata_file)['speakers']

        self.speaker_num = len(metadata)

        # Flatten {speaker: [utterance, ...]} into [feature_path, speaker_id] rows.
        self.data = [
            [utterance['feature_path'], self.speaker2id[speaker]]
            for speaker, utterances in metadata.items()
            for utterance in utterances
        ]

    def __len__(self):
        """Return the total number of utterances across all speakers."""
        return len(self.data)

    def __getitem__(self, index):
        """Load one utterance and return ``(mel, speaker)``.

        ``mel`` is a float tensor with at most ``segment_len`` entries along
        its first axis (assumes the stored feature is laid out as
        (frames, mel bins) - TODO confirm against the feature files).
        ``speaker`` is a one-element int64 tensor holding the class id.
        """
        feat_path, speaker = self.data[index]
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        if len(mel) > self.segment_len:
            # Pick a random start so that a full segment_len window fits.
            start = random.randint(0, len(mel) - self.segment_len)
            mel = torch.FloatTensor(mel[start:start + self.segment_len])
        else:
            # May be shorter than segment_len; padding happens at collate time.
            mel = torch.FloatTensor(mel)

        # Build the int64 label directly instead of going through float32.
        speaker = torch.tensor([speaker], dtype=torch.long)
        return mel, speaker

    def get_speaker_number(self):
        """Return the number of speakers listed in ``metadata.json``."""
        return self.speaker_num



# Dataloader

In [None]:
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence

def collate_batch(batch):
    """Combine ``(mel, speaker)`` samples into one padded batch.

    Args:
        batch: Sequence of ``(mel, speaker)`` pairs as produced by the dataset.

    Returns:
        A ``(mels, labels)`` tuple: the mel tensors padded to a common length
        (batch-first) and the speaker labels as an int64 tensor.
    """
    # Transpose [(mel1, spk1), (mel2, spk2), ...] into (mel1, mel2, ...), (spk1, spk2, ...).
    mels, speakers = zip(*batch)

    # Shorter sequences are padded at the end with -20.
    padded_mels = pad_sequence(mels, batch_first=True, padding_value=-20)
    labels = torch.FloatTensor(speakers).long()
    return padded_mels, labels

def get_dataloader(data_dir, batch_size, n_workers):
    """Build the training and validation loaders from one dataset directory.

    The dataset is split 90% / 10% at random. Both loaders drop the last
    incomplete batch, pin memory and use ``collate_batch``; only the training
    loader shuffles.

    Args:
        data_dir: Directory passed to ``myDataset``.
        batch_size: Number of samples per batch for both loaders.
        n_workers: Number of DataLoader worker processes.

    Returns:
        ``(train_loader, valid_loader, speaker_num)``.
    """
    full_set = myDataset(data_dir)
    n_speakers = full_set.get_speaker_number()

    # 90% of the utterances go to training, the remainder to validation.
    n_total = len(full_set)
    n_train = int(0.9 * n_total)
    train_part, valid_part = random_split(full_set, [n_train, n_total - n_train])

    # Settings shared by both loaders.
    shared_options = dict(
        batch_size=batch_size,
        num_workers=n_workers,
        drop_last=True,
        pin_memory=True,
        collate_fn=collate_batch,
    )
    train_loader = DataLoader(train_part, shuffle=True, **shared_options)
    valid_loader = DataLoader(valid_part, **shared_options)

    return train_loader, valid_loader, n_speakers

# Model

In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F
from conformer import ConformerBlock

class Classifier(nn.Module):
    """Speaker classifier: linear prenet -> Conformer block -> mean pooling -> MLP head.

    Args:
        d_model: Feature width used by the prenet, the encoder and the head.
        n_spks: Number of output classes (speakers).
        dropout: Dropout rate applied to the attention, feed-forward and
            convolution modules of the Conformer block.
    """

    def __init__(self, d_model=160, n_spks=600, dropout=0.2):
        super().__init__()
        # Project the 40-dimensional input features to d_model.
        self.prenet = nn.Linear(40, d_model)

        self.encoder = ConformerBlock(
            dim=d_model,              # input feature dimension
            dim_head=4,               # per-head attention dimension (NOT the number of
                                      # heads; `heads` is not set, so the library default applies)
            ff_mult=4,                # feed-forward expansion multiplier
            conv_expansion_factor=2,  # convolution module expansion factor
            conv_kernel_size=25,      # convolution kernel size (odd value in 15-31 recommended)
            attn_dropout=dropout,     # attention dropout
            ff_dropout=dropout,       # feed-forward dropout
            conv_dropout=dropout      # convolution dropout
        )

        # Map the pooled utterance embedding to n_spks logits.
        self.pred_layer = nn.Sequential(
            nn.BatchNorm1d(d_model),
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, n_spks)
            )

    def forward(self, mels):
        """Return speaker logits for a batch of mel features.

        Args:
            mels: Tensor of shape (batch_size, length, 40).

        Returns:
            Tensor of shape (batch_size, n_spks).
        """
        # (batch_size, length, d_model)
        out = self.prenet(mels)
        # ConformerBlock is batch-first, so the tensor is passed as-is. The
        # earlier permute to (length, batch, d_model) was a leftover from the
        # nn.TransformerEncoderLayer layout and made attention/convolution run
        # across utterances in the batch instead of across time. Parameter
        # shapes are unchanged, so existing checkpoints still load.
        out = self.encoder(out)
        # Mean-pool over the length axis -> (batch_size, d_model)
        stats = out.mean(dim=1)
        # (batch_size, n_spks)
        return self.pred_layer(stats)

# Dynamic Learning Rate

In [None]:
import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float=0.5,
    last_epoch: int=-1
    ):
    """Create a LambdaLR with linear warmup followed by cosine decay.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        num_warmup_steps: Steps over which the multiplier rises linearly from 0 to 1.
        num_training_steps: Total number of steps; the decay phase spans the
            steps after warmup.
        num_cycles: Number of cosine cycles in the decay phase (0.5 means a
            single half-cosine from 1 down to 0).
        last_epoch: Forwarded to ``LambdaLR``.

    Returns:
        A ``LambdaLR`` scheduler bound to ``optimizer``.
    """
    # Both denominators are clamped to at least 1 to avoid dividing by zero.
    warmup_span = float(max(1, num_warmup_steps))
    decay_span = float(max(1, num_training_steps - num_warmup_steps))

    def multiplier_at(current_step):
        if current_step >= num_warmup_steps:
            # Decay phase: cosine of the fraction of post-warmup steps completed.
            progress = float(current_step - num_warmup_steps) / decay_span
            cosine = math.cos(math.pi * float(num_cycles) * 2.0 * progress)
            return max(0.0, 0.5 * (1.0 + cosine))
        # Warmup phase: linear ramp.
        return float(current_step) / warmup_span

    return LambdaLR(optimizer, multiplier_at, last_epoch)

# Training

In [None]:
import torch

def model_fn(batch, model, criterion, device):
    """Run one batch through the model and return ``(loss, accuracy)``.

    Args:
        batch: ``(mels, labels)`` pair of tensors.
        model: Callable mapping mels to per-class scores.
        criterion: Loss callable taking ``(scores, labels)``.
        device: Device the tensors are moved to before the forward pass.

    Returns:
        The loss tensor from ``criterion`` and the batch accuracy as a scalar tensor.
    """
    # Move both tensors onto the target device.
    mels, labels = (tensor.to(device) for tensor in batch)

    scores = model(mels)
    loss = criterion(scores, labels)

    # A sample counts as correct when the highest-scoring class equals its label.
    hits = scores.argmax(1) == labels
    accuracy = hits.float().mean()

    return loss, accuracy

In [None]:
# Validation set
from tqdm import tqdm
import torch

def valid(dataloader, model, criterion, device):
    """Evaluate the model on a loader and return the mean per-batch accuracy.

    The model is put in eval mode for the pass and switched back to train
    mode before returning. Progress is reported with a tqdm bar.

    Args:
        dataloader: Loader yielding batches accepted by ``model_fn``.
        model: Model under evaluation.
        criterion: Loss callable forwarded to ``model_fn``.
        device: Device forwarded to ``model_fn``.

    Returns:
        Sum of batch accuracies divided by the number of batches.
    """
    model.eval()
    loss_sum, accuracy_sum = 0.0, 0.0

    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc='valid', unit='uttr')

    for batches_seen, batch in enumerate(dataloader, start=1):
        # No gradients are needed while evaluating.
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
        loss_sum += loss.item()
        accuracy_sum += accuracy.item()

        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss = f'{loss_sum / batches_seen:.2f}',
            accuracy = f'{accuracy_sum / batches_seen:.2f}')

    pbar.close()
    model.train()

    return accuracy_sum / len(dataloader)

In [None]:
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split

def parse_args():
    """Return the training configuration as keyword arguments for ``main``."""
    return dict(
        data_dir='/content/Dataset',
        save_path='model.ckpt',
        batch_size=32,
        n_workers=8,
        valid_steps=2000,
        warmup_steps=1000,
        save_steps=10000,
        total_steps=70000,
    )

def main(data_dir, save_path, batch_size, n_workers, valid_steps, warmup_steps, total_steps, save_steps):
    """Train the speaker classifier and periodically save the best checkpoint.

    Args:
        data_dir: Dataset directory passed to ``get_dataloader``.
        save_path: File the best ``state_dict`` is written to.
        batch_size: Batch size for both loaders.
        n_workers: Number of DataLoader worker processes.
        valid_steps: Run validation every this many training steps.
        warmup_steps: Linear warmup length for the learning-rate schedule.
        total_steps: Total number of optimisation steps.
        save_steps: Write the best checkpoint seen so far every this many steps.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'[Info]: Use {device} now!')

    train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
    train_iterator = iter(train_loader)

    model = Classifier(n_spks=speaker_num).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)

    best_accuracy = 0.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc='Train', unit=' step')

    for step in range(total_steps):
        # Get data; restart the iterator once the loader is exhausted.
        try:
            batch = next(train_iterator)
        except StopIteration:
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)
        batch_loss = loss.item()
        batch_accuracy = accuracy.item()

        # Update model
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        # Log
        pbar.update()
        pbar.set_postfix(
            loss=f"{batch_loss:.2f}",
            accuracy=f"{batch_accuracy:.2f}",
            step=step + 1,
        )

        # Do validation
        if (step + 1) % valid_steps == 0:
            pbar.close()

            valid_accuracy = valid(valid_loader, model, criterion, device)

            # Keep the best model. state_dict() returns references to the live
            # parameters, which keep changing as training continues, so take a
            # detached copy of every tensor to freeze the snapshot.
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                best_state_dict = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        # Save the best model so far.
        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

    pbar.close()


# Notebook entry point: launch training with the configuration from parse_args().
# (The usual `if __name__ == "__main__":` guard is commented out in this cell.)
main(**parse_args())

In [None]:
import os
import json
import torch
from pathlib import Path
from torch.utils.data import Dataset

class InferenceDataset(Dataset):
    """Test-time dataset yielding ``(feature_path, mel)`` pairs.

    Args:
        data_dir: Directory containing ``testdata.json`` (with an
            ``utterances`` list of records carrying ``feature_path``) and the
            referenced feature files.
    """

    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / 'testdata.json'
        # Use a context manager so the JSON file handle is closed after reading.
        with testdata_path.open() as testdata_file:
            metadata = json.load(testdata_file)
        self.data_dir = data_dir
        self.data = metadata['utterances']

    def __len__(self):
        """Return the number of test utterances."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the utterance's relative feature path and its loaded feature."""
        utterance = self.data[index]
        feat_path = utterance['feature_path']
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        return feat_path, mel

def inference_collate_batch(batch):
    """Collate ``(feature_path, mel)`` samples for inference.

    Returns the feature paths as a tuple and the mels stacked along a new
    leading batch axis. Stacking requires every mel in the batch to have the
    same shape.
    """
    paths, features = zip(*batch)
    stacked = torch.stack(features)
    return paths, stacked


In [None]:
import json
import csv
from pathlib import Path
from tqdm.notebook import tqdm

import torch
from torch.utils.data import DataLoader

def parse_args():
    """Return the inference configuration as keyword arguments for ``main``."""
    return dict(
        data_dir='/kaggle/input/ml2023springhw4/Dataset',
        model_path='/kaggle/input/training-model/model.ckpt',
        output_path='/kaggle/working/submission.csv',
    )

def main(data_dir, model_path, output_path):
    """Predict a speaker for every test utterance and write a submission CSV.

    Args:
        data_dir: Directory with ``testdata.json``, ``mapping.json`` and the
            feature files.
        model_path: Checkpoint (a ``state_dict``) to load into ``Classifier``.
        output_path: Destination CSV; rows are ``Id, Category``.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    dataset = InferenceDataset(data_dir)
    # batch_size=1: inference_collate_batch stacks without padding, so
    # utterances of different lengths cannot share a batch.
    dataloader = DataLoader(dataset,
                           batch_size=1,
                           shuffle=False,
                           drop_last=False,
                           num_workers=8,
                           collate_fn=inference_collate_batch)

    mapping_path = Path(data_dir) / 'mapping.json'
    # Close the mapping file once it has been parsed.
    with mapping_path.open() as mapping_file:
        mapping = json.load(mapping_file)
    id2speaker = mapping['id2speaker']
    speaker_num = len(id2speaker)

    # Size the output layer from the mapping (as training sizes it from the
    # dataset) rather than relying on the Classifier default.
    model = Classifier(n_spks=speaker_num).to(device)
    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only host.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    results = [['Id', 'Category']]
    for feat_path, mel in tqdm(dataloader):
        with torch.no_grad():
            mel = mel.to(device)
            out = model(mel)
            pred = out.argmax(1).cpu().numpy()

            # id2speaker is keyed by the class id rendered as a string.
            results.append([feat_path[0], id2speaker[str(pred[0])]])

    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerows(results)

# Notebook entry point: run inference with the configuration from parse_args().
# (The usual `if __name__ == '__main__':` guard is commented out in this cell.)
main(**parse_args())

In [None]:
!pip install -q kaggle

# Create the kaggle config directory and install the API key
!mkdir -p ~/.kaggle
!cp "/content/drive/MyDrive/Machine Learning/kaggle.json" ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [None]:
# Download the Kaggle competition data (the original zip)
!kaggle competitions download -c ml2023springhw4 -p /content/kaggle

# Unzip the archive (note: Dataset ends up as a subfolder)
!unzip -q /content/kaggle/ml2023springhw4.zip -d /content/kaggle


In [None]:
import os
# Show what the unzip step produced.
print(os.listdir("/content/kaggle"))
print(os.listdir("/content/kaggle/Dataset"))  # <- the data_dir to use


In [None]:
def parse_args():
    """Return the Colab configuration that points at the Kaggle-downloaded dataset.

    The dictionary holds both training keys and the inference ``output_path``,
    so callers must pass each ``main`` only the keys it accepts.
    """
    config = {
        'data_dir': '/content/kaggle/Dataset',
        'save_path': '/content/best_model.ckpt',
        'output_path': '/content/submission.csv',
        'batch_size': 32,
        # The cell originally ended with a bare `...` placeholder here, which is
        # a SyntaxError inside a dict literal. The remaining training
        # hyperparameters below are copied from the training cell's defaults.
        # NOTE(review): confirm these are the intended values.
        'n_workers': 8,
        'valid_steps': 2000,
        'warmup_steps': 1000,
        'save_steps': 10000,
        'total_steps': 70000,
    }
    return config
