In [5]:
# from tensorflow.keras.utils import to_categorical
import torch
from torch.utils.data import Dataset
import torchaudio
from scipy.signal import butter, filtfilt
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
import torch.nn as nn
import os
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import tensorflow as tf
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import accuracy_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch
import torch.nn as nn
from collections import deque
import random


class GTZANDataset(Dataset):
    # Your __init__ method remains the same
    def __init__(self, directory, exclude_file=None, max_len=675808, mfcc_extraction=False, frame_length=128, frame_step=1024, num_mel_bins=128, n_mfcc=10, low_pass_cutoff=2000, filter_order=5, fft_length=1024, downsample=False):
        self.directory = directory
        self.classes = os.listdir(directory)
        self.class_indices = {cls_name: i for i, cls_name in enumerate(self.classes)}
        self.samples = []
        self.targets = []
        
        # parameters
        self.max_len = max_len
        self.downsample = downsample
        self.mfcc_extraction = mfcc_extraction
        self.low_pass_cutoff = low_pass_cutoff
        self.filter_order = filter_order
        self.frame_length = frame_length
        self.frame_step = frame_step
        self.num_mel_bins = num_mel_bins
        self.n_mfcc = n_mfcc
        self.fft_length = fft_length
        

        # create list of files and list of targets
        for cls_idx, cls_name in enumerate(self.classes):
            cls_path = os.path.join(directory, cls_name)
            filenames = os.listdir(cls_path)
            for filename in filenames:
                if filename != exclude_file:
                    sample_path = os.path.join(cls_path, filename)
                    self.samples.append(sample_path)
                    self.targets.append(cls_idx)
        self.targets = self.one_hot_encode(self.targets, num_classes=len(self.classes))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        sample_path = self.samples[index]
        target = self.targets[index]
        processed_waveform, target = self._load_and_process_waveform(sample_path, target)
        return processed_waveform, target

    def one_hot_encode(self, x, num_classes):
        return torch.eye(num_classes)[x]

    def _load_and_process_waveform(self, sample_path, target):#
        chunk_targets = []
        waveform, sample_rate = torchaudio.load(sample_path)
        desired_length = self.max_len

        waveform = waveform.squeeze(0)
        
        # adjust waveform to desired length
        if len(waveform) < desired_length * 30:
            num_zeros = desired_length*30 - len(waveform)
            pad_left = num_zeros // 2
            pad_right = num_zeros - pad_left
            waveform = np.pad(waveform.numpy(), (pad_left, pad_right), mode='constant')            
        elif len(waveform) > desired_length * 30:
            start = (len(waveform) - desired_length * 30) // 2
            end = start + desired_length * 30
            waveform = waveform[start:end]

            
        # Split waveform into 1-second chunks
        chunk_size = sample_rate * 1
        padding_size = chunk_size - (len(waveform) % chunk_size)
        waveform = np.pad(waveform, (0, padding_size), mode='constant')
        waveform = torch.from_numpy(waveform)
        num_chunks = len(waveform) // chunk_size
        waveform_chunks = torch.split(waveform[:chunk_size*num_chunks], chunk_size)
        if waveform.shape[0] % chunk_size != 0:
            num_zeros = chunk_size - (waveform.shape[0] % chunk_size)
            waveform = F.pad(waveform, (0, num_zeros), 'constant', 0)
        processed_chunks = []
        
        for chunk in waveform_chunks:            
            if self.downsample:
                # downsample
                def downsample_waveform(waveform, sample_rate):
                    b, a = butter(self.filter_order, self.low_pass_cutoff / (sample_rate / 2), 'lowpass')
                    waveform_np = waveform
                    filtered_waveform_np = filtfilt(b, a, waveform_np.squeeze())
                    return torch.from_numpy(np.float32(filtered_waveform_np).reshape(1, -1))
                chunk = downsample_waveform(chunk, sample_rate)
                chunk = chunk.squeeze(0)
            
            
            if self.mfcc_extraction:
                # mfcc extraction
                mfcc_transform = torchaudio.transforms.MFCC(sample_rate=sample_rate, n_mfcc=self.n_mfcc, melkwargs={'n_fft': self.fft_length, 'n_mels': self.num_mel_bins, 'hop_length': self.frame_step, 'win_length': self.frame_length})

                mfccs = mfcc_transform(chunk)
                processed_chunks.append(mfccs)
                chunk_targets.append(target)
            else:
                processed_chunks.append(chunk)
                chunk_targets.append(target)
                
        return processed_chunks, chunk_targets
 

In [6]:
class LSTMNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super(LSTMNet, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.input_size = input_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.BatchNorm1d(32),
            nn.Dropout(dropout),
            nn.Linear(32, num_classes),           
        )
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        batch_size = np.shape(x)[0]
        
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = self.bn(out)
        out = self.dropout(out)
        out = self.fc(out)
        return out

    

# Parameters
hidden_size = 32
num_layers = 2
num_epochs = 100
n_mfcc = 35
max_len=22050
mfcc_extraction=True
frame_length = 256
frame_step= 200
num_mel_bins=128
low_pass_cutoff=2000
filter_order=5
downsample=True
fft_length = 1024
batch_size = 32

num_classes = 10  
input_size = n_mfcc  

directory = "/kaggle/input/gtzan-dataset-music-genre-classification/Data/genres_original"
exclude_file = "jazz.00054.wav"
dataset = GTZANDataset(directory, exclude_file=exclude_file, max_len=max_len, mfcc_extraction=mfcc_extraction, frame_length=frame_length, frame_step=frame_step, num_mel_bins=num_mel_bins, n_mfcc=n_mfcc, low_pass_cutoff=low_pass_cutoff, filter_order=filter_order, fft_length=fft_length, downsample=downsample)

# normalize dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((-1, -1, -1), (2, 2, 2)) 
])


dataset.transform = transform

# Instantiate the model
model = LSTMNet(input_size, hidden_size, num_layers, num_classes)

# Define the ratios for train, validation, and test
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1

# Calculate the lengths of each split
train_len = int(train_ratio * len(dataset))
val_len = int(val_ratio * len(dataset))
test_len = len(dataset) - train_len - val_len



# Use random_split to split the dataset
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_len, val_len, test_len])

# define dataloaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [212]:
def train(model, train_loader, criterion, optimizer, device):
    model.to(device)
    train_loss, train_total, train_correct = 0.0, 0, 0
    model.train()
    counter = 0
    for inputchunks, labelchunks in train_loader:

        for x in range(len(inputchunks)):
            inputs, labels = inputchunks[x].to(device), labelchunks[x].to(device)
            optimizer.zero_grad()
                            
            inputs = inputs.permute(0,2,1)
            outputs = model(inputs)
            predicted = outputs.data.argmax(dim=1)
            
            # train model
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            # calculate loss and acc
            train_loss += loss.item()
            counter += (inputs.size(0))
            train_total += labels.size(0)
            _, labels = labels.max(dim=1)
            train_correct += (predicted == labels).sum().item()
            
    train_loss /= (len(train_loader.dataset) *  len(inputchunks))
    train_acc = train_correct / (train_total)

          
    return train_loss, train_acc

In [213]:
#for validation and test datasets
def validate(model, dataloader, criterion, device):
    
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for inputchunks, labelchunks in dataloader:
            for x in range(len(inputchunks)):
                
                inputs = inputchunks[x].to(device)
                labels = labelchunks[x].to(device)
                #if downsample:
                #    inputs = np.squeeze(inputs, axis=1)
                
                inputs = inputs.permute(0,2,1)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, labels = labels.max(dim=1)
                
                # calculate loss and acc
                val_loss += loss.item()
                predicted = outputs.data.argmax(dim=1)
                val_correct += (predicted == labels).sum().item()
                val_total += labels.size(0)
                
    val_loss /= (len(dataloader.dataset) * len(inputchunks))
    val_acc = val_correct / (val_total)
    
    return val_loss, val_acc

In [214]:


# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
learning_rate = 0.0001
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4) # Set weight decay here
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=5, verbose=True)


best_val_loss = 1

# Define variables to store training and validation losses and accuracies
train_losses = []
train_accs = []
val_losses = []
val_accs = []
patience = 5
patience_counter = 0
for epoch in range(num_epochs):
    # Train the model
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)

    # Validate the model
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    # Save the model if the validation loss has decreased
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'lstm_net.pth')
        patience_counter = 0
    else:
        patience_counter += 1

    # Early stopping
    if patience_counter >= patience:
        print('Validation stopped improving for ', patience, 'epochs. Stopping early.')
        
    scheduler.step(val_loss)
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    
    # Print the training and validation loss and accuracy for this epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

test_loss, test_acc = validate(model, test_loader, criterion, device)

print("Test Acc: ", test_acc, "Test loss: ", test_loss)

Epoch 1/100, Train Loss: 2.2432, Train Acc: 0.1797, Val Loss: 2.1315, Val Acc: 0.2866
Epoch 2/100, Train Loss: 2.0429, Train Acc: 0.2496, Val Loss: 2.0860, Val Acc: 0.3101
Epoch 3/100, Train Loss: 1.9316, Train Acc: 0.2910, Val Loss: 2.0175, Val Acc: 0.3255
Epoch 4/100, Train Loss: 1.8786, Train Acc: 0.3044, Val Loss: 2.0428, Val Acc: 0.3291
Epoch 5/100, Train Loss: 1.8249, Train Acc: 0.3322, Val Loss: 1.9441, Val Acc: 0.3458
Epoch 6/100, Train Loss: 1.7858, Train Acc: 0.3454, Val Loss: 1.9542, Val Acc: 0.3633
Epoch 7/100, Train Loss: 1.7285, Train Acc: 0.3689, Val Loss: 1.7895, Val Acc: 0.3620
Epoch 8/100, Train Loss: 1.6899, Train Acc: 0.3846, Val Loss: 1.9916, Val Acc: 0.3561
Epoch 9/100, Train Loss: 1.6746, Train Acc: 0.3942, Val Loss: 1.9972, Val Acc: 0.3809
Epoch 10/100, Train Loss: 1.6353, Train Acc: 0.4070, Val Loss: 1.8649, Val Acc: 0.3646
Epoch 11/100, Train Loss: 1.5994, Train Acc: 0.4210, Val Loss: 1.8670, Val Acc: 0.3965
Validation loss has not improved for the past 5 epoc

KeyboardInterrupt: 

In [5]:
def phase_shuffle(x, shuffle_factor = 2):

    # Calculate required padding for shuffling
    pad_left = shuffle_factor // 2
    pad_right = shuffle_factor - pad_left

    # Randomly select a value from the range [-pad_left, pad_right] for each sample in the batch
    shuffle_offsets = torch.randint(-pad_left, pad_right + 1, (x.size(0), 1, 1)).to(x.device)

    # Pad the input tensor before shuffling
    x_padded = F.pad(x, (pad_left, pad_right), mode='reflect')

    # Create a list to store the shuffled tensors
    shuffled_tensors = []

    # Loop through each sample in the batch
    for i, shuffle_offset in enumerate(shuffle_offsets.squeeze()):
        
        # Slice the tensor according to the shuffle offset
        start = pad_left + shuffle_offset.item()
        end = x_padded.size(2) - pad_right + shuffle_offset.item()
        shuffled_tensors.append(x_padded[i, :, start:end])

    # Stack the shuffled tensors back into a single tensor
    x_shuffled = torch.stack(shuffled_tensors, dim=0)

    return x_shuffled

#MFCC WaveGAN generator implementation
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        
        # Fully connected layer
        self.fc = nn.Sequential(
            nn.Linear(110,1400),
            nn.BatchNorm1d(1400),
            nn.LeakyReLU(),
            
        )
        
        # convolutional layers
        self.transconv_layers = nn.Sequential(
            nn.ConvTranspose1d(140, 70, kernel_size=25, stride=4, padding=11, output_padding=0),
            nn.BatchNorm1d(70),
            nn.LeakyReLU(),
            nn.Dropout(0.3),
            nn.ConvTranspose1d(70, 35, kernel_size=25, stride=4, padding=11, output_padding=0),
            nn.BatchNorm1d(35),
            nn.LeakyReLU(),
            nn.Dropout(0.3),

            nn.ConvTranspose1d(35, 35, kernel_size=25, stride=4, padding=11, output_padding=0),
            nn.BatchNorm1d(35),
            nn.LeakyReLU(),
            nn.Dropout(0.3),

            nn.ConvTranspose1d(35, 35, kernel_size=25, stride=4, padding=11, output_padding=0),
            nn.Tanh()
        )

    def forward(self, mfcc, genre_label):
        x = torch.cat((mfcc.view(mfcc.size(0), -1), genre_label), dim=1)
        x = self.fc(x)
        
        x = x.view(x.size(0), 140, 10)
        
        x = self.transconv_layers(x)
        x = x[:,:,:111]
        return x

#MFCC WaveGAN descriminator implementation
class Discriminator(nn.Module):
    def __init__(self, shuffle_range=2):
        super(Discriminator, self).__init__()
        
        self.conv1 = nn.Conv1d(35, 16, kernel_size=25, stride=4, padding=11)
        self.lr =  nn.LeakyReLU(0.2)
        self.conv2 =   nn.Conv1d(16, 32, kernel_size=25, stride=4, padding=11)
        self.conv3 =    nn.Conv1d(32, 64, kernel_size=25, stride=4, padding=11)
        self.conv4 =    nn.Conv1d(64, 128, kernel_size=20, stride=4, padding=11)
        
        self.fc = nn.Sequential(
            nn.Linear(266, 1),
            nn.Sigmoid()
        )
        
        self.bn1 = nn.BatchNorm1d(16)
        self.bn2 = nn.BatchNorm1d(32)
        self.bn3 = nn.BatchNorm1d(64)
        self.bn4 = nn.BatchNorm1d(128)
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, x, genre_label):        
        x = self.conv1(x)
        self.lr(x)
        phase_shuffle(x)
        self.dropout = nn.Dropout(0.3)
        x = self.bn1(x)

        x = self.conv2(x)
        x = self.bn2(x)

        self.lr(x)

        phase_shuffle(x)
        self.dropout = nn.Dropout(0.5)

        x = self.conv3(x)

        self.lr(x)
        phase_shuffle(x)
        self.dropout = nn.Dropout(0.5)
        x = self.bn3(x)

        x = self.conv4(x)

        self.lr(x)
        phase_shuffle(x)
        self.dropout = nn.Dropout(0.5)
        x = self.bn4(x)
  
        x = x.view(x.size(0), -1)
                
        x = torch.cat((x, genre_label), dim=1)
        x = self.fc(x)
        return x


In [6]:
# Load pre-trained LSTM model
timesteps = 111
num_classes = 10
pre_trained_model = LSTMNet(input_size, hidden_size, num_layers, num_classes)
#pre_trained_model.load_state_dict(torch.load('/kaggle/input/musicgenrelstm/lstm_net.pth'))
pre_trained_model.load_state_dict(torch.load('/kaggle/working/lstm_net.pth'))

for param in pre_trained_model.parameters():
    param.requires_grad = False

# Instantiate new discriminator with transfer learning
z_dim = 100
n_classes = 10
D = Discriminator()
G = Generator()



FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/lstm_net.pth'

In [22]:
from collections import deque
import random

def train_gan(G, D, train_dataloader,val_dataloader, num_epochs, z_dim, n_classes, n_mfcc, device):
    # Define loss function
    criterion = nn.BCELoss()
    counter = 0
    othercounter = 0
    # Define optimizers
    optimizer_G = torch.optim.Adam(G.parameters(), lr=0.0007, weight_decay = 0.0001, betas=(0.3, 0.999))
    optimizer_D = torch.optim.Adam(D.parameters(), lr=0.000,weight_decay = 0.0001, betas=(0.3, 0.999))
    # Initialize Replay Buffer
    replay_buffer = ReplayBuffer(128)
    
    # Move models to device

    # Set pre-trained model to evaluation mode
    G.to(device)
    D.to(device)

    for epoch in range(num_epochs):
        g_correct = 0
        g_total = 0
        g_val_correct = 0
        g_val_total = 0
        d_total = 0
        d_correct = 0
        d_val_correct = 0
        d_val_total = 0
        G.train()
        D.train()
        train_G_loss = 0
        train_D_loss = 0
        for inputchunks, labelchunks in train_dataloader:
            othercounter += 1
            for x in range(len(inputchunks)):

                # Move data to device
                optimizer_D.zero_grad()
                mfccs =  inputchunks[x].to(device)              
                labels = labelchunks[x].to(device)
                batch_size = mfccs.size(0)
                
                # Real data
                real_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)  # Assuming device is set up
                real_data = mfccs
                real_output = D(real_data, labels)
                real_loss = criterion(real_output, real_labels)
                real_loss.backward()
                optimizer_D.step()
                optimizer_D.zero_grad()
                
                # Fake data
                noise = torch.randn(batch_size, z_dim).to(device)  # Assuming z_dim is the noise dimension
                fake_data = G(noise, labels)
                fake_labels = torch.zeros(batch_size, 1).uniform_(0.0, 0.3).to(device)
                fake_output = D(fake_data.detach(), labels)
                fake_loss = criterion(fake_output, fake_labels)
               
                
                # Update discriminator
                fake_loss.backward()
                optimizer_D.step()
                d_loss = (real_loss.item() + fake_loss.item())/2
                train_D_loss += d_loss

                # calculate accuracy
                d_total += real_labels.size(0)
                d_correct += (torch.round(real_output) == torch.round(real_labels)).sum().item()
                d_total += labels.size(0)
                d_correct += (torch.round(fake_output) == torch.round(fake_labels)).sum().item()
                
                # Train the generator
                optimizer_G.zero_grad()

                # Flip the labels for the generator
                flipped_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
                output1 = D(fake_data, labels)
                g_loss1 = criterion(output1, flipped_labels)
                g_correct += (torch.round(output1) == torch.round(flipped_labels)).sum().item()
                g_total += flipped_labels.size(0)
                
                # Update generator
                g_loss1.backward()     
                optimizer_G.step()
                optimizer_G.zero_grad()
                
                # Fake data
                noise = torch.randn(batch_size, z_dim).to(device)  # Assuming z_dim is the noise dimension
                fake_data = G(noise, labels)
                
                # Flip the labels for the generator
                flipped_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
                output2 = D(fake_data, labels)
                g_loss2 = criterion(output2, flipped_labels)
                g_correct += (torch.round(output2) == torch.round(flipped_labels)).sum().item()
                g_total += flipped_labels.size(0)
                train_G_loss += ((g_loss1.item() + g_loss2.item())/2)

                # Update generator
                g_loss2.backward()     
                optimizer_G.step()
                
                counter += 1
                                
        g_grad_norm = 0.0        
        d_grad_norm = 0.0
        
        # calculate grad norms to ensure functional training
        for p in D.parameters():
            if p.grad is not None:
                d_grad_norm += p.grad.norm(2).item()  # Use L2 norm
        d_grad_norm = np.sqrt(d_grad_norm)
        print("Discriminator gradient norm:", d_grad_norm)
        for p in G.parameters():
            if p.grad is not None:
                g_grad_norm += p.grad.norm(2).item()  # Use L2 norm
        g_grad_norm = np.sqrt(g_grad_norm)
        print("Generator gradient norm:", g_grad_norm)
    
        train_G_loss /= len(train_dataset) 
        train_D_loss /= len(train_dataset) 
        G.eval()
        D.eval()
        val_loss = 0
        val_acc = 0
        num_batches = 0
        g_val_loss = 0
        d_val_loss = 0
        if epoch % 2 == 0:
            
            # Validation
            with torch.no_grad():
                for inputchunks, labelchunks in val_dataloader:
                    for x in range(len(inputchunks)):

                        inputs =  inputchunks[x].to(device)
                        labels = labelchunks[x].to(device)
                        
                        batch_size = inputs.size(0)

                        # Real data
                        real_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
                        real_data = inputs
                        real_output = D(real_data, labels)
                        real_loss = criterion(real_output, real_labels)

                        # Fake data
                        noise = torch.randn(batch_size, z_dim).to(device)
                        fake_data = G(noise, labels)
                        fake_labels = torch.zeros(batch_size, 1).uniform_(0.0, 0.3).to(device)
                        fake_output = D(fake_data.detach(), labels)
                        fake_loss = criterion(fake_output, fake_labels)

                        # discriminator accuracy and loss
                        d_val_total += real_labels.size(0)
                        d_val_correct += (torch.round(real_output) == torch.round(real_labels)).sum().item()
                        d_val_total += labels.size(0)
                        d_val_correct += (torch.round(fake_output) == torch.round(fake_labels)).sum().item()
                        d_loss = (real_loss + fake_loss)/2

                        # Generator loss
                        flipped_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
                        output = D(fake_data, labels)
                        g_loss = criterion(output, flipped_labels)
                        
                        # generator accuracy
                        g_val_correct += (torch.round(output) == torch.round(flipped_labels)).sum().item()
                        g_val_total += flipped_labels.size(0)
                        g_val_loss += g_loss.item()
                        d_val_loss += d_loss
                        num_batches += 1

            g_val_loss /= len(val_dataset)
            d_val_loss /= len(val_dataset)
            g_acc = ( g_correct / g_total)
            g_val_acc = g_val_correct / g_val_total
            d_acc = d_correct / d_total         
            d_val_acc = d_val_correct / d_val_total
            
            print("[Epoch %d/%d] [G loss %f] [D loss %f] [G V loss: %f] [D Val loss: %f]"
                      % (epoch, num_epochs,train_G_loss, train_D_loss, g_val_loss, d_val_loss))
            print("[Epoch %d/%d] [G Acc %f] [D Acc %f] [G V Acc: %f] [D Val Acc: %f]"
                      % (epoch, num_epochs,g_acc, d_acc, g_val_acc, d_val_acc))
            
    return generator, discriminator


In [8]:
lstm = LSTMNet(input_size, hidden_size, num_layers, num_classes)
lstm.load_state_dict(torch.load('/kaggle/input/finalgan/lstm_net (2).pth'))
#for param in lstm.parameters():
#    param.requires_grad = False
    
G = Generator().to(device)
G.load_state_dict(torch.load('/kaggle/input/finalgan/G.pth'))
for param in G.parameters():
    param.requires_grad = False
    
D = Discriminator().to(device)
D.load_state_dict(torch.load('/kaggle/input/finalgan/D.pth'))
for param in D.parameters():
    param.requires_grad = False
    
g_test_loss = 0
d_test_loss = 0
d_total = 0
g_total = 0
d_correct = 0
g_correct = 0
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.BCELoss()
z_dim= 100

with torch.no_grad():
    for inputchunks, labelchunks in test_loader:

        for x in range(len(inputchunks)):

            inputs =  inputchunks[x].to(device)
            labels = labelchunks[x].to(device)

            batch_size = inputs.size(0)

            # Real data
            real_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
            real_data = inputs
            real_output = D(real_data, labels)
            real_loss = criterion(real_output, real_labels)

            # Fake data
            noise = torch.randn(batch_size, z_dim).to(device)
            fake_data = G(noise, labels)
            fake_labels = torch.zeros(batch_size, 1).uniform_(0.0, 0.3).to(device)
            fake_output = D(fake_data.detach(), labels)
            
            # Calculate Discriminator loss and accuracy
            fake_loss = criterion(fake_output, fake_labels)
            d_total += real_labels.size(0)
            d_correct += (torch.round(real_output) == torch.round(real_labels)).sum().item()
            d_total += labels.size(0)
            d_correct += (torch.round(fake_output) == torch.round(fake_labels)).sum().item()
            d_loss = (real_loss + fake_loss)/2

            flipped_labels = torch.ones(batch_size, 1).uniform_(0.7, 1.0).to(device)
            output = D(fake_data, labels)
            
            # calculate generator loss and accuracy
            g_loss = criterion(output, flipped_labels) 
            g_correct += (torch.round(output) == torch.round(flipped_labels)).sum().item()
            g_total += flipped_labels.size(0)
            g_test_loss += g_loss.item()
            d_test_loss += d_loss.item()

g_test_loss /= g_total
d_test_loss /= d_total
d_test_acc = d_correct/d_total
g_test_acc = g_correct / g_total

print("G Loss: ", g_test_loss, "G Acc: ", g_test_acc, "D Acc: ", d_test_acc, "D Loss: ", d_test_loss)
criterion = nn.CrossEntropyLoss()
learning_rate = 0.0001
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4) # Set weight decay here
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=5, verbose=True)

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [419]:
eps = 1e-8
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

torch.save(G.state_dict(), 'G.pth')
torch.save(D.state_dict(), 'D.pth')

train_gan(G, D, pre_trained_model, train_loader, val_loader, 100, z_dim, num_classes,n_mfcc, device)

Discriminator gradient norm: 7.182796636097675
Generator gradient norm: 4.571407222212066
[Epoch 0/100] [G loss 1.566662] [D loss 0.493915] [G V loss: 1.165299] [D Val Loss: 0.800600]
Discriminator gradient norm: 6.653870177186063
Generator gradient norm: 3.2879697611088776
Discriminator gradient norm: 7.830081908503177
Generator gradient norm: 5.454021837677077
[Epoch 2/100] [G loss 1.409869] [D loss 0.564905] [G V loss: 1.125888] [D Val Loss: 0.989090]
Discriminator gradient norm: 6.486670811013982
Generator gradient norm: 4.912459322460952
Discriminator gradient norm: 5.641413290569193
Generator gradient norm: 4.083375017333117
[Epoch 4/100] [G loss 1.009428] [D loss 0.626718] [G V loss: 0.709896] [D Val Loss: 0.626426]
Discriminator gradient norm: 3.661020443371123
Generator gradient norm: 2.9931117929871163
Discriminator gradient norm: 5.045995891315822
Generator gradient norm: 4.008169037808908
[Epoch 6/100] [G loss 0.785235] [D loss 0.644994] [G V loss: 0.954401] [D Val Loss: 0.

KeyboardInterrupt: 

In [None]:
def train_w_gan(model, train_loader, criterion, optimizer, device, generated_inputs):
    model.to(device)
    train_loss, train_total, train_correct = 0.0, 0, 0
    model.train()
    counter = 0
    for inputchunks, labelchunks in train_loader:

        for x in range(len(inputchunks)):
            
            inputs, labels = inputchunks[x].to(device), labelchunks[x].to(device)
            optimizer.zero_grad()
            
            if downsample:
                inputs = np.squeeze(inputs, axis=1)
                
            inputs = inputs.permute(0,2,1)
            batch_size = inputs.size(0)
    
            
            noise = torch.randn(batch_size, z_dim).to(device)
            generated_inputs = G(noise, labels)
            generated_inputs = generated_inputs.permute(0,2,1)

            
            outputs1 = model(inputs)
            loss = criterion(outputs1, labels)
            

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            train_loss += loss.item()
            train_total += labels.size(0)

            _, labelacc = labels.max(dim=1)
            predicted = outputs1.data.argmax(dim=1)
            train_correct += (predicted == labelacc).sum().item()
         
            outputs2 = model(generated_inputs)
            
            loss = criterion(outputs2, labels)
            train_loss += loss.item()

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            
            counter += (inputs.size(0))
            train_total += labels.size(0)
            #_, labels = labels.max(dim=1)
            _, labelacc = labels.max(dim=1)
            predicted = outputs2.data.argmax(dim=1)

            train_correct += (predicted == labelacc).sum().item()
            
    train_loss /= (len(train_loader.dataset)  * len(inputchunks) * 2)
    train_acc = train_correct / (train_total)          
    return train_loss, train_acc

In [420]:
model = LSTMNet(input_size, hidden_size, num_layers, num_classes)

train_losses = []
train_accs = []
val_losses = []
val_accs = []
patience = 5
patience_counter = 0
criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4) # Set weight decay here
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=5, verbose=True)

for epoch in range(num_epochs):
    # Training
    train_loss, train_acc = train_w_gan(model, train_loader, criterion, optimizer, device, G)

    # Validation
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    # Save the model if the validation loss has decreased
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'lstm_net.pth')
        patience_counter = 0
    else:
        patience_counter += 1

    # Early stopping
    if patience_counter >= patience:
        print('Validation loss has not improved for the past', patience, 'epochs. Stopping early.')
    scheduler.step(val_loss)

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

test_loss, test_acc = validate(model, test_loader, criterion, device)

print("Test Acc: ", test_acc, "Test loss: ", test_loss)
