In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily enumerate every file below the Kaggle input directory and print its full path.
input_files = (
    os.path.join(root, name)
    for root, _, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_files:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pandas as pd
import numpy as np
import torchaudio
import torch
from transformers import ClapModel, ClapProcessor
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Load the pretrained CLAP checkpoint: the processor prepares text/audio inputs,
# the model (moved to `device`) produces the embeddings used further down.
CLAP_CHECKPOINT = "laion/clap-htsat-unfused"
processor = ClapProcessor.from_pretrained(CLAP_CHECKPOINT)
model = ClapModel.from_pretrained(CLAP_CHECKPOINT).to(device)

In [None]:
# Put the CLAP model in evaluation mode; it is only used for feature extraction
# (under torch.no_grad()) in the cells below, never trained.
model.eval()

In [None]:
# The diagnosis CSV has no header row, so column names are supplied explicitly.
diagnosis_path = (
    '/kaggle/input/respiratory-sound-database'
    '/Respiratory_Sound_Database/Respiratory_Sound_Database'
    '/patient_diagnosis.csv'
)
diagnosis_df = pd.read_csv(
    diagnosis_path,
    names=['patient_id', 'diagnosis'],
    header=None,
)

In [None]:
# Display the diagnosis table as this cell's output for a quick visual check.
diagnosis_df

In [None]:
# Column names for the header-less, whitespace-separated demographics file.
column_names = [
    'Patient number',
    'Age',
    'Sex',
    'Adult BMI (kg/m2)',
    'Child Weight (kg)',
    'Child Height (cm)'
]
demo_path = "/kaggle/input/respiratory-sound-database/demographic_info.txt"
# Raw string: '\s' is not a valid escape in a normal string literal and
# triggers a DeprecationWarning/SyntaxWarning on recent Python versions.
demographics_df = pd.read_csv(demo_path, sep=r'\s+', header=None, names=column_names)

In [None]:
# Display the demographics table as this cell's output for a quick visual check.
demographics_df

In [None]:
def generate_patient_description(row):
    """Build a one-paragraph English description of a patient.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must provide the keys 'Patient number', 'Age', 'Sex',
        'Adult BMI (kg/m2)', 'Child Weight (kg)' and 'Child Height (cm)'.
        Missing values (as detected by ``pd.isna``) are tolerated.

    Returns
    -------
    str
        Sentence(s) covering age, sex and either the adult BMI or whichever
        child measurements are available.
    """
    # --- age phrase -------------------------------------------------------
    age_raw = row['Age']
    if pd.isna(age_raw):
        age_text = "patient with unknown age"
    else:
        years = float(age_raw)
        if years >= 18:
            age_text = f"{int(years)}-year-old adult"
        elif years >= 1:
            age_text = f"{int(years)}-year-old"
        else:
            # Below one year the age is reported in months.
            age_text = f"{years*12:.1f}-month-old"

    # --- sex phrase -------------------------------------------------------
    sex_raw = row['Sex']
    sex_text = " of unspecified sex." if pd.isna(sex_raw) else f" {sex_raw}."

    # --- body measurements: BMI takes precedence over child measurements --
    bmi = row['Adult BMI (kg/m2)']
    if pd.notna(bmi):
        body_text = f" The adult has a BMI of {bmi} kg/m²."
    else:
        measurement_templates = (
            ('Child Weight (kg)', "weighs {} kg"),
            ('Child Height (cm)', "is {} cm tall"),
        )
        facts = [
            template.format(row[column])
            for column, template in measurement_templates
            if pd.notna(row[column])
        ]
        body_text = " The child " + " and ".join(facts) + "." if facts else ""

    return "".join([f"Patient {row['Patient number']} is a ", age_text, sex_text, body_text])

# Load the header-less, whitespace-separated demographics file; pandas maps
# its default NA markers (e.g. "NA") to NaN, which the description helper handles.
column_names = [
    'Patient number',
    'Age',
    'Sex',
    'Adult BMI (kg/m2)',
    'Child Weight (kg)',
    'Child Height (cm)'
]
demo_path = "/kaggle/input/respiratory-sound-database/demographic_info.txt"
# Raw string: '\s' is not a valid escape in a normal string literal and
# triggers a DeprecationWarning/SyntaxWarning on recent Python versions.
df = pd.read_csv(demo_path, sep=r'\s+', header=None, names=column_names)

# Generate one free-text description per patient (row-wise apply).
df['Description'] = df.apply(generate_patient_description, axis=1)

# Show the first rows (including the new 'Description' column) as the cell output.
print("Complete Patient Descriptions:")
df.head()

In [None]:
# 2. Load Official Split
# Keep the raw lines (newline included); they are parsed in the next cell.
with open('/kaggle/input/others/official_split.txt') as split_file:
    official_split = list(split_file)

In [None]:
# Route each recording id (first tab-separated field) to the train and/or test list.
# NOTE(review): membership is decided by a substring test on the whole line, so an
# id that itself contained "train"/"test" would also match - confirm the file format.
train_samples, test_samples = [], []
for entry in official_split:
    recording_id = entry.strip().split('\t')[0]
    if "train" in entry:
        train_samples.append(recording_id)
    if "test" in entry:
        test_samples.append(recording_id)

In [None]:
# Sanity check: number of recording ids assigned to the training split.
len(train_samples)

In [None]:
# 3. Load Annotation Files
def load_annotation(annotation_file):
    """Parse a respiratory-cycle annotation file.

    Each non-blank line must hold four whitespace-separated fields:
    start time, end time, crackles flag and wheezes flag.

    Parameters
    ----------
    annotation_file : str or path-like
        Path of the annotation text file.

    Returns
    -------
    list[dict]
        One dict per cycle with keys 'start_time', 'end_time' (float) and
        'crackles', 'wheezes' (int), in file order.

    Raises
    ------
    IndexError, ValueError
        If a non-blank line has fewer than four fields or a field cannot be
        converted to a number.
    """
    cycle_data = []
    with open(annotation_file, 'r') as file:
        for line in file:
            # split() handles tabs as well as spaces and drops the newline.
            parts = line.split()
            if not parts:
                # Blank / whitespace-only lines (e.g. a trailing newline) carry
                # no cycle; skipping them keeps the rest of the file usable.
                continue
            cycle_data.append({
                'start_time': float(parts[0]),
                'end_time': float(parts[1]),
                'crackles': int(parts[2]),
                'wheezes': int(parts[3]),
            })
    return cycle_data

# 4. Load Audio Files
def load_audio(audio_path):
    """Read an audio file with torchaudio and return ``(waveform, sample_rate)``."""
    return torchaudio.load(audio_path)

# 5. Extract Relevant Audio Segments
def extract_breathing_segment(waveform, sample_rate, start_time, end_time):
    """Slice ``waveform`` along its second axis between two timestamps.

    The timestamps (seconds) are converted to sample indices by multiplying
    with ``sample_rate`` and truncating to int; the end index is exclusive.
    """
    first, last = (int(seconds * sample_rate) for seconds in (start_time, end_time))
    return waveform[:, first:last]
    
def get_label(crackles, wheezes):
    """Map the two annotation flags to a class name.

    Returns "both", "crackles", "wheezes" or "normal"; a flag counts as set
    only when it equals 1.
    """
    has_crackles = crackles == 1
    has_wheezes = wheezes == 1
    if has_crackles:
        return "both" if has_wheezes else "crackles"
    return "wheezes" if has_wheezes else "normal"

In [None]:
# Gather embeddings and labels
# Sample rate every segment is resampled to before being handed to the CLAP processor.
target_sample_rate = 48000
# Accumulators filled by the extraction loop in the next cell.
audio_embeddings, text_embeddings, labels = [], [], []

# Directory holding the paired .wav recordings and .txt cycle annotations.
data_dir = (
    '/kaggle/input/respiratory-sound-database'
    '/Respiratory_Sound_Database/Respiratory_Sound_Database'
    '/audio_and_txt_files'
)

In [None]:
from tqdm import tqdm
import os

# Extract one (audio embedding, text embedding, label) triple per respiratory
# cycle of the training recordings. Results are accumulated in the three lists.
audio_embeddings = []
text_embeddings = []
labels = []
# Resample transforms keyed by source sample rate, built once and reused
# instead of being re-created for every cycle.
resamplers = {}

# NOTE(review): only AKGC417L recordings are used for training, while the test
# extraction loop further down applies no such device filter - confirm intended.
for file_id in tqdm(train_samples, desc="Processing AKGC417L samples"):
    if "AKGC417L" not in file_id:
        continue  # Skip recordings from other devices
    audio_path = os.path.join(data_dir, file_id + '.wav')
    annotation_path = os.path.join(data_dir, file_id + '.txt')

    # Text branch: the patient description is shared by all cycles of a recording.
    try:
        # The first three characters of the recording id are the patient number.
        description = (df[df["Patient number"] == int(file_id[:3])]["Description"]).values[0]
        text_inputs = processor(text=description, return_tensors="pt", padding=True, truncation=True)
        text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
        with torch.no_grad():
            text_embed = model.get_text_features(**text_inputs)
    except Exception as e:
        print(f"[Text Error] {file_id}: {e}")
        continue

    try:
        waveform, sample_rate = load_audio(audio_path)
        annotation = load_annotation(annotation_path)
    except Exception as e:
        print(f"[Audio/Annotation Error] {file_id}: {e}")
        continue

    for cycle in annotation:
        try:
            # Decide on the label first so excluded cycles ("both") are not
            # pushed through the audio encoder for nothing.
            label = get_label(cycle['crackles'], cycle['wheezes'])
            if label not in ["wheezes", "crackles", "normal"]:
                continue

            segment = extract_breathing_segment(waveform, sample_rate, cycle['start_time'], cycle['end_time'])
            # Collapse the channel axis: identical to squeeze(0) for mono audio,
            # and a proper mono down-mix if a file ever has several channels.
            segment = segment.mean(dim=0)

            if sample_rate != target_sample_rate:
                if sample_rate not in resamplers:
                    resamplers[sample_rate] = torchaudio.transforms.Resample(
                        orig_freq=sample_rate, new_freq=target_sample_rate
                    )
                segment = resamplers[sample_rate](segment)

            # Peak-normalise exactly like the test extraction loop does, so the
            # classifier is trained and evaluated on identically preprocessed audio.
            peak = segment.abs().max()
            if peak > 0:
                segment = segment / peak

            audio_inputs = processor(audios=segment, sampling_rate=target_sample_rate, return_tensors="pt")
            audio_inputs = {k: v.to(device) for k, v in audio_inputs.items()}
            with torch.no_grad():
                audio_embed = model.get_audio_features(**audio_inputs)

            labels.append(label)
            audio_embeddings.append(audio_embed.squeeze(0))
            text_embeddings.append(text_embed.squeeze(0))
        except Exception as e:
            # Best effort: report the failing cycle and keep going.
            print(f"[Cycle Error] {file_id}: {e}")

In [None]:
# Convert lists to tensors: each per-cycle embedding becomes one row.
audio_tensor, text_tensor = (
    torch.stack(embedding_list)
    for embedding_list in (audio_embeddings, text_embeddings)
)

In [None]:
# Seed PyTorch so the split, weight initialisation and batch shuffling that
# follow are repeatable from run to run.
torch.manual_seed(42)

# 1. Combine embeddings and prepare data
# Audio and text embeddings are concatenated feature-wise, then cast to float32.
multimodal_embeddings = torch.cat([audio_tensor, text_tensor], dim=1).float()

# 2. Prepare labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(labels)  # Convert string labels to numerical
num_classes = len(label_encoder.classes_)

# Keep the dataset on the CPU; batches are moved to `device` inside the loops.
X_tensor = multimodal_embeddings.cpu()
y_tensor = torch.from_numpy(y_encoded).long()  # Labels as int64

# 3. Create dataset and dataloaders
dataset = TensorDataset(X_tensor, y_tensor)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
# Explicit generator: the train/validation split no longer depends on global RNG state.
# NOTE(review): the split is per cycle, so cycles of one recording/patient can land
# on both sides - validation accuracy may be optimistic; consider a grouped split.
split_generator = torch.Generator().manual_seed(42)
train_set, val_set = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

batch_size = 32
# Pinned memory only helps (and only avoids a warning) when a GPU is present.
use_pinned_memory = torch.cuda.is_available()
# BatchNorm1d cannot run in train mode on a batch of one sample, so drop the
# trailing batch only in the case where it would contain exactly one element.
drop_single_tail = train_size > batch_size and train_size % batch_size == 1

train_loader = DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=use_pinned_memory,
    drop_last=drop_single_tail,
)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, pin_memory=use_pinned_memory)

# 4. Model definition
class MultimodalClassifier(nn.Module):
    """Fully connected classification head over the concatenated embeddings.

    Three hidden stages (Linear -> BatchNorm1d -> ReLU -> Dropout) of widths
    512, 256 and 64 are followed by a final Linear layer producing one logit
    per class. All layers live in ``self.fc`` (an ``nn.Sequential``).
    """

    # (output width, dropout probability) of each hidden stage, in order.
    _HIDDEN_STAGES = ((512, 0.5), (256, 0.3), (64, 0.3))

    def __init__(self, input_dim, num_classes):
        super().__init__()
        stack = []
        width_in = input_dim
        for width_out, drop_p in self._HIDDEN_STAGES:
            stack += [
                nn.Linear(width_in, width_out),
                nn.BatchNorm1d(width_out),
                nn.ReLU(),
                nn.Dropout(drop_p),
            ]
            width_in = width_out
        stack.append(nn.Linear(width_in, num_classes))
        self.fc = nn.Sequential(*stack)

    def forward(self, x):
        """Return the class logits for a batch of feature vectors."""
        return self.fc(x)

# Initialize model: the input width is the feature dimension of the combined embeddings.
input_dim = multimodal_embeddings.size(1)
model_classifier = MultimodalClassifier(input_dim, num_classes)
model_classifier = model_classifier.to(device)

# 5. Training setup: cross-entropy loss, AdamW and a cosine learning-rate schedule.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    model_classifier.parameters(),
    lr=1e-3,
    weight_decay=1e-3,
)
# NOTE(review): T_max=10 is shorter than the number of epochs the training cell
# runs, so the rate presumably rises again after epoch 10 - confirm this is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

In [None]:
# 6. Training loop with proper device management
# Trains the classifier head, tracks validation accuracy after every epoch,
# checkpoints the best epoch and finally restores those best weights so that
# later cells evaluate the selected model rather than the last epoch.
num_epochs = 100
checkpoint_path = 'best_multimodal_model.pth'
best_val_acc = 0
best_epoch = None  # stays None if no epoch ever beats an accuracy of 0

for epoch in range(num_epochs):
    model_classifier.train()
    train_loss = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model_classifier(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # The schedule is stepped once per epoch.
    scheduler.step()

    # Validation
    model_classifier.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model_classifier(inputs)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    # Guard against an empty validation set / loader instead of dividing by zero.
    val_acc = correct / total if total else 0.0
    mean_train_loss = train_loss / max(len(train_loader), 1)
    print(f'Epoch {epoch+1}: Train Loss: {mean_train_loss:.4f}, Val Acc: {val_acc:.4f}')

    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_epoch = epoch + 1
        torch.save(model_classifier.state_dict(), checkpoint_path)

# Reload the checkpoint written above; without this the in-memory model holds
# the last-epoch weights and the saved "best" model is never actually used.
if best_epoch is not None:
    model_classifier.load_state_dict(torch.load(checkpoint_path, map_location=device))
    print(f"Restored best weights from epoch {best_epoch}")

print("Training complete!")
print(f"Best validation accuracy: {best_val_acc:.4f}")

In [None]:
from tqdm import tqdm
import os

# Extract one (audio embedding, text embedding, label) triple per respiratory
# cycle of the test recordings, using the same CLAP model and processor.
test_audio_embeddings = []
test_text_embeddings = []
test_labels = []
target_sample_rate = 48000
# Resample transforms keyed by source sample rate, built once and reused
# instead of being re-created for every cycle.
test_resamplers = {}

data_dir = '/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files'

for file_id in tqdm(test_samples, desc="Processing test samples"):
    audio_path = os.path.join(data_dir, file_id + '.wav')
    annotation_path = os.path.join(data_dir, file_id + '.txt')

    # Text branch: the patient description is shared by all cycles of a recording.
    try:
        # The first three characters of the recording id are the patient number.
        description = (df[df["Patient number"] == int(file_id[:3])]["Description"]).values[0]
        text_inputs = processor(text=description, return_tensors="pt", padding=True, truncation=True)
        text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
        with torch.no_grad():
            text_embed = model.get_text_features(**text_inputs)
    except Exception as e:
        print(f"[Text Error] {file_id}: {e}")
        continue

    try:
        waveform, sample_rate = load_audio(audio_path)
        annotation = load_annotation(annotation_path)
    except Exception as e:
        print(f"[Audio/Annotation Error] {file_id}: {e}")
        continue

    for cycle in annotation:
        try:
            # Decide on the label first so excluded cycles ("both") are not
            # pushed through the audio encoder for nothing.
            label = get_label(cycle['crackles'], cycle['wheezes'])
            if label == "both":
                continue

            segment = extract_breathing_segment(waveform, sample_rate, cycle['start_time'], cycle['end_time'])
            # Collapse the channel axis: identical to squeeze(0) for mono audio,
            # and a proper mono down-mix if a file ever has several channels.
            segment = segment.mean(dim=0)

            if sample_rate != target_sample_rate:
                if sample_rate not in test_resamplers:
                    test_resamplers[sample_rate] = torchaudio.transforms.Resample(
                        orig_freq=sample_rate, new_freq=target_sample_rate
                    )
                segment = test_resamplers[sample_rate](segment)

            # Amplitude normalization (scale so the peak magnitude is 1)
            peak = segment.abs().max()
            if peak > 0:
                segment = segment / peak

            audio_inputs = processor(audios=segment, sampling_rate=target_sample_rate, return_tensors="pt")
            audio_inputs = {k: v.to(device) for k, v in audio_inputs.items()}
            with torch.no_grad():
                audio_embed = model.get_audio_features(**audio_inputs)

            test_audio_embeddings.append(audio_embed.squeeze(0))
            test_text_embeddings.append(text_embed.squeeze(0))
            test_labels.append(label)

        except Exception as e:
            # Best effort: report the failing cycle and keep going.
            print(f"[Cycle Error] {file_id}: {e}")


In [None]:
# TEST PROCESS
# NOTE(review): this scores whatever weights `model_classifier` currently holds;
# the training cell also writes the best-validation weights to
# 'best_multimodal_model.pth' - confirm which set of weights should be reported.

# 1. Combine test embeddings (same layout as training: audio features, then text).
# The stacked embeddings live on the device the CLAP model ran on.
test_audio_tensor = torch.stack(test_audio_embeddings).float()
test_text_tensor = torch.stack(test_text_embeddings).float()
test_multimodal = torch.cat([test_audio_tensor, test_text_tensor], dim=1).float()  # [n_samples, audio_dim + text_dim]

# 2. Prepare test labels (using the SAME label encoder from training)
test_y_encoded = label_encoder.transform(test_labels)  # Don't use fit_transform() here!
test_y_tensor = torch.from_numpy(test_y_encoded).long()

# 3. Create test dataset (moved to CPU so the loader can hand out host batches)
test_dataset = TensorDataset(test_multimodal.cpu(), test_y_tensor.cpu())

# 4. Create test loader
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,  # Never shuffle test data
    pin_memory=torch.cuda.is_available()  # Pinning is only useful (and warning-free) with a GPU
)

# 5. Evaluation loop (same as validation)
model_classifier.eval()
test_correct = 0
test_total = 0
all_preds = []
all_targets = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass; the predicted class is the arg-max logit
        outputs = model_classifier(inputs)
        _, predicted = torch.max(outputs, 1)

        # Update metrics
        test_total += targets.size(0)
        test_correct += (predicted == targets).sum().item()

        # Store for detailed metrics
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())

# 6. Overall accuracy over all evaluated cycles
test_acc = test_correct / test_total
print(f'\nTest Accuracy: {test_acc:.4f}')

In [None]:
# Detailed classification report
from sklearn.metrics import classification_report

# Encoded class ids in encoder order. Passing them explicitly keeps the report
# rows and the matrix axes aligned with `label_encoder.classes_` even when a
# class never occurs in the test targets or predictions.
class_ids = list(range(len(label_encoder.classes_)))

print(classification_report(
    all_targets,
    all_preds,
    labels=class_ids,
    target_names=label_encoder.classes_
))

# Confusion matrix
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(all_targets, all_preds, labels=class_ids)
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_,
            ax=ax)
ax.set_title('Test Confusion Matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.show()

In [None]:
# Shape check of the combined test features, for comparison with the training features.
test_multimodal.shape

In [None]:
# Shape check of the combined training features.
multimodal_embeddings.shape

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

# Per-class sensitivity/specificity (one-vs-rest) from the test confusion matrix.
classes = label_encoder.classes_
# Explicit labels keep row/column i aligned with classes[i] even if a class
# is absent from the test targets and predictions.
cm = confusion_matrix(all_targets, all_preds, labels=np.arange(len(classes)))

# Initialize dictionaries to store metrics
sensitivity = {}
specificity = {}

for i, class_name in enumerate(classes):
    # True Positives (TP): diagonal entry
    TP = cm[i, i]

    # False Negatives (FN): rest of the true-class row
    FN = cm[i, :].sum() - TP

    # False Positives (FP): rest of the predicted-class column
    FP = cm[:, i].sum() - TP

    # True Negatives (TN): everything else
    TN = cm.sum() - (TP + FP + FN)

    # Sensitivity (Recall)
    sensitivity[class_name] = TP / (TP + FN) if (TP + FN) != 0 else 0

    # Specificity
    specificity[class_name] = TN / (TN + FP) if (TN + FP) != 0 else 0

# Average over the classes that are actually present (the "both" class is
# excluded upstream), not over a hard-coded count of 4.
n_classes = max(len(classes), 1)
mean_sensitivity = sum(sensitivity.values()) / n_classes
mean_specificity = sum(specificity.values()) / n_classes

# Print results
print("Sensitivity (Se) per class:")
for class_name, se in sensitivity.items():
    print(f"{class_name}: {se:.3f}")
print(f"Avg: {mean_sensitivity:.3f}")
print("\nSpecificity (Sp) per class:")
for class_name, sp in specificity.items():
    print(f"{class_name}: {sp:.3f}")
print(f"Avg: {mean_specificity:.3f}")
# Score = mean of the macro-averaged sensitivity and specificity.
# NOTE(review): the official ICBHI challenge score may be defined differently
# (normal vs. abnormal rather than macro average) - confirm before comparing.
print(f"\nScore: {(mean_sensitivity + mean_specificity) / 2}")