In [1]:
# Print the version of the `python` executable the shell resolves on PATH.
!python -V

Python 3.8.18


In [2]:
# %pip installs into the environment of the running kernel, whereas `!pip`
# uses whichever pip the shell finds first on PATH.
# scikit-learn and Pillow are imported directly by this notebook, so they are
# listed explicitly instead of relying on them arriving as transitive dependencies.
%pip install matplotlib pandas tqdm librosa pydub einops numpy ipywidgets scikit-learn Pillow
# Resolve the three pinned PyTorch packages together so pip checks them as a set.
%pip install torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2



In [3]:
import os
import pandas as pd
import numpy as np
from pydub import AudioSegment
import random
import librosa
import librosa.display
from PIL import Image
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torchvision.transforms as transforms
from torchaudio.transforms import SpeedPerturbation, AddNoise, PitchShift, FrequencyMasking, TimeMasking,TimeStretch

from einops import rearrange, repeat
from einops.layers.torch import Rearrange





In [4]:
# Hyper-parameters shared by the dataset and the training loop.
config = dict(
    batch_size=32,    # mini-batch size of both DataLoaders
    epochs=75,        # number of passes made by train()
    lr=1e-4,          # learning rate handed to the optimizer
    sr=16000,         # sampling rate used when loading audio
    fix_len=4,        # multiplied by sr to get the fixed clip length in samples
    n_fft=1024,       # FFT size of the mel spectrogram
    window_len=512,   # passed as win_length
    hop_len=64,       # passed as hop_length
    n_mels=128,       # number of mel bands
)

# Use the first CUDA device when one is visible, otherwise stay on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

print(device)

cpu


In [None]:
def seed_everything(seed):
    """Seed the Python, NumPy and PyTorch RNGs and pin cuDNN to deterministic mode.

    Args:
        seed: integer used for every generator; also written (as a string)
            to the ``PYTHONHASHSEED`` environment variable.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Host-side generators.
    random.seed(seed)
    np.random.seed(seed)
    torch.random.manual_seed(seed)

    # CUDA generators are only seeded when a GPU is visible.
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Give up cuDNN autotuning in exchange for repeatable kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


seed_everything(42)

In [5]:
def shift_time(signal, shift_max, shift_direction='both'):
    """Shift a waveform in time by a random offset and zero the vacated samples.

    The offset is drawn uniformly from ``[-shift_max, shift_max)``. The signal
    is rolled by that many samples and the part that wrapped around is
    replaced with silence, so the result has the same length as the input.

    Args:
        signal: tensor holding the waveform; it is flattened to one dimension.
        shift_max: bound (in samples) on the magnitude of the offset.
        shift_direction: ``'both'`` keeps the random sign of the draw;
            ``'forward'`` forces a non-positive offset (samples move towards
            index 0, the tail is silenced); ``'backward'`` forces a
            non-negative offset (samples move towards the end, the head is
            silenced).

    Returns:
        A new 1-D tensor with the shifted waveform; ``signal`` is not modified.

    Raises:
        ValueError: if ``shift_direction`` is not one of the three names above.
    """
    if shift_direction not in ('both', 'forward', 'backward'):
        raise ValueError("shift_direction should be 'both', 'forward' or 'backward'")

    signal = signal.reshape(1, -1)

    # Decide the shift amount; a plain int keeps the slicing below unambiguous.
    shift = int(torch.randint(-shift_max, shift_max, (1,)))

    # Decide the shift direction: the draw already has a random sign, so a
    # requested direction has to be enforced on its magnitude.
    if shift_direction == 'forward':
        shift = -abs(shift)
    elif shift_direction == 'backward':
        shift = abs(shift)

    # Shift, then silence the samples that wrapped around.
    augmented_signal = torch.roll(signal, shifts=shift, dims=-1)
    if shift > 0:
        augmented_signal[:, :shift] = 0
    elif shift < 0:
        augmented_signal[:, shift:] = 0
    # shift == 0 leaves the signal untouched; slicing with `0:` here would
    # have zeroed every sample.

    return augmented_signal.reshape(-1,)


# Augmentations switched on for the training dataset. The dataset's
# __getitem__ looks for names such as 'AddNoise', 'TimeShifting',
# 'PitchShifting' and 'SpecAugment' in this list.
augmentation = ['SpecAugment']

In [12]:
# Annotation tables stored under ./CremaD (training table first, then test).
train_df = pd.read_csv('./CremaD/train.csv')
test_df = pd.read_csv('./CremaD/test.csv')

In [13]:
audio_train_path, audio_test_path = './train', './test'

# Hold out 25% of the training rows for validation; the fixed random_state
# makes the split repeatable.
# NOTE: this rebinds train_df, so re-running this cell alone splits the already
# reduced frame again -- re-run the CSV loading cell first.
train_df, valid_df = train_test_split(
    train_df,
    test_size=0.25,
    random_state=42,
)

In [14]:
class CustomImageDataset(Dataset):
    """Serve ``(mel_spec, label)`` pairs built from the .wav files of an annotation table.

    For every row the clip is loaded at ``cfg['sr']``, optionally augmented,
    padded/cropped to ``cfg['sr'] * cfg['fix_len']`` samples, converted to a
    log-mel spectrogram and min-max scaled to [0, 1]. ``transform`` is then
    applied and, when ``add_coordinate`` is set, two CoordConv-style channels
    holding the normalised row and column index are appended.

    Args:
        csv_file: table indexed with ``.iloc``; column 0 holds the file stem
            and column 2 the integer label.
        root_dir: directory with one sub-folder per file-stem prefix (the
            text before the first ``_``, lower-cased).
        cfg: dict providing ``sr``, ``fix_len``, ``n_mels``, ``n_fft``,
            ``hop_len`` and ``window_len``.
        transform: callable applied to the 2-D spectrogram array. When it is
            falsy the array is returned as a ``(1, n_mels, frames)`` tensor.
        add_coordinate: append the two coordinate channels to the sample.
        augmentation: container of augmentation names to apply, any of
            ``'SpeedPerturbation'``, ``'AddNoise'``, ``'TimeShifting'``,
            ``'PitchShifting'`` and ``'SpecAugment'``. ``None`` means none.

    Samples are not moved to an accelerator here: they stay on whatever
    device ``transform`` produces, and the consumer is expected to call
    ``.to(device)`` on the collated batch.
    """

    # Names that enable speed perturbation. The misspelt variant is the one
    # this class used to check, so lists written for it keep working.
    _SPEED_KEYS = ('SpeedPerturbation', 'SpeedPerturbaton')

    def __init__(self, csv_file, root_dir, cfg, transform=None, add_coordinate=False, augmentation=None):
        self.annotations = csv_file
        self.root_dir = root_dir
        # Use the configuration that was passed in, not the notebook-level global.
        self.cfg = cfg
        self.transform = transform
        self.add_coordinate = add_coordinate
        # None instead of a shared mutable default; a caller-supplied list is kept as-is.
        self.augmentation = augmentation if augmentation is not None else []
        self.TM = TimeMasking(time_mask_param=192)
        self.FM = FrequencyMasking(freq_mask_param=48)

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        file_stem = self.annotations.iloc[idx, 0]
        wav_name = os.path.join(self.root_dir, file_stem.split('_')[0].lower(), file_stem + '.wav')
        y, sr = librosa.load(wav_name, sr=self.cfg['sr'])
        label = int(self.annotations.iloc[idx, 2])

        # Augmentations that have to run on the waveform, before the spectrogram.
        fixed_y = self._augment_waveform(y)

        # Mel-spectrogram conversion, scaled to [0, 1].
        mel_spec = self._log_mel(fixed_y)

        if 'SpecAugment' in self.augmentation:
            masked = self.TM(torch.tensor(mel_spec[np.newaxis, :]))
            masked = self.FM(masked)
            mel_spec = np.array(masked.reshape(self.cfg['n_mels'], -1))

        # Basic transforms such as ToTensor / Resize.
        if self.transform:
            mel_spec = self.transform(mel_spec)
        else:
            # Without a transform still hand back a channel-first tensor
            # instead of failing on an unbound name.
            mel_spec = torch.as_tensor(mel_spec).unsqueeze(0)

        if self.add_coordinate:  # https://github.com/walsvid/CoordConv/blob/master/coordconv.py
            _, dim_y, dim_x = mel_spec.shape
            xx_channel, yy_channel = self._coord_channels(dim_y, dim_x)
            mel_spec = torch.cat(
                [mel_spec, xx_channel.to(mel_spec.device), yy_channel.to(mel_spec.device)],
                dim=0,
            )
        return mel_spec, label

    def _augment_waveform(self, y):
        """Apply the enabled waveform augmentations and fix the clip length."""
        n_samples = self.cfg['sr'] * self.cfg['fix_len']

        if any(key in self.augmentation for key in self._SPEED_KEYS):
            SP = SpeedPerturbation(self.cfg['sr'], [0.9, 1.0, 1.1])
            y = SP(torch.tensor(y))[0]

        fixed_y = librosa.util.fix_length(np.array(y), size=n_samples)
        fixed_y = torch.tensor(fixed_y)

        if 'AddNoise' in self.augmentation:
            noise = torch.normal(0, 1, size=(n_samples,))
            AN = AddNoise()
            fixed_y = AN(fixed_y, noise, torch.tensor(10))

        if 'TimeShifting' in self.augmentation:
            fixed_y = shift_time(fixed_y, 8000)

        if 'PitchShifting' in self.augmentation:
            PS = PitchShift(self.cfg['sr'], 4)
            fixed_y = PS(fixed_y).detach()

        return fixed_y

    def _log_mel(self, fixed_y):
        """Return the dB-scaled mel spectrogram of ``fixed_y``, min-max scaled to [0, 1]."""
        mel_spec = librosa.feature.melspectrogram(y=np.array(fixed_y),
                                                  # Without sr librosa builds the mel filterbank
                                                  # for its 22050 Hz default, not the rate the
                                                  # audio was actually loaded at.
                                                  sr=self.cfg['sr'],
                                                  n_mels=self.cfg['n_mels'],
                                                  n_fft=self.cfg['n_fft'],
                                                  hop_length=self.cfg['hop_len'],
                                                  win_length=self.cfg['window_len'])
        mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
        return self._min_max_normalize(mel_spec)

    @staticmethod
    def _min_max_normalize(mel_spec):
        """Scale an array to the [0, 1] range; a constant array maps to all zeros."""
        min_ = np.min(mel_spec)
        span = np.max(mel_spec) - min_
        if span == 0:
            # A constant spectrogram would otherwise become 0/0 = NaN.
            return np.zeros_like(mel_spec)
        return (mel_spec - min_) / span

    @staticmethod
    def _coord_channels(dim_y, dim_x):
        """Build the two ``(1, dim_y, dim_x)`` coordinate channels with values in [-1, 1].

        The first channel varies along the rows, the second along the
        columns. A dimension of size 1 gets the constant -1 rather than NaN.
        """
        rows = torch.arange(dim_y, dtype=torch.float32) / max(dim_y - 1, 1)
        cols = torch.arange(dim_x, dtype=torch.float32) / max(dim_x - 1, 1)
        xx_channel = (rows * 2 - 1).view(1, dim_y, 1).expand(1, dim_y, dim_x)
        yy_channel = (cols * 2 - 1).view(1, 1, dim_x).expand(1, dim_y, dim_x)
        return xx_channel, yy_channel


# Convert the spectrogram array to a tensor and resize it to the 128x128
# input size the model is built for.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((128, 128))
])
root_dir = './CremaD'
# Create dataset (augmentation is applied to the training set only)
train_dataset = CustomImageDataset(csv_file=train_df, root_dir=root_dir, cfg=config, transform=transform, add_coordinate = True, augmentation=augmentation)
val_dataset = CustomImageDataset(csv_file=valid_df, root_dir=root_dir, cfg=config, transform=transform, add_coordinate = True)

# Create DataLoader
train_dataloader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
# Evaluation does not depend on sample order, so keep it fixed instead of
# shuffling the validation set.
val_dataloader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)


In [15]:
# https://github.com/lucidrains/vit-pytorch/blob/main/vit_pytorch/vit.py

def pair(t):
    """Return ``t`` unchanged when it is a tuple, otherwise duplicate it as ``(t, t)``."""
    if isinstance(t, tuple):
        return t
    return (t, t)

# classes

class FeedForward(nn.Module):
    """Position-wise MLP: LayerNorm -> Linear -> GELU -> Dropout -> Linear -> Dropout.

    Args:
        dim: size of the last dimension of the input and of the output.
        hidden_dim: width of the intermediate linear layer.
        dropout: dropout probability used after both linear layers.
    """

    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        stages = [
            nn.LayerNorm(dim),
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        out = self.net(x)
        return out

class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention preceded by a LayerNorm.

    Args:
        dim: size of the last dimension of the input and of the output.
        heads: number of attention heads.
        dim_head: width of each head; queries, keys and values each have
            ``heads * dim_head`` features.
        dropout: dropout probability applied to the attention weights and,
            when an output projection exists, after it.
    """

    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        inner_dim = heads * dim_head
        # A single head whose width already equals `dim` needs no output projection.
        skip_projection = heads == 1 and dim_head == dim

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.norm = nn.LayerNorm(dim)

        self.attend = nn.Softmax(dim = -1)
        self.dropout = nn.Dropout(dropout)

        # One bias-free projection produces queries, keys and values at once.
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        if skip_projection:
            self.to_out = nn.Identity()
        else:
            self.to_out = nn.Sequential(
                nn.Linear(inner_dim, dim),
                nn.Dropout(dropout)
            )

    def forward(self, x):
        normed = self.norm(x)

        # Split the joint projection into q, k, v and move the head axis forward.
        q, k, v = (
            rearrange(part, 'b n (h d) -> b h n d', h = self.heads)
            for part in self.to_qkv(normed).chunk(3, dim = -1)
        )

        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        weights = self.dropout(self.attend(scores))

        # Weighted sum of the values, then merge the heads back together.
        mixed = rearrange(torch.matmul(weights, v), 'b h n d -> b n (h d)')
        return self.to_out(mixed)

class Transformer(nn.Module):
    """Stack of ``depth`` residual (Attention, FeedForward) pairs followed by a LayerNorm.

    Args:
        dim: feature size of the token sequence.
        depth: number of (Attention, FeedForward) pairs.
        heads: number of heads of every Attention block.
        dim_head: per-head width of every Attention block.
        mlp_dim: hidden width of every FeedForward block.
        dropout: dropout probability forwarded to both block types.
    """

    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout = 0.):
        super().__init__()
        self.norm = nn.LayerNorm(dim)

        blocks = []
        for _ in range(depth):
            attention = Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout)
            feed_forward = FeedForward(dim, mlp_dim, dropout = dropout)
            blocks.append(nn.ModuleList([attention, feed_forward]))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        for attn, ff in self.layers:
            # Each sub-block is wrapped in a residual connection.
            residual = x
            x = attn(residual) + residual
            residual = x
            x = ff(residual) + residual

        return self.norm(x)

class ViT(nn.Module):
    """Vision Transformer classifier operating on non-overlapping image patches.

    The image is cut into ``patch_size`` patches, each patch is flattened and
    linearly embedded to ``dim`` features, a learnable class token is
    prepended and learnable position embeddings are added. The sequence runs
    through a ``Transformer`` and is pooled (class token or mean over tokens)
    before the linear classification head.

    Args (keyword-only):
        image_size: int or ``(height, width)`` of the input.
        patch_size: int or ``(height, width)`` of a patch; must divide the image size.
        num_classes: number of output logits.
        dim: embedding size.
        depth, heads, mlp_dim, dim_head, dropout: forwarded to ``Transformer``.
        pool: ``'cls'`` or ``'mean'``.
        channels: number of input channels.
        emb_dropout: dropout probability applied to the embedded sequence.
    """

    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool = 'cls', channels = 3, dim_head = 64, dropout = 0., emb_dropout = 0.):
        super().__init__()
        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)

        fits_evenly = image_height % patch_height == 0 and image_width % patch_width == 0
        assert fits_evenly, 'Image dimensions must be divisible by the patch size.'

        patches_down = image_height // patch_height
        patches_across = image_width // patch_width
        num_patches = patches_down * patches_across
        patch_dim = channels * patch_height * patch_width
        assert pool in {'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

        # Flatten every patch, then normalise / project / normalise it.
        patch_layers = [
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = patch_height, p2 = patch_width),
            nn.LayerNorm(patch_dim),
            nn.Linear(patch_dim, dim),
            nn.LayerNorm(dim),
        ]
        self.to_patch_embedding = nn.Sequential(*patch_layers)

        # One position embedding per patch plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity()

        self.mlp_head = nn.Linear(dim, num_classes)

    def forward(self, img):
        tokens = self.to_patch_embedding(img)
        batch, n_tokens, _ = tokens.shape

        # Prepend one copy of the class token to every sequence in the batch.
        cls_tokens = repeat(self.cls_token, '1 1 d -> b 1 d', b = batch)
        tokens = torch.cat((cls_tokens, tokens), dim=1)
        tokens += self.pos_embedding[:, :(n_tokens + 1)]
        tokens = self.dropout(tokens)

        tokens = self.transformer(tokens)

        if self.pool == 'mean':
            pooled = tokens.mean(dim = 1)
        else:
            pooled = tokens[:, 0]

        return self.mlp_head(self.to_latent(pooled))

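Specifically, the embedding dimension of the ViT used in this paper is 256, the depth of the Transformer encoder is 6 layers, and the number of parallel attention heads is 5. Fig. 5 shows the speech emotion recognition process of the ViT used in this paper, with the coordinate information concatenated to the input. (Note: the model instantiated at the end of this notebook uses depth=3 and heads=6, which differs from the values quoted here.)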

In [16]:
def validation(model,valid_loader,criterion):
    """Evaluate ``model`` on ``valid_loader`` without tracking gradients.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: network to evaluate.
        valid_loader: iterable of ``(x, y)`` batches.
        criterion: loss function called as ``criterion(output, y)``.

    Returns:
        ``(avg_loss, acc)``: the mean of the per-batch losses, and the number
        of correct predictions divided by the number of labels seen.
    """
    model.eval()
    batch_losses = []
    seen, hits = 0, 0

    with torch.no_grad():
        for x, y in tqdm(iter(valid_loader)):
            x = x.to(device)
            y = y.flatten().to(device)

            output = model(x)
            batch_losses.append(criterion(output, y).item())

            # Index of the largest logit along the class dimension.
            predicted = torch.max(output, 1)[1]

            seen += y.size(0)
            hits += predicted.eq(y).cpu().sum()

    return np.mean(batch_losses), hits / seen

In [17]:
def train(model, train_loader, valid_loader, optimizer, scheduler):
    """Train ``model`` for ``config['epochs']`` epochs and keep the best weights.

    After every epoch the model is evaluated with ``validation``. Whenever the
    validation accuracy improves, a copy of the weights is stored. When
    training ends, the best stored weights are loaded back into ``model``.

    Args:
        model: network to optimise; modified in place.
        train_loader: iterable of ``(x, y)`` training batches.
        valid_loader: iterable of ``(x, y)`` validation batches.
        optimizer: optimizer already bound to ``model``'s parameters.
        scheduler: learning-rate scheduler stepped once per epoch, or ``None``.

    Returns:
        ``model`` itself, carrying the weights of the epoch with the highest
        validation accuracy (the final weights if accuracy never rose above 0).
    """
    import copy  # local import: the notebook's import cell does not provide it

    criterion = nn.CrossEntropyLoss().to(device)
    best_state = None
    best_acc = 0
    for ep in range(1,config['epochs']+1):
        # validation() leaves the model in eval mode, so training mode has to
        # be re-enabled every epoch or dropout stays off after the first one.
        model.train()
        train_loss = []

        for x, y in tqdm(train_loader):
            x = x.to(device)
            y = y.flatten().to(device)

            optimizer.zero_grad()
            output = model(x)

            loss = criterion(output,y)

            loss.backward()

            optimizer.step()
            train_loss.append(loss.item())

        avg_loss = np.mean(train_loss)
        valid_loss, valid_acc = validation(model,valid_loader,criterion)

        if scheduler is not None:
            scheduler.step()

        if valid_acc > best_acc:
            best_acc = valid_acc
            # Snapshot the weights: keeping a reference to `model` would just
            # follow the later updates and end up as the last epoch.
            best_state = copy.deepcopy(model.state_dict())
        print(f'epoch:[{ep}] train loss:[{avg_loss:.5f}] valid_loss:[{valid_loss:.5f}] valid_acc:[{valid_acc:.5f}]')

    print(f'best_acc:{best_acc:.5f}')

    if best_state is not None:
        model.load_state_dict(best_state)

    return model

In [None]:
# ViT over 128x128 inputs cut into 8x8 patches. channels=3 matches the samples
# built with add_coordinate=True (spectrogram plus two coordinate channels).
model = ViT(
    image_size=128, patch_size=8, channels=3,
    num_classes=6,
    dim=256, mlp_dim=256,  # 128 is noted as the alternative for both
    depth=3, heads=6, dim_head=256,
    pool='cls',
    dropout=0.1, emb_dropout=0.16,
)
model = model.to(device)

# Adam with the configured learning rate, halved every 20 epochs.
optimizer = optim.Adam(model.parameters(), lr=config['lr'])
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

infer_model = train(model, train_dataloader, val_dataloader, optimizer, scheduler)