# Lung Disease Prediction using Deep Learning
Implementation of an AI-driven early detection system for respiratory diseases using lung sound analysis. This notebook implements a CNN-RNN fusion model for classifying different respiratory conditions.

In [None]:
# Import required libraries
import os
import numpy as np
import pandas as pd
import librosa
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Tuple

## Data Loading and Preprocessing
First, we'll implement the data preprocessing steps including:
1. Loading the ICBHI dataset
2. Applying high-pass filtering
3. Segmenting recordings into 2.5 s frames with 50% overlap
4. Converting each frame to a log-Mel spectrogram, stacked with MFCCs and the zero-crossing rate

In [None]:
class DataLoader:
    """Load lung-sound recordings and convert them into feature matrices.

    The intended flow is: `load_icbhi_dataset` (which calls `preprocess_audio`
    on every .wav file), then `segment_audio` on each recording, then
    `extract_features` on each segment.
    """

    def __init__(self, data_dir: str, sr: int = 22050):
        """Initialize DataLoader
        Args:
            data_dir: Directory containing the dataset
            sr: Sampling rate for audio processing
        """
        self.data_dir = data_dir
        self.sr = sr
        self.scaler = StandardScaler()

    def load_icbhi_dataset(self) -> Tuple[List[np.ndarray], List[str]]:
        """Load ICBHI dataset
        Returns:
            Tuple of (audio_data, labels) with one entry per .wav file,
            in sorted filename order
        """
        audio_data = []
        labels = []

        # os.listdir order is arbitrary; sorting keeps the result (and any
        # seeded split built on top of it) reproducible across machines.
        for filename in sorted(os.listdir(self.data_dir)):
            if not filename.lower().endswith('.wav'):
                continue

            file_path = os.path.join(self.data_dir, filename)
            audio_data.append(self.preprocess_audio(file_path))
            labels.append(self._get_label(filename))

        return audio_data, labels

    def preprocess_audio(self, file_path: str) -> np.ndarray:
        """Preprocess audio file
        Args:
            file_path: Path to audio file
        Returns:
            Preprocessed audio signal: resampled to self.sr, high-pass
            filtered and standardized to zero mean / unit variance
        """
        # Load audio (librosa resamples to self.sr)
        y, _ = librosa.load(file_path, sr=self.sr)

        # Apply high-pass filter
        y_filtered = self._high_pass_filter(y)

        # Normalize each recording on its own; the scaler is refit on every call
        y_normalized = self.scaler.fit_transform(y_filtered.reshape(-1, 1)).ravel()

        return y_normalized

    def _high_pass_filter(self, y: np.ndarray, cutoff: float = 20.0,
                          order: int = 4) -> np.ndarray:
        """Remove content below `cutoff` Hz with a Butterworth high-pass filter
        Args:
            y: Audio signal sampled at self.sr
            cutoff: Cutoff frequency in Hz
            order: Filter order
        Returns:
            Filtered signal with the same length as y
        """
        # Imported here so the class needs no extra file-level import;
        # librosa has no high-pass helper in librosa.effects.
        from scipy.signal import butter, sosfilt

        # Second-order sections stay numerically stable for a cutoff this
        # small relative to the sampling rate.
        sos = butter(order, cutoff, btype='highpass', fs=self.sr, output='sos')
        return sosfilt(sos, y)

    def segment_audio(self, y: np.ndarray, segment_length: float = 2.5,
                      overlap: float = 0.5) -> List[np.ndarray]:
        """Segment audio into overlapping frames
        Args:
            y: Audio signal
            segment_length: Length of each segment in seconds
            overlap: Fractional overlap between consecutive segments, 0 <= overlap < 1
        Returns:
            List of equal-length segments; empty if y is shorter than one
            segment (a trailing partial segment is dropped)
        Raises:
            ValueError: If overlap is outside [0, 1) or the segment length
                is shorter than one sample
        """
        if not 0 <= overlap < 1:
            raise ValueError(f'overlap must be in [0, 1), got {overlap}')

        samples_per_segment = int(segment_length * self.sr)
        if samples_per_segment < 1:
            raise ValueError(
                f'segment_length {segment_length} is shorter than one sample at sr={self.sr}'
            )

        # A very high overlap can truncate the hop to 0, which range() rejects
        hop_length = max(1, int(samples_per_segment * (1 - overlap)))

        last_start = len(y) - samples_per_segment
        return [
            y[start:start + samples_per_segment]
            for start in range(0, last_start + 1, hop_length)
        ]

    def extract_features(self, y: np.ndarray) -> np.ndarray:
        """Extract features from audio signal
        Args:
            y: Audio signal
        Returns:
            Feature matrix with the log-mel bands, MFCCs and zero crossing
            rate stacked along the first axis
        """
        # Create mel-spectrogram
        mel_spect = librosa.feature.melspectrogram(
            y=y,
            sr=self.sr,
            n_mels=128,
            fmax=self.sr/2
        )

        # Convert to log scale
        mel_spect_db = librosa.power_to_db(mel_spect, ref=np.max)

        # Extract MFCCs
        mfccs = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=13)

        # Extract zero crossing rate
        zcr = librosa.feature.zero_crossing_rate(y)

        # Combine features: rows are features, columns are frames
        features = np.vstack([mel_spect_db, mfccs, zcr])

        return features

    def _get_label(self, filename: str) -> str:
        """Extract label from filename or annotation file
        Args:
            filename: Audio filename
        Returns:
            Label string: 'pneumonia', 'asthma' or 'normal'
        """
        # Implement label extraction logic based on dataset structure
        # This is a placeholder - modify according to actual dataset
        name = filename.lower()
        if 'crackle' in name:
            return 'pneumonia'
        if 'wheeze' in name:
            return 'asthma'
        return 'normal'

## Model Architecture
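A CNN-RNN fusion model for lung sound classification: convolutional layers extract local patterns from the feature map, and stacked LSTM layers model the resulting sequence before the dense classifier.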

In [None]:
class LungSoundModel:
    """CNN-RNN fusion classifier wrapped around a compiled Keras model."""

    def __init__(self, input_shape: Tuple[int, int, int], num_classes: int = 4):
        """Initialize the CNN-RNN fusion model
        Args:
            input_shape: Shape of input data (time_steps, features, channels)
            num_classes: Number of output classes (defaults to 4:
                normal, pneumonia, asthma, COPD)
        Raises:
            ValueError: If input_shape is not 3-dimensional or num_classes < 2
        """
        # Fail early with a clear message; Conv2D would otherwise reject a
        # shape without a channel axis with a much less obvious error.
        if len(input_shape) != 3:
            raise ValueError(
                'input_shape must be (time_steps, features, channels), '
                f'got {tuple(input_shape)}'
            )
        if num_classes < 2:
            raise ValueError(f'num_classes must be at least 2, got {num_classes}')

        self.input_shape = input_shape
        self.num_classes = num_classes
        self.model = self._build_model()

    def _build_model(self) -> "models.Model":
        """Build the CNN-RNN fusion model architecture
        Returns:
            Compiled Keras model
        """
        # Input layer
        inputs = layers.Input(shape=self.input_shape)

        # CNN layers: three conv + pool stages, each halving both spatial axes
        x = inputs
        for filters in (32, 64, 128):
            x = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
            x = layers.MaxPooling2D((2, 2))(x)

        # Reshape for RNN: keep the first spatial axis as the sequence axis
        # and flatten (second axis, channels) into one feature vector per step
        x = layers.Reshape((-1, x.shape[-1] * x.shape[-2]))(x)

        # RNN layers
        x = layers.LSTM(128, return_sequences=True)(x)
        x = layers.LSTM(64)(x)

        # Dense layers
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dropout(0.5)(x)
        x = layers.Dense(64, activation='relu')(x)

        # Output layer: one softmax probability per class
        outputs = layers.Dense(self.num_classes, activation='softmax')(x)

        # Create model
        model = models.Model(inputs=inputs, outputs=outputs)

        # Compile model; categorical_crossentropy expects one-hot encoded labels
        model.compile(
            optimizer='adam',
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def train(self, x_train, y_train, x_val, y_val, epochs=50, batch_size=32):
        """Train the model
        Args:
            x_train: Training data
            y_train: Training labels (one-hot encoded)
            x_val: Validation data
            y_val: Validation labels (one-hot encoded)
            epochs: Number of training epochs
            batch_size: Batch size for training
        Returns:
            Training history
        """
        history = self.model.fit(
            x_train,
            y_train,
            validation_data=(x_val, y_val),
            epochs=epochs,
            batch_size=batch_size
        )

        return history

    def predict(self, x):
        """Make predictions
        Args:
            x: Input data
        Returns:
            Predicted probabilities
        """
        return self.model.predict(x)

    def evaluate(self, x_test, y_test):
        """Evaluate model performance
        Args:
            x_test: Test data
            y_test: Test labels (one-hot encoded)
        Returns:
            Test loss and metrics
        """
        return self.model.evaluate(x_test, y_test)

## Training Pipeline
Set up the training pipeline and train the model

In [None]:
# Class order fixes the one-hot column of each label; the four names match
# the model's 4-way softmax output (normal, pneumonia, asthma, COPD).
CLASS_NAMES = ['normal', 'pneumonia', 'asthma', 'COPD']

# Initialize data loader
data_loader = DataLoader('path/to/dataset')

# Load and preprocess data (one audio array and one label per recording)
audio_data, labels = data_loader.load_icbhi_dataset()

# Process each audio file
processed_data = []
segment_labels = []
for audio, label in zip(audio_data, labels):
    # Segment audio
    segments = data_loader.segment_audio(audio)

    # Extract features for each segment
    processed_data.extend(data_loader.extract_features(segment) for segment in segments)

    # Every segment inherits its recording's label so X and y stay aligned
    segment_labels.extend([label] * len(segments))

if not processed_data:
    raise ValueError('No segments were produced; check the dataset path and recording lengths')

# Convert to numpy arrays.
# extract_features returns (features, frames); the model documents
# (time_steps, features, channels), so swap the axes and add a channel axis.
X = np.transpose(np.array(processed_data), (0, 2, 1))[..., np.newaxis]

# One-hot encode the labels, as required by categorical_crossentropy
label_ids = np.array([CLASS_NAMES.index(label) for label in segment_labels])
y = np.eye(len(CLASS_NAMES), dtype='float32')[label_ids]

# Split data
# NOTE: this splits at segment level, so overlapping segments of one recording
# can land in different splits; split by recording for an unbiased estimate.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Initialize and train model
model = LungSoundModel(input_shape=X_train.shape[1:])
history = model.train(X_train, y_train, X_val, y_val)

## Model Evaluation
Evaluate model performance and visualize results

In [None]:
# Score the model on the held-out test split
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f'Test accuracy: {test_accuracy:.4f}')

# One panel per metric:
# (train key, validation key, train legend, validation legend, title, y-label)
panels = (
    ('loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Model Loss', 'Loss'),
    ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
     'Model Accuracy', 'Accuracy'),
)

# Draw the training curves side by side
plt.figure(figsize=(12, 4))

for position, panel in enumerate(panels, start=1):
    train_key, val_key, train_legend, val_legend, title, y_label = panel

    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label=train_legend)
    plt.plot(history.history[val_key], label=val_legend)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()