In [1]:
from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import librosa
from sklearn.model_selection import train_test_split

from transformers import AutoModelForAudioClassification, AutoFeatureExtractor

from src.const import AUDIO_PATH, MAIN_LABELS, BATCH_SIZE, VALIDATION_SPLIT, SEED
from src.preprocess import load_and_preprocess, transform_to_data_loader, get_dl_for_pretrained
from src.preprocess_utils import normalize_ds

from torchinfo import summary


# summary(bin_bilstm_model, (1, 63, 128), device="cuda")

In [2]:
def _run_epoch(model, criterion, loader, model_type, using_pretrained, device,
               optimizer=None, desc=None):
    """Run one full pass over `loader` and return `(mean_loss, accuracy)`.

    The pass is a training pass when `optimizer` is given (gradients enabled,
    weights updated) and an evaluation pass otherwise (gradients disabled).
    Both metrics are averaged over the samples actually iterated; `nan` is
    returned for both when the loader yields no samples.
    """
    is_training = optimizer is not None
    model.train(is_training)

    # Only wrap in a progress bar when a description was supplied.
    batches = tqdm(loader, desc) if desc is not None else loader

    loss_sum = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(is_training):
        for batch_X, batch_y in batches:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            if is_training:
                model.zero_grad()

            outputs = model(batch_X)
            if using_pretrained:
                # The model returns an output object; the scores live in `.logits`.
                outputs = outputs.logits

            if model_type == "bin":
                # One target column per sample, cast to the output dtype so the
                # loss does not fail on integer / float64 labels.
                targets = batch_y.unsqueeze(1).to(outputs.dtype)
            else:
                targets = batch_y.long()

            loss = criterion(outputs, targets)

            if is_training:
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            batch_size = batch_y.size(0)
            # `loss` is weighted by the batch size so the epoch value is a
            # per-sample average even when the last batch is smaller.
            loss_sum += loss.item() * batch_size

            if model_type == "bin":
                # NOTE(review): the 0.5 cut-off assumes the model emits
                # probabilities; if `criterion` works on raw logits the
                # cut-off should be 0 — confirm against the caller.
                predicted = (outputs > 0.5).to(targets.dtype)
            else:
                predicted = torch.argmax(outputs, dim=1)
            correct += (predicted == targets).sum().item()

            seen += batch_size

    if seen == 0:
        return float('nan'), float('nan')
    return loss_sum / seen, correct / seen


def train_model(
        model, 
        criterion, 
        optimizer, 
        train_loader, 
        val_loader, 
        model_type, 
        epoch_count,
        using_pretrained=False,
        early_stopping=False,
        patience=5,
        device=None,
    ):
    """Train `model` for up to `epoch_count` epochs, validating after each one.

    Args:
        model: Module to train; called as `model(batch_X)`.
        criterion: Loss callable invoked as `criterion(outputs, targets)`.
        optimizer: Optimizer stepped once per training batch.
        train_loader: Iterable of `(batch_X, batch_y)` training batches.
        val_loader: Iterable of `(batch_X, batch_y)` validation batches.
        model_type: `"bin"` selects the binary path (targets reshaped to one
            column, predictions thresholded at 0.5); any other value selects
            the multi-class path (integer targets, argmax predictions).
        epoch_count: Maximum number of epochs.
        using_pretrained: When True, the scores are read from the `.logits`
            attribute of the model output.
        early_stopping: When True, stop once the validation loss has failed to
            improve for `patience` consecutive epochs.
        patience: Number of non-improving epochs tolerated before stopping.
        device: Device the batches are moved to. When None, the device of the
            model's first parameter is used (CPU if the model has none), so the
            function no longer relies on a notebook-global `device`.

    Returns:
        `(train_losses, val_losses)`: two lists with one per-sample mean loss
        per completed epoch.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    no_improve_count = 0

    for epoch in range(epoch_count):
        train_loss, train_acc = _run_epoch(
            model, criterion, train_loader, model_type, using_pretrained, device,
            optimizer=optimizer, desc=f"Epoch {epoch+1}",
        )
        train_losses.append(train_loss)

        val_loss, val_acc = _run_epoch(
            model, criterion, val_loader, model_type, using_pretrained, device,
        )
        val_losses.append(val_loss)

        print(f'Epoch {epoch+1}/{epoch_count}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}')

        if not early_stopping:
            continue

        # Early stopping check: reset the counter on any strict improvement.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            no_improve_count = 0
        else:
            no_improve_count += 1

        if no_improve_count >= patience:
            print('Early stopping')
            break

    return train_losses, val_losses

In [3]:
# Use the GPU when PyTorch reports one as available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

device(type='cuda')

In [4]:
# Read the sample array and the matching label array from .npy files in the
# current working directory.
X = np.load("augmented_data.npy")
y = np.load("augmented_labels.npy")

In [5]:
# transform = MFCC(sample_rate=16000, n_mfcc=13, melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23, "center": False})
# X_tensor = torch.from_numpy(X)

# indices = [i for i in range(0, X_tensor.shape[0], 5000)]
# indices.append(X_tensor.shape[0])

# X_transformed = transform(X_tensor[indices[0]:indices[1]])
# for i in range(1, len(indices)-1):
#     X_transformed = np.concatenate((X_transformed, transform(X_tensor[indices[i]:indices[i+1]]).numpy()), 0)



# X_transformed_main = X_transformed[y != 10]
# y_main = y[y != 10]

# X_train, X_test, y_train, y_test = train_test_split(
#     X_transformed_main, y_main, test_size=0.25, random_state=42
# )

# del X_transformed_main, y_main



# X_transformed_bin = X_transformed
# y_bin = create_binary_labels(y)

# X_train, X_test, y_train, y_test = train_test_split(
#     X_transformed_bin, y_bin, test_size=0.25, random_state=222
# )

# del X_transformed_bin, y_bin

In [6]:
# Keep only the samples whose label is not 10 (the meaning of label 10 is not
# visible in this notebook), then hold out 25% of them with a fixed seed.
X_main, y_main = X[y != 10], y[y != 10]

X_train, X_test, y_train, y_test = train_test_split(
    X_main,
    y_main,
    random_state=42,
    test_size=0.25,
)

# Insert a singleton axis at position 1 of each sample array.
X_train = X_train[:, np.newaxis]
X_test = X_test[:, np.newaxis]

In [7]:
# Wrap the training split and then the held-out split with the project's
# loader helper, in that order.
train_dl, val_dl = [
    transform_to_data_loader(features, labels, device=device)
    for features, labels in ((X_train, y_train), (X_test, y_test))
]

# Drop the notebook-level names of the raw splits; only the loaders are kept.
del X_train, X_test, y_train, y_test

In [16]:
class main_Transformer(nn.Module):
    """Transformer-encoder classifier: encode, mean-pool over the sequence, classify.

    Args:
        d_model: Size of the last input dimension (features per sequence step).
        n_head: Number of attention heads, passed as `nhead` to the encoder layer.
        num_layers: Number of stacked encoder layers.
        num_class: Number of output scores produced per sample.
        dim_feedforward: Hidden width of each encoder layer's feed-forward part.
        dropout: Dropout probability used inside each encoder layer.
    """
    
    def __init__(self, d_model, n_head, num_layers, num_class,
                 dim_feedforward=256, dropout=0.3):
        super().__init__()

        # NOTE(review): per the PyTorch docs, nn.TransformerEncoder works on
        # copies of the layer it is given, so this registered prototype likely
        # carries parameters that forward() never uses — confirm before
        # removing it, since that would change the state_dict keys.
        self.trans_enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, 
            nhead=n_head,
            dim_feedforward=dim_feedforward,
            batch_first=True, 
            activation="relu", 
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(self.trans_enc_layer, num_layers=num_layers)
        
        # `dropout` and `relu` are created here but are not applied in forward().
        self.dropout = nn.Dropout(0.1)
        self.fc1 = nn.Linear(d_model, num_class)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return one row of `num_class` scores per sample.

        `x` is laid out as (batch, sequence, d_model) because the encoder
        layer is built with `batch_first=True`.
        """
        out = self.transformer_encoder(x)
        # Pool over the sequence axis (dim 1). Pooling over dim 0 would average
        # different samples together and drop the batch dimension.
        out = torch.mean(out, dim=1)
        out = self.fc1(out)
        return out

In [20]:
# Architecture settings handed to main_Transformer below.
d_model, n_head = 16000, 4
num_layers, num_class = 2, 10

# Build the network and move it to the selected device.
model = main_Transformer(
    d_model=d_model,
    n_head=n_head,
    num_layers=num_layers,
    num_class=num_class,
)
model = model.to(device)

# Cross-entropy loss and Adam with the library's default settings.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [18]:
# Take the first batch from the training loader for a quick forward-pass check.
# next(iter(...)) raises StopIteration on an empty loader, whereas a bare
# for/break would silently leave `a` and `b` undefined (or stale from an
# earlier run of this cell).
a, b = next(iter(train_dl))

In [19]:
# Smoke test: forward the batch tensor `a` (taken from train_dl) through the model.
# NOTE(review): the output recorded below reports an expected embedding dimension
# of 128, while the configuration cell sets d_model = 16000 and the execution
# counts are out of order — the recorded output looks stale; confirm by
# re-running the notebook from a fresh kernel.
model(a)

AssertionError: was expecting embedding dimension of 128, but got 16000