In [None]:
import os
import random
import pandas as pd
import torch
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
import numpy as np

In [None]:
def load_feat(path):
    """Deserialize and return whatever object was stored at `path` via torch.load."""
    return torch.load(path)

def shift(x, n):
    """Shift the rows of a 2-D tensor by `n` positions, padding with the edge row.

    Args:
        x: tensor indexed as (seq_len, feature_dim); the `x[0].repeat(k, 1)`
            padding below relies on every row being 1-D.
        n: signed shift. ``n > 0`` drops the first `n` rows and appends `n`
            copies of the last row; ``n < 0`` drops the last `-n` rows and
            prepends `-n` copies of the first row; ``n == 0`` returns `x`
            itself (no copy).

    Returns:
        A tensor with exactly as many rows as `x`.
    """
    seq_len = x.size(0)
    if n == 0 or seq_len == 0:
        # Nothing to shift; an empty tensor also has no edge row to pad with.
        return x

    # Clamp the shift to the sequence length. Without this, |n| > seq_len
    # produced |n| rows (all padding) instead of seq_len rows, so callers that
    # write the result back into a seq_len-row slot failed on short sequences.
    n = max(-seq_len, min(n, seq_len))

    if n < 0:
        left = x[0].repeat(-n, 1)
        right = x[:n]
    else:
        right = x[-1].repeat(n, 1)
        left = x[n:]

    return torch.cat((left, right), dim=0)


def concat_feat(x, concat_n):
    """Attach neighbouring frames to every frame of a (seq_len, feature_dim) tensor.

    The result has shape (seq_len, concat_n * feature_dim). Within output row
    `t`, the k-th feature_dim-wide slice holds frame `t + (k - concat_n // 2)`;
    positions that fall outside the sequence reuse the first / last frame.
    `concat_n` must be odd, and for `concat_n == 1` the input is returned as is.
    """
    assert concat_n % 2 == 1 # n must be odd
    if concat_n < 2:
        return x

    n_frames, n_dims = x.size(0), x.size(1)
    # Tile each frame concat_n times, then expose the copies as the leading
    # axis: (concat_n, seq_len, feature_dim).
    stacked = x.repeat(1, concat_n).view(n_frames, concat_n, n_dims).permute(1, 0, 2)

    center = concat_n // 2
    for offset in range(1, center + 1):
        # copy `center + offset` looks `offset` frames ahead ...
        stacked[center + offset, :] = shift(stacked[center + offset], offset)
        # ... and copy `center - offset` looks `offset` frames back
        stacked[center - offset, :] = shift(stacked[center - offset], -offset)

    # Back to frame-major layout and flatten the copies into the feature axis.
    return stacked.permute(1, 0, 2).view(n_frames, concat_n * n_dims)


def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8, train_val_seed=1337):
    """Load per-utterance features (and labels, when available) for one data split.

    Args:
        split: 'train', 'val' or 'test'. 'train' and 'val' are both carved out
            of the utterances listed in `train_split.txt`.
        feat_dir: directory holding `train/` and `test/` sub-folders with one
            `<utterance>.pt` feature file per utterance.
        phone_path: directory holding `train_split.txt`, `test_split.txt` and
            `train_labels.txt` (one line per utterance: name followed by
            space-separated integer labels).
        concat_nframes: odd number of frames passed to `concat_feat`.
        train_ratio: fraction of the shuffled training utterances that goes to
            'train'; the remainder goes to 'val'.
        train_val_seed: seed used for the train/val shuffle.

    Returns:
        `X` for the test split, `(X, y)` otherwise, where `X` is a list of
        per-utterance feature tensors and `y` the matching list of LongTensor
        label sequences.

    Raises:
        ValueError: if `split` is not one of 'train', 'val', 'test'.
    """
    class_num = 41 # NOTE: pre-computed, should not need change

    if split not in ('train', 'val', 'test'):
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')
    mode = 'test' if split == 'test' else 'train'

    label_dict = {}
    if mode != 'test':
        # `with` closes the handle; the bare open(...).readlines() leaked it.
        with open(os.path.join(phone_path, f'{mode}_labels.txt')) as label_file:
            for line in label_file:
                tokens = line.strip('\n').split(' ')
                label_dict[tokens[0]] = [int(p) for p in tokens[1:]]

    with open(os.path.join(phone_path, f'{mode}_split.txt')) as split_file:
        usage_list = split_file.readlines()

    if mode != 'test':
        # split training and validation data
        # NOTE: this reseeds the global `random` module so that 'train' and
        # 'val' calls see the same shuffle and therefore never overlap.
        random.seed(train_val_seed)
        random.shuffle(usage_list)
        percent = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:percent] if split == 'train' else usage_list[percent:]

    usage_list = [line.strip('\n') for line in usage_list]
    print(f'[Dataset] - # phone classes: {class_num}, number of utterances for {split}: {len(usage_list)}')

    X = []
    y = [] if mode != 'test' else None

    # Iterating the list directly lets tqdm show the total and an ETA.
    for fname in tqdm(usage_list):
        feat = load_feat(os.path.join(feat_dir, mode, f'{fname}.pt'))
        X.append(concat_feat(feat, concat_nframes))
        if y is not None:
            y.append(torch.LongTensor(label_dict[fname]))

    print(f'[INFO] {split} set')
    print(len(X))
    if y is None:
        return X
    print(len(y))
    return X, y

## Dataloader

In [None]:
# Sanity check: load the training split without frame concatenation and look
# at the shapes that padding every utterance to a common length produces.
X, y = preprocess_data(
    'train',
    feat_dir='/kaggle/input/ml2022spring-hw2/libriphone/libriphone/feat',
    phone_path='/kaggle/input/ml2022spring-hw2/libriphone/libriphone',
    concat_nframes=1,
    train_ratio=0.8,
    train_val_seed=1337,
)

from torch.nn.utils.rnn import pad_sequence
train_x = pad_sequence(X)  # (seq_len, data_len, feat_dim)
train_y = pad_sequence(y)  # (seq_len, data_len)
seq_len = train_x.shape[0]  # first axis of the padded batch
print(train_x.shape, train_y.shape)

# The padded copies were only needed for the shape check above.
del train_x, train_y

In [None]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class LibriDataset(Dataset):
    """Utterance-level dataset that zero-pads every sequence to a common length.

    `X` is a list of (seq_len_i, feat_dim) tensors and `y` an optional list of
    matching label sequences. Both are stacked with `pad_sequence`, so the
    utterance index lives on dim 1 and padded label positions hold 0.
    """

    def __init__(self, X, y=None):
        self.data = pad_sequence(X)  # (seq_len, batch_size, feat_dim)
        self.label = pad_sequence(y).long() if y is not None else None

    def __len__(self):
        # one item per utterance
        return self.data.size(1)

    def __getitem__(self, idx):
        sample = self.data[:, idx, :]
        if self.label is None:
            return sample
        return sample, self.label[:, idx]

## model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class LSTMClassifier(nn.Module):
    """Stacked LSTM (optionally bidirectional) followed by a per-frame MLP head.

    Input and output are sequence-first: (seq_len, batch_size, input_dim) in,
    (seq_len, batch_size, out_dim) out. The head outputs raw scores; no
    softmax is applied.
    """

    def __init__(self, input_dim=39, mid_dim=256, mid_layers=2, out_dim=41, bidirectional=True):
        super().__init__()

        self.rnn1 = nn.LSTM(input_dim, mid_dim, mid_layers, bidirectional=bidirectional)  # rnn

        if bidirectional:
            print("use bidirectional lstm")

        # A bidirectional LSTM emits forward and backward states side by side,
        # doubling the feature size seen by the head. The head's hidden layer
        # is always half of its input width.
        head_in = mid_dim * 2 if bidirectional else mid_dim
        head_hidden = head_in // 2
        self.reg = nn.Sequential(
            nn.Linear(head_in, head_hidden),
            nn.LayerNorm(head_hidden),
            nn.Dropout(p=0.2),
            nn.LeakyReLU(inplace=True, negative_slope=0.01),
            nn.Linear(head_hidden, out_dim),
        )  # regression

    def forward(self, x):
        # The final hidden/cell states are not needed for per-frame output.
        hidden_seq, _ = self.rnn1(x)  # (seq_len, batch_size, mid_dim * num_directions)
        return self.reg(hidden_seq)  # (seq_len, batch_size, out_dim)

## hyper-parameters

In [None]:
# data parameters
# the number of frames to concat with, n must be odd (total 2k+1 = n frames)
concat_nframes = 5

# model parameters
# the input dim of the model, you should not change the value
input_dim = concat_nframes * 39
# the hidden dim
mid_dim = 256
# number of stacked LSTM layers
mid_layers = 3
bidirectional = True

In [None]:
# Prefer the first CUDA device when one is available; otherwise use the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(f'DEVICE: {device}')

## testing

In [None]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class LibriTestDataset(Dataset):
    """Thin Dataset wrapper that serves the items of `X` unchanged, one per index."""

    def __init__(self, X):
        # Kept as given: no padding or stacking is applied.
        self.data = X

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [None]:
# load data
feat_dir = '/kaggle/input/ml2022spring-hw2/libriphone/libriphone/feat'
phone_path = '/kaggle/input/ml2022spring-hw2/libriphone/libriphone'
test_X = preprocess_data(
    split='test',
    feat_dir=feat_dir,
    phone_path=phone_path,
    concat_nframes=concat_nframes,
)
test_set = LibriTestDataset(test_X)
# One utterance per batch, in file order (presumably because the unpadded
# utterances differ in length and so cannot be stacked by the default collate).
test_loader = DataLoader(test_set, shuffle=False, batch_size=1)

In [None]:
# Build the network from the hyper-parameters defined above; checkpoint
# weights are loaded into it by the inference cells below.
# `bidirectional` is passed explicitly so that changing the hyper-parameter
# actually changes the architecture instead of silently using the class default.
model = LSTMClassifier(
    input_dim=input_dim,
    mid_dim=mid_dim,
    mid_layers=mid_layers,
    out_dim=41,
    bidirectional=bidirectional,
).to(device)


## single model

In [None]:
# Predict a class for every frame of every test utterance with the seed-42
# checkpoint and write the result as an `Id,Class` CSV.
model.load_state_dict(torch.load("../input/model-seed42/model.ckpt", map_location='cpu'))
model.eval()

pred_chunks = []  # one 1-D array of frame predictions per utterance
with torch.no_grad():
    for batch in tqdm(test_loader):
        features = batch.permute(1, 0, 2).to(device)  # (seq_len, batch_size, feat_dim)
        outputs = model(features)  # (seq_len, batch_size, cls_num)
        _, test_pred = torch.max(outputs.permute(1, 0, 2), -1) # index of the highest-scoring class / (batch_size, seq_len)
        # reshape(-1) instead of squeeze(): squeeze() collapses a one-frame
        # utterance to a 0-d array, which np.concatenate rejects.
        pred_chunks.append(test_pred.reshape(-1).cpu().numpy())

# Concatenate once at the end; growing the array inside the loop re-copied
# all earlier predictions on every iteration.
pred = np.concatenate(pred_chunks) if pred_chunks else np.array([], dtype=np.int32)

with open('prediction_model_seed42.csv', 'w') as f:
    f.write('Id,Class\n')
    for frame_id, phone in enumerate(pred):
        f.write('{},{}\n'.format(frame_id, phone))

In [None]:
# Predict a class for every frame of every test utterance with the seed-7
# checkpoint and write the result as an `Id,Class` CSV.
model.load_state_dict(torch.load("../input/model-seed7/model.ckpt", map_location='cpu'))
model.eval()

pred_chunks = []  # one 1-D array of frame predictions per utterance
with torch.no_grad():
    for batch in tqdm(test_loader):
        features = batch.permute(1, 0, 2).to(device)  # (seq_len, batch_size, feat_dim)
        outputs = model(features)  # (seq_len, batch_size, cls_num)
        _, test_pred = torch.max(outputs.permute(1, 0, 2), -1) # index of the highest-scoring class / (batch_size, seq_len)
        # reshape(-1) instead of squeeze(): squeeze() collapses a one-frame
        # utterance to a 0-d array, which np.concatenate rejects.
        pred_chunks.append(test_pred.reshape(-1).cpu().numpy())

# Concatenate once at the end; growing the array inside the loop re-copied
# all earlier predictions on every iteration.
pred = np.concatenate(pred_chunks) if pred_chunks else np.array([], dtype=np.int32)

with open('prediction_model_seed7.csv', 'w') as f:
    f.write('Id,Class\n')
    for frame_id, phone in enumerate(pred):
        f.write('{},{}\n'.format(frame_id, phone))

## ensemble

In [None]:
def ensemble(model, pretrained_models):
    """Run each checkpoint in `pretrained_models` over the global `test_loader`.

    Args:
        model: network whose weights are overwritten, in turn, by every
            checkpoint; it is left in eval mode holding the last one.
        pretrained_models: iterable of checkpoint paths accepted by
            `torch.load`, each a state dict compatible with `model`.

    Returns:
        `(probs, preds)`, two lists with one entry per checkpoint:
        `probs[k]` is a (tot_seq_len, cls_num) array of the raw model outputs
        for all test frames, and `preds[k]` the matching (tot_seq_len,) array
        of highest-scoring class indices.

    Note:
        Reads the module-level `test_loader` and `device`. Only the first
        item of each batch is kept in `probs`, so the loader is expected to
        yield one utterance per batch.
    """
    probs = []
    preds = []
    for model_path in pretrained_models:
        model.load_state_dict(torch.load(model_path, map_location='cpu'))
        print(f'load model: {model_path}')
        model.eval()

        utt_outputs = []  # per-utterance (seq_len, cls_num) arrays
        utt_preds = []    # per-utterance (seq_len,) arrays
        with torch.no_grad():
            for batch in tqdm(test_loader):
                features = batch.permute(1, 0, 2).to(device)  # (seq_len, batch_size=1, feat_dim)
                outputs = model(features).permute(1, 0, 2)  # (batch_size=1, seq_len, cls_num)
                utt_outputs.append(outputs.cpu().numpy()[0])

                _, test_pred = torch.max(outputs, -1)  # (batch_size=1, seq_len)
                # reshape(-1) instead of squeeze(): squeeze() collapses a
                # one-frame utterance to a 0-d array, which np.concatenate rejects.
                utt_preds.append(test_pred.reshape(-1).cpu().numpy())

        # Concatenate once per checkpoint rather than growing an array per batch.
        probs.append(np.concatenate(utt_outputs, axis=0))  # (tot_seq_len, cls_num)
        preds.append(np.concatenate(utt_preds, axis=0))  # (tot_seq_len, )
    return probs, preds


In [None]:
# Checkpoints to ensemble; each must be a state dict compatible with `model`.
model_pathes = [
    '../input/model-seed42/model.ckpt',
    '../input/model-seed7/model.ckpt',
]
probs, preds = ensemble(model, model_pathes)

### average logits

In [None]:
# Average the per-frame outputs of all checkpoints with equal weights, then
# pick the highest-scoring class for every frame.
prob_out = np.zeros_like(probs[0])
weight = 1 / len(probs)
for model_scores in probs:
    prob_out += weight * model_scores
print(prob_out.shape)

pred_by_ensemble_probs = prob_out.argmax(axis=1).astype(np.int32)
print(pred_by_ensemble_probs.shape)

with open('prediction_ensemble_probs.csv', 'w') as f:
    f.write('Id,Class\n')
    for frame_id, phone in enumerate(pred_by_ensemble_probs):
        f.write(f'{frame_id},{phone}\n')

### voting

In [None]:
def _majority_vote(model_preds, seed=0):
    """Combine per-model class predictions by per-frame majority vote.

    Args:
        model_preds: sequence of equally long 1-D arrays of non-negative
            integer class indices, one array per model.
        seed: seed of the tie-breaking random generator, so that repeated
            runs produce the same output.

    Returns:
        1-D integer array with the winning class of every frame. Ties are
        broken uniformly at random among the tied classes; with exactly two
        models this means a disagreement is resolved by a fair coin flip.
    """
    votes = np.stack(model_preds, axis=0)  # (n_models, n_frames)
    if votes.size == 0:
        return np.zeros_like(model_preds[0])

    n_frames = votes.shape[1]
    n_classes = int(votes.max()) + 1
    frame_ids = np.arange(n_frames)

    # counts[t, c] = number of models that predicted class c for frame t.
    # float32 keeps the (n_frames, n_classes) table small.
    counts = np.zeros((n_frames, n_classes), dtype=np.float32)
    for one_model in votes:
        counts[frame_ids, one_model] += 1

    # Noise strictly below 1 can reorder only classes with equal counts, so
    # it acts purely as a random tie-breaker.
    rng = np.random.default_rng(seed)
    counts += 0.5 * rng.random(counts.shape, dtype=np.float32)
    return counts.argmax(axis=1).astype(votes.dtype)


# Majority vote over the hard predictions of all ensembled checkpoints.
# This cell intentionally leaves `pred_by_ensemble_probs` untouched.
pred_out = _majority_vote(preds, seed=0)
print(pred_out.shape)

with open('prediction_ensemble_preds.csv', 'w') as f:
    f.write('Id,Class\n')
    for frame_id, phone in enumerate(pred_out):
        f.write('{},{}\n'.format(frame_id, phone))