## CNN2D + Transformer (1 input)

In this notebook, we will try implement a CNN combined with a transformer model to make an ordinal classification. Only answer and reading tasks will be used as an input.

In [1]:
import IPython.display as ipd
import librosa
import librosa.display
import pandas as pd
import os, time, warnings
import numpy as np

In [2]:
def resize_spectrogram(spec, length, fact=-80):

    # Create an empty canvas to put spectrogram into
    canvas = np.ones((len(spec), length)) * fact

    if spec.shape[1] <= length:
        canvas[:, : spec.shape[1]] = spec
    else:
        canvas[:, :length] = spec[:, :length]
    return canvas

def compute_mel_spec(filename, sr=8000, hop_length=512, duration=30.0):

    # Loads the mp3 file
    y, sr = librosa.load(filename, sr=sr)

    # Compute the mel spectrogram
    x_mel = librosa.feature.melspectrogram(y=y, sr=sr)

    # Apply logarithmic dB-scale to spectrogram and set maximum to 0 dB
    x_mel = librosa.power_to_db(x_mel, ref=np.max)

    # Compute mean strength per frequency for mel spectrogram
    mel_strength = np.mean(x_mel, axis=1)

    # Estimate the desired length of the spectrogram
    length = int(duration * sr / hop_length)

    # print(np.min(x_mel))
    # print(np.max(x_mel))

    # Put mel spectrogram into the right shape
    x_mel = resize_spectrogram(x_mel, length)

    x_mel = librosa.util.normalize(x_mel, axis=1)
    x_mel = np.ones(x_mel.shape) + x_mel


    return x_mel, mel_strength

In [3]:
from matplotlib import pyplot as plt

In [4]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#device="cpu"

In [5]:
print(device)

cuda


In [6]:
from torch.utils.data.dataset import Dataset

In [7]:
import torch.nn.functional as F

In [8]:
class MyCustomDataset(Dataset):
    def __init__(self, audio_file_list, scores):
        self.audio_file_list = audio_file_list
        self.scores = scores
        
    def __getitem__(self, index):
        img = self.audio_file_list[index]
        # img, _ = librosa.load(img, sr=16000)
        # print(type(img))
        # img = torch.tensor(img, dtype=torch.float32)
        # img = torch.mean(img, dim=0).unsqueeze(0)
        # img = torchaudio.transforms.Spectrogram()(img)
        img, _ = compute_mel_spec(f'/home/ongun/challenge/answer/{img}')
        img = torch.tensor(img, dtype=torch.float32)
        #print(img.shape)
        # img = torch.mean(img, dim=0).unsqueeze(0)
        # img = F.pad(img, (0, 1000 - img.shape[1]))
        img = img.reshape([1, img.shape[0], img.shape[1]])
        # img = generate_features(img)
        # torch.index_select(img, 1, torch.LongTensor([2,0,1]))
        #print(img.shape)
        
        def ordinal_labeler(score):
            levels = [1]*score + [0]*(39 - score)
            levels = torch.tensor(levels, dtype=torch.float32)
            return levels
        
        score = self.scores[index]
        label = np.array(ordinal_labeler(score))
        return img, label

    def __len__(self):
        count = len(self.audio_file_list)
        return count

In [9]:
df = pd.read_csv("modified_data.csv")
#vowel_df=df.iloc[::2]
df=df.iloc[1::2]

In [10]:
print(df.columns)

Index(['userid', 'filename', 'date', 'score', 'excercise_type'], dtype='object')


In [13]:
audio_list = np.array(df['filename'].tolist())
score_list = np.array(df['score'].tolist())

In [14]:
indices = np.arange(len(audio_list))
np.random.shuffle(indices)

limit = int(len(indices)*0.7)

X_train, X_test = audio_list[indices[:limit]], audio_list[indices[limit:]]
y_train, y_test = score_list[indices[:limit]], score_list[indices[limit:]] 

In [15]:
train_data = MyCustomDataset(X_train, y_train)
test_data = MyCustomDataset(X_test, y_test)

In [16]:

num_classes = 40
batch_size = 1
learning_rate = 0.001

In [17]:
from torch.utils.data import DataLoader

In [18]:
train_loader = DataLoader(dataset=train_data, batch_size=batch_size)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size)

In [19]:
def prediction2label(pred: np.ndarray, threshold: float):
    """Convert ordinal predictions to class labels, e.g.
    
    [0.9, 0.1, 0.1, 0.1] -> 0
    [0.9, 0.9, 0.1, 0.1] -> 1
    [0.9, 0.9, 0.9, 0.1] -> 2
    etc.
    """
    return (pred > threshold).cumprod(axis=1).sum(axis=1) - 1

In [20]:
importance_weights = torch.ones(39, dtype=torch.float).to(device)

def loss_fn2(logits, levels, imp=importance_weights):
    val = (-torch.sum((F.logsigmoid(logits)*levels
                      + (F.logsigmoid(logits) - logits)*(1-levels))*imp,
           dim=1))
    return torch.mean(val)

def loss_fn(logits, levels):
    logits = prediction2label(logits)
    modified_target = torch.zeros_like(logits)

    # Fill in ordinal target function, i.e. 0 -> [1,0,0,...]
    for i, target in enumerate(levels):
        modified_target[i, 0:target+1] = 1

    return nn.MSELoss(reduction='none')(logits, modified_target).sum(axis=1)

### Model Architecture

In [21]:
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.transformer_maxpool1 = nn.MaxPool2d(kernel_size=[1,4], stride=[1,4])
        
        # define single transformer encoder layer
        # self-attention + feedforward network from "Attention is All You Need" paper
        # 4 multi-head self-attention layers each with 40-->512--->40 feedforward network
        transformer_layer1 = nn.TransformerEncoderLayer(
            d_model=128, # input feature (frequency) dim after maxpooling 40*282 -> 40*70 (MFC*time)
            nhead=4, # 4 self-attention layers in each multi-head self-attention layer in each encoder block
            dim_feedforward=512, # 2 linear layers in each encoder block's feedforward network: dim 40-->512--->40
            dropout=0.4, 
            activation='relu' # ReLU: avoid saturation/tame gradient/reduce compute time
        )        
        self.transformer_encoder1 = nn.TransformerEncoder(transformer_layer1, num_layers=4)

        
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=7, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.drop_out = nn.Dropout()
        
        #self.fc1 = nn.Linear(111360+128, 512)
        self.fc1= nn.Linear(55808,512)
        #self.fc2 = nn.Linear(4096, 512)
        self.fc3 = nn.Linear(512, 39)
        # self.last = nn.Softmax(dim=1)
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.reshape([out.size(0), -1])
        #print(out.shape)
        
        
        
        x_maxpool1 = self.transformer_maxpool1(x)

        # remove channel dim: 1*40*70 --> 40*70
        x_maxpool_reduced1 = torch.squeeze(x_maxpool1,1)
        
        # convert maxpooled feature map format: batch * freq * time ---> time * batch * freq format
        # because transformer encoder layer requires tensor in format: time * batch * embedding (freq)
        x_1 = x_maxpool_reduced1.permute(2,0,1) 
        
        # finally, pass reduced input feature map x into transformer encoder layers
        transformer_output_1 = self.transformer_encoder1(x_1)
        
        # create final feature emedding from transformer layer bytaking mean in the time dimension (now the 0th dim)
        # transformer outputs 2x40 (MFCC embedding*time) feature map, take mean of columns i.e. take time average
        transformer_embedding_1 = torch.mean(transformer_output_1, dim=0) # dim 40x70 --> 40
        out = torch.cat([out,transformer_embedding_1], dim=1)  

         
        
        
        out = self.drop_out(out)
        out = self.fc1(out)
        #out = self.fc2(out)
        out = self.fc3(out)
        # out = self.last(out)
        return out

model = ConvNet()

# Loss and optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [22]:
model.to(device)

ConvNet(
  (transformer_maxpool1): MaxPool2d(kernel_size=[1, 4], stride=[1, 4], padding=0, dilation=1, ceil_mode=False)
  (transformer_encoder1): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): _LinearWithBias(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=512, bias=True)
        (dropout): Dropout(p=0.4, inplace=False)
        (linear2): Linear(in_features=512, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.4, inplace=False)
        (dropout2): Dropout(p=0.4, inplace=False)
      )
      (1): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): _LinearWithBias(in_features=128, out_features=128, bias=True)
        )
        (linear1): Line

In [23]:
import warnings
warnings.filterwarnings('ignore')

### Training

In [24]:
num_epochs = 150
total_step = len(train_loader)

for epoch in range(num_epochs):
    loss_list = []
    acc_list = []
    for i, (images, labels) in enumerate(train_loader):
        # Run the forward pass
        images = images.cuda()
        labels = labels.cuda()
    
        outputs = model(images)
        loss = loss_fn2(outputs, labels)
        loss_list.append(loss.item())

        # Backprop and perform Adam optimisation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track the accuracy
        total = labels.size(0)
        _, predicted = torch.max(outputs.data, 1)
        # correct = (predicted == labels).sum().item()
        # acc_list.append(correct / total)
        if np.sum(loss_list)<0.01:
            break
    print(f'epoch: {epoch}: acc:','loss: ',np.sum(loss_list))

epoch: 0: acc: loss:  2106.3761077923264
epoch: 1: acc: loss:  1347.7166709899902
epoch: 2: acc: loss:  1344.7397148609161
epoch: 3: acc: loss:  1390.698515176773
epoch: 4: acc: loss:  1327.0460159778595
epoch: 5: acc: loss:  1321.1217851638794
epoch: 6: acc: loss:  1321.04714345932
epoch: 7: acc: loss:  1287.9782330989838
epoch: 8: acc: loss:  1373.1890199184418
epoch: 9: acc: loss:  1202.7783708572388
epoch: 10: acc: loss:  1196.7528923749924
epoch: 11: acc: loss:  1008.4439240694046
epoch: 12: acc: loss:  999.0041728019714
epoch: 13: acc: loss:  887.0032672509551
epoch: 14: acc: loss:  822.0994097329676
epoch: 15: acc: loss:  643.3915438628756
epoch: 16: acc: loss:  554.4169617407024
epoch: 17: acc: loss:  539.7846945705824
epoch: 18: acc: loss:  434.65060008158616
epoch: 19: acc: loss:  407.3654048551907
epoch: 20: acc: loss:  348.33986478898237
epoch: 21: acc: loss:  188.86282402577467
epoch: 22: acc: loss:  193.36830148733725
epoch: 23: acc: loss:  194.7101021816443
epoch: 24: ac

In [25]:
torch.save(model.state_dict(), "1inputtransformer-overfit.pt")

### Evaluation

In [38]:
def check_accuracy(test_loader: DataLoader, model: nn.Module):
    num_correct = 0
    total = 0
    model.eval()
    device="cuda"
    acc_list=[]
    with torch.no_grad():
        for data, labels in test_loader:
            data = data.to(device=device)
            labels = labels.to(device=device)
           
            predictions = model(data)
            
            total = labels.size(0)
            predicted = prediction2label(predictions, 0.5)
            targets = prediction2label(labels, 0.5)
            num_correct = (predicted == targets).sum().item()
            acc_list.append(num_correct/total)
        print(f"Test Accuracy of the model: {np.mean(acc_list)}")


In [39]:
check_accuracy(test_loader,model)

Test Accuracy of the model: 0.0625


In [40]:
model.eval()
for i, (images, labels) in enumerate(train_loader):
    images = images.cuda()
    labels = labels.cuda()

    outputs = model(images)
    print(prediction2label(labels, 0.9))
    print(prediction2label(outputs, 0.9))


tensor([8], device='cuda:0')
tensor([8], device='cuda:0')
tensor([7], device='cuda:0')
tensor([7], device='cuda:0')
tensor([8], device='cuda:0')
tensor([8], device='cuda:0')
tensor([8], device='cuda:0')
tensor([8], device='cuda:0')
tensor([11], device='cuda:0')
tensor([10], device='cuda:0')
tensor([22], device='cuda:0')
tensor([22], device='cuda:0')
tensor([15], device='cuda:0')
tensor([15], device='cuda:0')
tensor([8], device='cuda:0')
tensor([8], device='cuda:0')
tensor([10], device='cuda:0')
tensor([10], device='cuda:0')
tensor([4], device='cuda:0')
tensor([4], device='cuda:0')
tensor([2], device='cuda:0')
tensor([2], device='cuda:0')
tensor([5], device='cuda:0')
tensor([5], device='cuda:0')
tensor([0], device='cuda:0')
tensor([0], device='cuda:0')
tensor([0], device='cuda:0')
tensor([0], device='cuda:0')
tensor([12], device='cuda:0')
tensor([12], device='cuda:0')
tensor([22], device='cuda:0')
tensor([22], device='cuda:0')
tensor([1], device='cuda:0')
tensor([1], device='cuda:0')
te

Getting 0 loss and almost 0 accuracy implies that our model overfitted which was to be expected.