In [None]:
!pip install librosa
!pip install archisound

In [None]:
import copy
import os
import random

import librosa
import numpy as np
import pandas as pd
import torch
import torch.optim.lr_scheduler as lr_scheduler
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import random_split, DataLoader

In [None]:
import warnings
# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation warnings from the libraries used
# below; consider narrowing the filter once the notebook is stable.
warnings.filterwarnings("ignore")

In [None]:
class RDSData(Dataset):
    """Audio dataset yielding fixed-length stereo clips with a binary label.

    The file list is read once from ``data_dir`` (sorted by name) and reused
    for both ``__len__`` and ``__getitem__`` so the two can never disagree.

    Labelling rules:
      * a file whose name starts with ``'x'`` is labelled 0;
      * otherwise the integer before the first ``'_'`` in the file name is
        looked up in the index of the CSV; the label is 1 only when that row's
        ``diagnosis`` equals ``'RD'``;
      * if the id is not in the CSV index a message is printed and the label
        stays 1.

    Each item is ``(waveform, label)`` where ``waveform`` is a float32 tensor
    of shape ``(CLIP_SECONDS * SAMPLE_RATE, 2)``.

    Note: the random crop and the random beep are applied on every access, so
    repeated reads of the same index return different waveforms.

    Args:
        data_dir: Directory that contains the audio files.
        csv_link: Path or URL accepted by ``pandas.read_csv``; its first
            column is used as the index and it must have a ``diagnosis``
            column.
        transforms: Optional sequence stored on ``self.transforms``. It is
            not applied anywhere in this class.
    """

    SAMPLE_RATE = 48000  # every file is resampled to this rate on load
    CLIP_SECONDS = 5     # length of the clip handed to the model

    def __init__(self, data_dir, csv_link, transforms=None):
        self.data_dir = data_dir
        self.labels = pd.read_csv(csv_link, index_col=0)
        # A fresh list per instance: a `[]` default would be shared by all instances.
        self.transforms = list(transforms) if transforms is not None else []
        self.files = sorted(os.listdir(self.data_dir))

    def __len__(self):
        # Use the cached listing so the length always matches what
        # __getitem__ can index, and the directory is not re-scanned.
        return len(self.files)

    def _label_for(self, file_name):
        """Return the binary label for ``file_name`` (see class docstring)."""
        if file_name[0] == 'x':
            return 0
        ipd = int(file_name.split('_')[0])
        diagnosis = self.labels['diagnosis']
        # Membership is tested against the index (the CSV's first column).
        if ipd in diagnosis.index:
            return 1 if diagnosis.loc[ipd] == 'RD' else 0
        print("RECORD not found", file_name)
        return 1

    @staticmethod
    def _crop_or_pad(y, sr, clip_seconds=5):
        """Return exactly ``clip_seconds * sr`` samples taken from ``y``.

        Recordings longer than 13 s get a random window starting 1-7 s in.
        Shorter ones keep their first ``clip_seconds`` and are zero-padded at
        the END when needed. (Padding both ends and then slicing from the
        front would return pure silence.)
        """
        target = clip_seconds * sr
        if len(y) > 13 * sr:
            start = random.randint(1, 7) * sr
            return y[start:start + target]
        y = y[:target]
        if len(y) < target:
            y = np.pad(y, (0, target - len(y)), mode='constant')
        return y

    @staticmethod
    def _maybe_add_beep(y, sr):
        """With probability 0.5, mix a sine beep into the start of ``y``.

        The beep has a random frequency (750-1000 Hz), a random duration
        (under 1 s) and a random amplitude in [0.25, 0.75).
        """
        beep_freq, duration = random.randint(750, 1000), random.random()
        amplitude = random.random() / 2 + 0.25
        if random.random() >= 0.5:
            return y

        t = np.linspace(0, duration, int(sr * duration))
        beep = amplitude * np.sin(2 * np.pi * beep_freq * t)

        # Bring the beep to the clip length before mixing.
        if len(beep) < len(y):
            beep = np.pad(beep, (0, len(y) - len(beep)))
        else:
            beep = beep[:len(y)]
        y += beep
        return y

    def __getitem__(self, idx):
        file_name = self.files[idx]
        label = self._label_for(file_name)

        y, sr = librosa.load(
            os.path.join(self.data_dir, file_name),
            sr=self.SAMPLE_RATE,
            mono=True,
        )
        y = self._crop_or_pad(y, sr, self.CLIP_SECONDS)
        y = self._maybe_add_beep(y, sr)

        if y.ndim == 1:  # If mono, duplicate the channel to get stereo
            y = np.stack((y, y), axis=-1)
        z = torch.from_numpy(y).float()

        return z, label

In [None]:
# Source of the diagnosis table and location of the audio files.
csv_link = "https://docs.google.com/spreadsheets/d/e/2PACX-1vQFKYn0AY8-7A3xS9KPQud5g3sc0OGjccuft_af3bfJ7ApLK4PxqO-srPaMl3ZAyg/pub?gid=508649410&single=true&output=csv"
data_dir = "/kaggle/input/rds-data/"

dataset = RDSData(data_dir, csv_link)

# 80/20 train/test split. The generator is seeded so that re-running this cell
# (or restarting the kernel) always yields the same membership; otherwise
# samples could migrate between the two sets across runs.
train_size = int(0.80 * len(dataset))
test_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

In [None]:
# Fetch and print the first (waveform, label) pair as a quick sanity check.
print(dataset[0])

In [None]:
# Shuffle only the training batches; evaluation order does not affect the
# metrics, so the test loader keeps a fixed order.
train_dl = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dl = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
from archisound import ArchiSound

# Load the pretrained "dmae1d-ATC64-v2" model; its .encode() output is what
# the classifier below consumes.
autoencoder = ArchiSound.from_pretrained("dmae1d-ATC64-v2")

In [None]:
class Classifier(nn.Module):
    """Fully connected binary classifier that outputs a probability.

    The input is flattened per sample, passed through four hidden layers
    (1024 -> 512 -> 256 -> 128, ReLU) and a final sigmoid unit.

    Args:
        in_size: Number of features per sample after flattening.
    """

    def __init__(self, in_size):
        super(Classifier, self).__init__()
        self.Custom = nn.Sequential(
            nn.Flatten(),
            # Size the first layer from the constructor argument, not from a
            # notebook-level global that may not exist yet.
            nn.Linear(in_size, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of probabilities in [0, 1]."""
        x = self.Custom(x)
        return x

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Number of features per sample fed to the classifier.
# NOTE(review): presumably the flattened size of the autoencoder latent for a
# 5 s clip -- confirm against the shape returned by autoencoder.encode().
input_size = 7520

# Build the classifier and place both networks on the selected device.
model = Classifier(input_size).to(device)
autoencoder = autoencoder.to(device)

In [None]:
# Total number of parameters in the classifier, followed by its layer layout.
print(sum(p.numel() for p in model.parameters()))
print(model)

In [None]:
class DiceLoss(nn.Module):
    """Soft Dice loss: ``1 - (2*|X.Y| + smooth) / (|X| + |Y| + smooth)``.

    ``weight`` and ``size_average`` are accepted by the constructor but are
    not used.
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return the scalar Dice loss between predictions and targets.

        Both tensors are flattened to 1-D first, so any matching shapes work.
        ``smooth`` keeps the ratio defined when both tensors sum to zero.
        """
        flat_pred = inputs.view(-1)
        flat_true = targets.view(-1)

        overlap = (flat_pred * flat_true).sum()
        numerator = 2*overlap + smooth
        denominator = flat_pred.sum() + flat_true.sum() + smooth

        return 1 - numerator/denominator

In [None]:
# The two loss terms that make up the training objective.
Dice = DiceLoss()
BCE = nn.BCELoss()


def criterion(x, y):
    """Return the weighted objective: 0.4 * Dice loss + 0.6 * BCE loss.

    ``x`` holds the predicted probabilities and ``y`` the float targets of the
    same shape.
    """
    dice_term = Dice(x, y)
    bce_term = BCE(x, y)
    return 0.4*dice_term + 0.6*bce_term

epochs = 15

optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
# Linear learning-rate decay spread over the whole run; tied to `epochs` so
# the two cannot drift apart.
scheduler = lr_scheduler.LinearLR(optimizer, start_factor=1, end_factor=0.000001, total_iters=epochs)

# Track the epoch with the highest TRAINING accuracy.
# NOTE(review): no validation split exists, so selection is on training
# accuracy; consider carving out a validation set.
best = 0
best_model = None
for epoch in range(epochs):
    model.train()
    total_loss = 0
    correct_predictions = 0
    total_samples = 0

    for batch_data, batch_labels in train_dl:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device).float().unsqueeze(1)  # Convert labels to float and shape [batch_size, 1]

        # The dataset yields (batch, samples, channels); move channels to
        # dim 1 before encoding.
        batch_data = batch_data.permute(0, 2, 1)
        # The autoencoder is not in the optimizer, so it is only a feature
        # extractor here: skip autograd bookkeeping for it.
        with torch.no_grad():
            encoded_data = autoencoder.encode(batch_data)

        optimizer.zero_grad()
        outputs = model(encoded_data)
        loss = criterion(outputs, batch_labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Convert outputs to binary predictions (0 or 1)
        predicted = (outputs > 0.5).float()
        correct_predictions += (predicted == batch_labels).sum().item()
        total_samples += batch_labels.size(0)

    # Calculate average loss and accuracy for the epoch
    scheduler.step()
    avg_loss = total_loss / len(train_dl)
    accuracy = correct_predictions / total_samples
    if accuracy >= best:
        best = accuracy
        # Snapshot the weights. Assigning `model` itself would only alias the
        # live network, which keeps changing in later epochs.
        best_model = copy.deepcopy(model)
    print("---------------------------------------\n")
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
    print("\n---------------------------------------")

In [None]:
# Evaluate the selected model on the test loader and report accuracy,
# precision, recall and F-score.
from sklearn.metrics import accuracy_score, precision_score, recall_score

true_labels = []
predicted_labels = []
model = best_model
model.eval()  # inference mode for the classifier

with torch.no_grad():
    for batch_data, batch_labels in test_dl:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device).float().unsqueeze(1)

        # (batch, samples, channels) -> (batch, channels, samples)
        batch_data = batch_data.permute(0, 2, 1)
        encoded_data = autoencoder.encode(batch_data)
        outputs = model(encoded_data)

        # Flatten to 1-D before scalar conversion; calling float() on
        # 1-element arrays is deprecated in NumPy.
        batch_probs = outputs.cpu().numpy().flatten()
        batch_truth = batch_labels.cpu().numpy().flatten()
        for prob, truth in zip(batch_probs, batch_truth):
            # per-sample "prediction [target]" line
            print(int(prob > 0.5), [float(truth)])
        predicted = (outputs > 0.5).float()

        true_labels.extend(batch_truth)
        predicted_labels.extend(predicted.cpu().numpy().flatten())


accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)
# Harmonic mean of precision and recall; defined as 0 when both are 0 so the
# cell does not fail on a degenerate classifier.
if precision + recall > 0:
    fscore = 2 * (precision * recall) / (precision + recall)
else:
    fscore = 0.0

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}") 
print(f"F-score: {fscore:.4f}")

In [None]:
# Persist only the weights (state_dict) of the selected model to the Kaggle
# working directory.
torch.save(best_model.state_dict(), "/kaggle/working/model.pt")