In [1]:

import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F

import os
import pandas as pd
import torchaudio
from torch.utils.data import Dataset, DataLoader, random_split
import torch
import librosa
import librosa.display
import matplotlib.pyplot as plt


In [2]:
labels_df = pd.read_excel('./data/data/NEW_IRB300012145_Patient_ID_deidentified.xlsx')
Smokeing_status = labels_df.iloc[:, 4].to_list()

Smokeing_status[149]

'never'

In [3]:
import torchaudio.transforms as T

class AmplitudeNormalization:
    def __call__(self, waveform):
        # Normalize the waveform to be within [-1, 1]
        peak = waveform.abs().max()
        if peak > 0:
            waveform = waveform / peak
        return waveform

# To use it:
# waveform, sample_rate = torchaudio.load('path/to/audio.wav')
# waveform = AmplitudeNormalization()(waveform)


class PadTrimAudio:
    def __init__(self, max_len):
        self.max_len = max_len

    def __call__(self, waveform):
        if waveform.size(1) > self.max_len:
            # Trim the waveform if longer than max_len
            waveform = waveform[:, :self.max_len]
        elif waveform.size(1) < self.max_len:
            # Pad with zeros if shorter than max_len
            padding_size = self.max_len - waveform.size(1)
            waveform = torch.nn.functional.pad(waveform, (0, padding_size), "constant", 0)

        if waveform.size(0) < 128:
            padding_size = 128 - waveform.size(0)
            waveform = torch.nn.functional.pad(waveform, (padding_size, 0), "constant", 0)
        return waveform

from sklearn.preprocessing import StandardScaler
import numpy as np

class FeatureNormalization:
    def __init__(self):
        self.scaler = StandardScaler()

    def fit(self, features):
        # Fit the scaler on the training set features
        self.scaler.fit(features)

    def transform(self, features):
        # Apply normalization to features
        return self.scaler.transform(features)

In [4]:
label_to_int = {
    'N': 0,
    'Y': 1
}

label_to_int['N']

0

In [5]:
label_to_int = {
    'F': 0,
    'M': 1
}


In [6]:
label_to_int = {
    'current': 0,
    'former': 0,
    'never': 1
}

In [7]:
class SoundDataset(Dataset):
    def __init__(self, data_dir, labels_df, transform=None, max_len=10000):
        self.data_dir = data_dir
        self.labels_df = labels_df
        self.transform = transform
        self.max_len = max_len
        # self.orderlist = ['RLP.wav', 'RUP.wav', 'RUA Hum.wav', 'LUA Hum.wav', 'RUA.wav', 'RMP.wav', 'LMP.wav', 'LUA.wav', 'LLP.wav', 'LUP.wav']
        self.orderlist = ['a.wav', 'a_high.wav', 'a_low.wav', 'consent.wav', 'cough_n.wav', 'e.wav', 'e_high.wav', 'e_low.wav', 'pain_rating.wav', 'SOB_rating.wav', 'two_roads.wav']


    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        patient_id = str(idx+1).zfill(3)
        # audio_dir = os.path.join(self.data_dir, str(patient_id), 'breath Eko')
        audio_dir = os.path.join(self.data_dir, str(patient_id), 'Voice')
        audio_file = [os.path.join(audio_dir, f) for f in os.listdir(audio_dir) if f.endswith('.wav') and '_' not in f][0]


        y, sr = librosa.load(audio_file, sr=None)


        D = librosa.stft(y, n_fft=2048, hop_length=512)  # n_fft and hop_length can be adjusted based on your needs
        S = np.abs(D) ** 2  # Convert to power spectrum
        mel_spec = librosa.feature.melspectrogram(S=S, sr=sr, n_mels=128)

            # print(mel_spec)
            # plt.figure(figsize=(10, 4))
            # librosa.display.specshow(librosa.power_to_db(mel_spec, ref=np.max), sr=sr, hop_length=512, x_axis='time', y_axis='mel')
            # plt.colorbar(format='%+2.0f dB')
            # plt.title('Mel spectrogram')
            # plt.tight_layout()
            # plt.show()
            # print(len(y))
            # print(sr)

            # if self.transform:
            #     waveform = self.transform(waveform)
        waveform = PadTrimAudio(self.max_len)(torch.tensor(mel_spec))


        label = label_to_int[self.labels_df[idx]]

        return waveform, label

transform = AmplitudeNormalization()
data_dir = './data/data/Patients'
sound_dataset = SoundDataset(data_dir, Smokeing_status, transform=transform)

train_dataset, test_dataset = random_split(sound_dataset, [180, 20])

traindataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)
testdataloader = DataLoader(test_dataset, batch_size=4, shuffle=False)

dataloader = DataLoader(sound_dataset, batch_size=20, shuffle=True)

In [8]:
for batch in traindataloader:
    waveforms, labels = batch
    print(waveforms.shape)
    print(labels)

torch.Size([4, 128, 10000])
tensor([1, 1, 0, 1])
torch.Size([4, 128, 10000])
tensor([0, 0, 0, 0])
torch.Size([4, 128, 10000])
tensor([1, 0, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 0, 1])
torch.Size([4, 128, 10000])
tensor([1, 0, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 0, 1, 0])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([0, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 0, 0, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 0])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([0, 1, 0, 1])
torch.Size([4, 128, 10000])
tensor([0, 0, 0, 0])
torch.Size([4, 128, 10000])
tensor([1, 0, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 0, 0])
torch.Size([4, 128, 10000])
tensor([0, 1, 1, 0])
torch.Size([4, 128, 10000])
tensor([1, 0, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([0, 1, 1, 0])
torch.Size([4, 128, 

In [9]:
for batch in testdataloader:
    waveforms, labels = batch
    print(waveforms.shape)
    print(labels)

torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 0])
torch.Size([4, 128, 10000])
tensor([0, 0, 1, 1])
torch.Size([4, 128, 10000])
tensor([0, 1, 0, 0])
torch.Size([4, 128, 10000])
tensor([1, 1, 1, 1])


In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18

def conv1d3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv1d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv1d3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv1d3x3(planes, planes)
        self.bn2 = nn.BatchNorm1d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

class ResNet1D(nn.Module):

    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False):
        super(ResNet1D, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Conv1d(128, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1d3x3(self.inplanes, planes * block.expansion, stride),
                nn.BatchNorm1d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

def AudioClassifier(**kwargs):
    return ResNet1D(BasicBlock, [2, 2, 2, 2], **kwargs)

# class AudioClassifier(nn.Module):
#     def __init__(self, num_classes=2):
#         super(AudioClassifier, self).__init__()
#         resnet = resnet18(pretrained=True)  # Load a pretrained ResNet18 model
#
#         # Modify the first convolutional layer to accept 1D input
#         self.resnet = nn.Sequential(
#             nn.Conv1d(128, 64, kernel_size=7, stride=2, padding=3, bias=False),
#             resnet.bn1,
#             resnet.relu,
#             resnet.maxpool,
#             resnet.layer1,
#             resnet.layer2,
#             resnet.layer3,
#             resnet.layer4,
#             nn.AdaptiveAvgPool1d(1)  # Adapting pool for 1D
#         )
#
#         # Replace the fully connected layer
#         self.fc = nn.Linear(512, num_classes)
#
#     def forward(self, x):
#         x = self.resnet(x)
#         x = torch.flatten(x, 1)
#         x = self.fc(x)
#         return x

class MLPClassifier(nn.Module):
    def __init__(self):
        super(MLPClassifier, self).__init__()
        # Flatten the input from 1280x1000 to 1280000
        self.fc1 = nn.Linear(1280 * 1000, 1024)  # First fully connected layer
        self.fc2 = nn.Linear(1024, 512)          # Second fully connected layer
        self.fc3 = nn.Linear(512, 3)             # Output layer for 3 classes

    def forward(self, x):
        # Flatten the input
        x = x.view(-1, 1280 * 1000)  # Ensure input tensor is reshaped to (batch_size, 1280*1000)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)  # No activation function here as it will be used with CrossEntropyLoss
        return x

In [17]:
import torch

def test_model(model, test_loader, device='cpu'):
    """
    Tests the given model on the provided test data loader.

    Parameters:
        model (torch.nn.Module): The model to test.
        test_loader (torch.utils.data.DataLoader): DataLoader for the test set.
        device (str): Device to run the model on ('cpu' or 'cuda').

    Returns:
        float: Accuracy of the model on the test set.
    """
    # Set the model to evaluation mode
    model.eval()
    model.to(device)

    correct = 0
    total = 0
    predictions = []
    true_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            print(labels)
            print(predicted)
            predictions.extend(predicted.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())


    accuracy = 100 * correct / total
    print(f'Accuracy of the model on the test set: {accuracy:.2f}%')


    return accuracy




In [18]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AudioClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

def train(model, device, train_loader, optimizer, criterion, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        correct = 0
        total = 0

        for data, targets in train_loader:
            data = data.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            outputs = model(data)  # Add channel dimension
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Compute accuracy
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
            # print(total)
        avg_loss = total_loss / len(train_loader)
        accuracy = 100 * correct / total
        print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
        accuracy = test_model(model, testdataloader, device)

train(model, device, traindataloader, optimizer, criterion)


Epoch 1, Loss: 1.3844, Accuracy: 50.56%
tensor([1, 1, 1, 1], device='cuda:0')
tensor([0, 0, 0, 0], device='cuda:0')
tensor([1, 1, 1, 0], device='cuda:0')
tensor([0, 0, 0, 0], device='cuda:0')
tensor([0, 0, 1, 1], device='cuda:0')
tensor([0, 0, 0, 0], device='cuda:0')
tensor([0, 1, 0, 0], device='cuda:0')
tensor([0, 0, 1, 0], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([0, 0, 0, 0], device='cuda:0')
Accuracy of the model on the test set: 25.00%
Epoch 2, Loss: 0.8599, Accuracy: 53.33%
tensor([1, 1, 1, 1], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([1, 1, 1, 0], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([0, 0, 1, 1], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([0, 1, 0, 0], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
tensor([1, 1, 1, 1], device='cuda:0')
Accuracy of the model on the test set: 70.00%
Epoch 3, Loss: 0.7986, Accuracy: 52.22%
tensor([1, 1, 1, 1], device=