<a href="https://colab.research.google.com/github/zambbo/CNN-DialectDetector/blob/master/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [48]:
# Mount Google Drive at /content/drive (requires the Colab runtime, which
# provides the google.colab package).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [147]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
import re
from matplotlib import pyplot as plt
from glob import glob
import numpy as np
import pickle
from tqdm import tqdm
import time

In [50]:
# Lookup tables between integer class indices and dialect region names.
index2region = dict(enumerate(['gangwon', 'gyeongsang', 'jeonla', 'chungcheong', 'jeju']))
region2index = {name: idx for idx, name in index2region.items()}

In [51]:
# Dataset setup (small dataset): root folder of the preprocessed features.
dataset_dir = '/content/drive/MyDrive/DialectDataset/small_dataset/'

In [52]:
# NOTE(review): the pattern contains no wildcard, so glob() can only return
# the dataset root itself (or an empty list if it is missing) -- presumably
# used as a quick existence check; confirm whether '*' was intended.
region_dir = glob(dataset_dir)
region_dir

['/content/drive/MyDrive/DialectDataset/small_dataset/']

In [53]:
# Collect the session folders of every region. Each pattern expands to
# <dataset_dir>*_<region>/* exactly as before, but the variables are bound
# explicitly instead of through exec() so the names stay visible to readers,
# linters and tracebacks.
region_dirs = {region: glob(dataset_dir + f'*_{region}/*') for region in index2region.values()}
gangwon_dirs = region_dirs['gangwon']
gyeongsang_dirs = region_dirs['gyeongsang']
jeonla_dirs = region_dirs['jeonla']
chungcheong_dirs = region_dirs['chungcheong']
jeju_dirs = region_dirs['jeju']
jeonla_dirs

['/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000014',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000012',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000019',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000006',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000015',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000032',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000027',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000018',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000005',
 '/content/drive/MyDrive/DialectDataset/small_dataset/preprocessed_jeonla/DJDD20000024']

In [54]:
def make_tuple_data(dirs, max_num):
    """Load pickled features from up to ``max_num`` folders as per-sample tuples.

    Args:
        dirs: sequence of folder paths. Each folder must contain a file
            matching ``*_spectro.pickle``, ``*_mfcc.pickle`` and
            ``*_chroma.pickle``; the first match of each pattern is read.
        max_num: maximum number of folders, taken from the front of ``dirs``.

    Returns:
        A list of ``(spectro, mfcc, chroma)`` tuples, one per sample, built by
        concatenating the per-folder arrays along axis 0. The list is empty
        when no folder is read (empty ``dirs`` or ``max_num <= 0``).

    Raises:
        FileNotFoundError: if a folder lacks one of the three feature files.
    """
    feature_names = ('spectro', 'mfcc', 'chroma')
    chunks = {name: [] for name in feature_names}

    for region_dir in list(dirs)[:max(max_num, 0)]:
        for name in feature_names:
            matches = glob(region_dir + f'/*_{name}.pickle')
            if not matches:
                raise FileNotFoundError(f"no *_{name}.pickle file found in {region_dir}")
            # NOTE: pickle.load can execute arbitrary code -- only point this
            # at feature files you produced yourself.
            with open(matches[0], "rb") as f:
                chunks[name].append(pickle.load(f))

    if not chunks['spectro']:
        return []

    # Concatenate once per feature instead of once per folder (the repeated
    # concatenate re-copied everything loaded so far on every iteration).
    spectro_data, mfcc_data, chroma_data = (
        np.concatenate(chunks[name], axis=0) for name in feature_names
    )

    return list(zip(spectro_data, mfcc_data, chroma_data))
# Read at most two session folders per region, in the same call order as the
# one-assignment-per-region form.
jeonla_data, chungcheong_data, gyeongsang_data, jeju_data, gangwon_data = [
    make_tuple_data(session_dirs, 2)
    for session_dirs in (jeonla_dirs, chungcheong_dirs, gyeongsang_dirs, jeju_dirs, gangwon_dirs)
]

# Quick look at one region: sample count, tuple arity, spectrogram shape.
first_jeonla = jeonla_data[0] if False else None  # placeholder removed below
del first_jeonla
print("data num: ", len(jeonla_data))
print("tuple size", len(jeonla_data[0]))
print("spec shape", jeonla_data[0][0].shape)

data num:  197
tuple size 3
spec shape (201, 501)


In [55]:
# Pair every sample with its integer class index:
#   (spec, mfcc, chroma) -> ((spec, mfcc, chroma), label)
# NOTE: each statement rebinds its own input, so running this cell twice nests
# the tuples one level deeper; re-run the loading cell before re-running this.
gangwon_data = [(features, region2index['gangwon']) for features in gangwon_data]
gyeongsang_data = [(features, region2index['gyeongsang']) for features in gyeongsang_data]
jeonla_data = [(features, region2index['jeonla']) for features in jeonla_data]
chungcheong_data = [(features, region2index['chungcheong']) for features in chungcheong_data]
jeju_data = [(features, region2index['jeju']) for features in jeju_data]

In [56]:
# Merge the per-region sample lists with plain list concatenation.
# np.concatenate would first have to coerce the nested
# ((spec, mfcc, chroma), label) tuples into an array; that structure is ragged,
# which only produced an object array with a deprecation warning on older
# NumPy and raises ValueError on NumPy >= 1.24. A list supports the indexing
# and len() that the Dataset needs.
datasumup = jeonla_data + chungcheong_data + gangwon_data + jeju_data + gyeongsang_data



In [57]:
# Walk down the nesting of one labelled sample, one value per output line:
# sample count, (features, label) pair length, number of feature arrays,
# shape of the first feature array, class index.
print(
    len(jeonla_data),
    len(jeonla_data[0]),
    len(jeonla_data[0][0]),
    jeonla_data[0][0][0].shape,
    jeonla_data[0][1],
    sep="\n",
)

197
2
3
(201, 501)
2


In [58]:
class MultiModalDataset(Dataset):
    """Dataset over ``((spec, mfcc, chroma), label)`` entries.

    Each item is returned as ``((spec, mfcc, chroma), label)`` where the three
    features are float32 tensors with a leading axis of size 1 added, and the
    label is a scalar int64 tensor.
    """

    def __init__(self, data):
        # Indexable collection; every entry unpacks to (features, label).
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        features, target = self.data[idx]
        spec, mfcc, chroma = features

        # Convert each feature to float32 and prepend the singleton axis.
        spec, mfcc, chroma = (
            torch.tensor(array, dtype=torch.float32).unsqueeze(0)
            for array in (spec, mfcc, chroma)
        )
        target = torch.tensor(target, dtype=torch.long)

        return (spec, mfcc, chroma), target

In [59]:
# Wrap the merged samples in the Dataset and display how many there are.
dataset = MultiModalDataset(datasumup)
len(dataset)

920

In [60]:
# Use the first GPU when one is available; otherwise fall back to the CPU so
# the notebook still runs on runtimes without CUDA.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [135]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut.

    The main branch is conv-bn-relu followed by conv-bn. ReLU is applied to the
    main branch and the shortcut is added afterwards, so outputs can be
    negative.
    NOTE(review): the canonical ResNet block applies ReLU *after* the
    addition; the ordering used here is kept on purpose to preserve the
    existing forward computation -- confirm which one is intended.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by the block.
        stride: stride of the first 3x3 convolution (and of the shortcut
            projection when one is needed).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()

        self.relu = nn.ReLU()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(3,3), stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.seq1 = nn.Sequential(self.conv1, self.bn1, self.relu)
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=(3,3), stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.seq2 = nn.Sequential(self.conv2, self.bn2)

        # The shortcut must be projected whenever the main branch changes the
        # tensor shape, i.e. on a channel change OR a spatial stride. The
        # projection follows the block's stride so both branches always agree.
        self.down_flag = in_channels != out_channels or stride != 1
        if self.down_flag:
            self.downsample = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(1,1), stride=stride, padding=0, bias=False)
        else:
            # No unused convolution weights for plain identity shortcuts.
            self.downsample = nn.Identity()

    def forward(self, x):
        """Return ``relu(main_branch(x)) + shortcut(x)``."""
        y = self.seq2(self.seq1(x))
        shortcut = self.downsample(x)

        # Out-of-place addition: an in-place `+=` on the ReLU output would
        # overwrite the tensor autograd saved for ReLU's backward pass and make
        # loss.backward() raise a RuntimeError.
        return self.relu(y) + shortcut
        

In [86]:
# Stand-alone block (1 -> 64 channels, stride 2) for a shape smoke test.
block = BasicBlock(1, 64, stride=2).to(device)

In [87]:
# Shuffled mini-batches of 16 samples over the whole dataset.
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

In [88]:
# Release unused cached GPU memory held by PyTorch's caching allocator.
torch.cuda.empty_cache()

In [89]:
# Push only the first batch's spectrograms through the stand-alone block and
# print the resulting shape.
for i, (data, label) in enumerate(dataloader):
    if i == 1:
        break

    # data is (spec, mfcc, chroma); index 0 selects the spectrogram batch.
    out = block(data[0].to(device))
    print(out.shape)


torch.Size([16, 1, 201, 501])
torch.Size([16, 64, 101, 251])
torch.Size([16, 64, 101, 251])
torch.Size([16, 64, 101, 251])
torch.Size([16, 64, 101, 251])
torch.Size([16, 64, 101, 251])


In [92]:
# Display the first channel of the first sample in the block output.
out[0][0]

tensor([[-7.2468, -6.0161, -5.6971,  ..., -5.7139, -3.4103, -7.1196],
        [-8.8140, -9.7790, -9.7790,  ..., -9.7790, -8.2538, -8.5428],
        [-9.2004, -9.7790, -9.6664,  ..., -9.7790, -6.4552, -9.7790],
        ...,
        [-9.2318, -9.7487, -9.7790,  ..., -9.7790, -5.9848, -9.7790],
        [-9.7790, -9.7790, -9.7790,  ..., -9.7790, -9.7444, -9.7790],
        [-9.7790, -9.7790, -9.7790,  ..., -9.7790, -9.3316, -9.7790]],
       device='cuda:0', grad_fn=<SelectBackward0>)

In [140]:
class ResNet18(nn.Module):
    """ResNet-style encoder mapping one feature map to a ``output_dim`` vector.

    Layout: 7x7 stride-2 convolution + batch norm + 3x3 stride-2 max pooling,
    then four stages of two ``BasicBlock`` each (64 -> 64 -> 128 -> 128 -> 256
    channels, with stride-2 blocks at the end of stages ``seq3`` and ``seq5``),
    followed by a fully connected layer with ReLU.
    NOTE(review): the stem has no ReLU between batch norm and pooling, unlike
    the reference ResNet; left as is to keep the existing computation.

    Args:
        in_channels: number of channels of the input tensor.
        output_dim: size of the returned embedding.
        model_type: one of ``'spec'``, ``'mfcc'`` or ``'chroma'``; selects the
            input size of the fully connected layer.

    Raises:
        ValueError: if ``model_type`` is not one of the supported values.
    """

    # Flattened size (channels * height * width) of the last feature map per
    # input type. 13x32 is what a 201x501 spectrogram reduces to; the mfcc and
    # chroma sizes presumably correspond to their own fixed input shapes --
    # TODO confirm against the preprocessing.
    _FLAT_FEATURES = {
        'spec': 256*13*32,
        'mfcc': 256*7*32,
        'chroma': 256*1*32,
    }

    def __init__(self, in_channels, output_dim=1024, model_type='spec'):
        super(ResNet18, self).__init__()

        # Fail early with a clear message; previously an unknown type left
        # `fc1` undefined and surfaced as an AttributeError further down.
        if model_type not in self._FLAT_FEATURES:
            raise ValueError(
                f"unknown model_type {model_type!r}; expected one of {sorted(self._FLAT_FEATURES)}"
            )

        self.relu = nn.ReLU()

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=(7,7), stride=2, padding=3)
        self.BN1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=(3,3), stride=2, padding=1)

        self.seq1 = nn.Sequential(self.conv1, self.BN1, self.pool1)

        self.seq2 = nn.Sequential(BasicBlock(64,64), BasicBlock(64,64))
        self.seq3 = nn.Sequential(BasicBlock(64,64), BasicBlock(64, 128, stride=2))
        self.seq4 = nn.Sequential(BasicBlock(128,128), BasicBlock(128,128))
        self.seq5 = nn.Sequential(BasicBlock(128,128), BasicBlock(128,256,stride=2))

        self.fc1 = nn.Linear(self._FLAT_FEATURES[model_type], output_dim)

        self.lastlayer = nn.Sequential(self.fc1, self.relu)

    def forward(self, x):
        """Encode a batch ``x`` into a ``(batch, output_dim)`` tensor."""
        y = self.seq1(x)
        y = self.seq2(y)
        y = self.seq3(y)
        y = self.seq4(y)
        y = self.seq5(y)
        # Flatten everything but the batch axis (also works when the tensor is
        # not contiguous, where .view() would raise).
        y = y.flatten(start_dim=1)
        y = self.lastlayer(y)

        return y



In [141]:
class MultiModalDialectClassifier(nn.Module):
    """Dialect classifier that fuses spectrogram, MFCC and chroma encoders.

    Each modality is encoded by its own ``ResNet18`` into a ``hidden_dim``
    vector; the three vectors are concatenated and classified by a two-layer
    head (``Linear -> ReLU -> Dropout(0.2) -> Linear``).

    Args:
        hidden_dim: embedding size produced by each modality encoder.
        out_dim: number of output classes.
        learning_rate: learning rate of the AdamW optimizer (``self.optim``).
    """

    def __init__(self, hidden_dim=1024, out_dim=5, learning_rate=0.01):
        super(MultiModalDialectClassifier, self).__init__()

        self.spec_res = ResNet18(1, output_dim=hidden_dim, model_type='spec')
        self.mfcc_res = ResNet18(1, output_dim=hidden_dim, model_type='mfcc')
        self.chroma_res = ResNet18(1, output_dim=hidden_dim, model_type='chroma')

        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(hidden_dim*3, 512)
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(512,out_dim)
        self.lastlayer = nn.Sequential(self.fc1, self.relu, self.dropout, self.fc2)

        self.loss_f = nn.CrossEntropyLoss()
        self.optim = optim.AdamW(self.parameters(), lr=learning_rate)

    def forward(self, x):
        """Return class logits of shape ``(batch, out_dim)``.

        Args:
            x: ``(spec, mfcc, chroma)`` batch tensors, one per modality.
        """
        spec_x, mfcc_x, chroma_x = x

        spec_y = self.spec_res(spec_x)
        mfcc_y = self.mfcc_res(mfcc_x)
        chroma_y = self.chroma_res(chroma_x)

        # The encoder outputs are already (batch, hidden_dim), so the
        # concatenation is directly the input of the classification head.
        y = torch.cat([spec_y, mfcc_y, chroma_y], dim=1)

        return self.lastlayer(y)

    def train(self, train_loader=True, valid_loader=None, learning_rate=None, epochs=1, device=None, mode=None):
        """Switch the training mode, or run the training loop.

        ``nn.Module.eval()`` and ``nn.Module.train()`` call ``self.train(mode)``
        with a bool, so this override must keep supporting that contract:

        * ``train()``, ``train(True/False)`` or ``train(mode=...)`` behave like
          ``nn.Module.train`` and return ``self``.
        * ``train(train_loader, valid_loader, learning_rate, epochs, device)``
          runs ``fit`` with the same arguments and returns ``None``.
        """
        if mode is not None:
            return super().train(mode)
        if isinstance(train_loader, bool):
            return super().train(train_loader)
        return self.fit(train_loader, valid_loader, learning_rate, epochs, device)

    def fit(self, train_loader, valid_loader=None, learning_rate=None, epochs=1, device=None):
        """Train the model and record per-epoch metrics on the instance.

        After the call ``self.train_loss`` and ``self.train_accuracy`` hold one
        value per epoch, ``self.valid_accuracy`` holds one value per epoch when
        ``valid_loader`` is given, and ``self.best_epoch`` / ``self.best_acc``
        describe the epoch with the highest training accuracy.

        Args:
            train_loader: iterable of ``((spec, mfcc, chroma), label)`` batches.
            valid_loader: optional iterable of the same form for validation.
            learning_rate: if not ``None``, overrides the optimizer's rate.
            epochs: number of passes over ``train_loader``.
            device: device the batches are moved to; defaults to the device of
                the model parameters.
        """
        if device is None:
            device = next(self.parameters()).device
        if learning_rate is not None:
            for group in self.optim.param_groups:
                group['lr'] = learning_rate

        self.train_loss = []
        self.train_accuracy = []
        self.valid_accuracy = []
        self.best_epoch = -1
        self.best_acc = -1

        for epoch in range(1, epochs+1):
            # predict() leaves the model in eval mode, so re-enable training
            # mode at the start of every epoch.
            super().train(True)

            total = 0
            correct = 0
            epoch_loss = 0.0

            for batch_data, batch_label in tqdm(train_loader):
                batch_data = tuple(part.to(device) for part in batch_data)
                batch_label = batch_label.to(device)

                self.optim.zero_grad()

                pred = self(batch_data)
                loss = self.loss_f(pred, batch_label)
                loss.backward()
                self.optim.step()

                # .item() detaches the value; summing the tensor itself would
                # keep every batch's autograd graph alive.
                epoch_loss += loss.item()

                pred_indices = pred.argmax(dim=1)
                total += batch_label.size(0)
                correct += pred_indices.eq(batch_label).sum().item()

            num_batches = len(train_loader)
            self.train_loss.append(epoch_loss / num_batches if num_batches else 0.0)
            epoch_acc = correct / total if total else 0.0
            self.train_accuracy.append(epoch_acc)

            if valid_loader is not None:
                predicted, labels = self.predict(valid_loader, device)
                valid_acc = float((predicted == labels).mean()) if labels.size else 0.0
                self.valid_accuracy.append(valid_acc)

            if epoch_acc > self.best_acc:
                self.best_acc = epoch_acc
                self.best_epoch = epoch

    def predict(self, test_loader, device):
        """Predict class indices for every batch of ``test_loader``.

        The model is switched to eval mode and left in it.

        Args:
            test_loader: iterable of ``((spec, mfcc, chroma), label)`` batches.
            device: device the batches are moved to; ``None`` uses the device
                of the model parameters.

        Returns:
            ``(predicted, labels)`` as 1-D NumPy arrays; both are empty when
            the loader yields no batch.
        """
        if device is None:
            device = next(self.parameters()).device

        self.eval()
        labels = []
        predicted = []

        with torch.no_grad():
            for batch_data, batch_label in tqdm(test_loader):
                batch_data = tuple(part.to(device) for part in batch_data)

                pred = self(batch_data)
                pred_indices = pred.argmax(dim=1)

                # Tensors must be on the CPU before converting to NumPy.
                predicted.append(pred_indices.cpu().numpy())
                labels.append(batch_label.cpu().numpy())

        if not predicted:
            # np.concatenate refuses an empty list.
            return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64)

        predicted = np.concatenate(predicted, axis=0)
        labels = np.concatenate(labels, axis=0)

        return predicted, labels
        
        

In [142]:
# Build the classifier with its default arguments and move it to the device.
model = MultiModalDialectClassifier().to(device)

In [143]:
# Smoke test: run the first batch through the full model and print the logits'
# shape, the predicted class indices and the ground-truth labels.
for i, (data, label) in enumerate(dataloader):

    if i==1:break

    spec, mfcc, chroma = data
    spec, mfcc, chroma = spec.to(device), mfcc.to(device), chroma.to(device)
    data = (spec, mfcc, chroma)
    out = model(data)
    print(out.shape)
    _, predict = torch.max(out, axis=1)
    # (removed a stray `pred.eq` expression: `pred` is never defined in the
    # notebook, so the cell raised NameError on a fresh kernel)
    print(predict)
    print(label)


torch.Size([16, 5])
torch.Size([16, 5])
tensor([1, 1, 1, 1, 1, 1, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0')
tensor([2, 4, 3, 3, 2, 0, 3, 4, 3, 3, 1, 0, 0, 4, 0, 1])


In [168]:
# Scratch: two small integer tensors for ad-hoc experiments.
a = torch.tensor([1,2])
b = torch.tensor([1,2])

In [175]:
# Scratch: show the NumPy view of a CPU tensor.
a.numpy()

array([1, 2])

In [174]:
# Scratch: shape of an empty NumPy array.
np.array([]).shape

(0,)