In [None]:
!pip install librosa matplotlib numpy pillow
!pip uninstall pyarrow -y
!pip install --upgrade pyarrow datasets
!pip install datasets
!pip install evaluate
!pip install accelerate -U

In [None]:
import os
import random

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
from datasets import Dataset, DatasetDict
from PIL import Image, ImageEnhance, ImageOps
from sklearn.model_selection import train_test_split

In [None]:
# Define paths for the dataset
path_to_data = '/kaggle/input/spectrograms-data/Spectrograms'
hc_folder = os.path.join(path_to_data, 'HC_AH')  # Healthy audio samples
pd_folder = os.path.join(path_to_data, 'PD_AH')  # Parkinson's audio samples

# Writable location used when the input dataset does not already contain
# spectrograms. Kaggle mounts /kaggle/input read-only, so directories cannot
# be created (and images cannot be saved) underneath it.
writable_spectrogram_root = '/kaggle/working/Spectrograms'


def _resolve_spectrogram_folder(class_name):
    """Return the folder that holds, or will receive, the spectrograms of one class.

    Args:
        class_name (str): Sub-folder name of the class ('healthy' or 'parkinson').

    Returns:
        str: The folder inside the input dataset when it exists and already
        contains files; otherwise a folder under ``writable_spectrogram_root``,
        which is created if it is missing.
    """
    bundled = os.path.join(path_to_data, class_name)
    if os.path.isdir(bundled) and os.listdir(bundled):
        return bundled

    generated = os.path.join(writable_spectrogram_root, class_name)
    os.makedirs(generated, exist_ok=True)
    return generated


# Folders the spectrogram images are read from / written to
spectrogram_hc_folder = _resolve_spectrogram_folder('healthy')
spectrogram_pd_folder = _resolve_spectrogram_folder('parkinson')

In [None]:
def create_spectrogram(audio_path, save_folder, file_name, chunk_size=0.05, sample_rate=22050, limit = 0):
    """Cut an audio file into fixed-length chunks and save one mel-spectrogram PNG per chunk.

    Args:
        audio_path (str): Path of the audio file to load.
        save_folder (str): Directory the PNG files are written to.
        file_name (str): Prefix of the output files; chunk ``i`` is saved as
            ``{file_name}_chunk_{i}.png``.
        chunk_size (float): Length of each chunk in seconds.
        sample_rate (int): Sampling rate the audio is resampled to on load.
        limit (int): Maximum number of chunks to render. ``0`` renders every
            complete chunk; larger values are clamped to the number of
            complete chunks available.

    Returns:
        None. The images are written to ``save_folder``.
    """
    # Load the audio file
    y, sr = librosa.load(audio_path, sr=sample_rate)

    # Convert the chunk size (in seconds) to samples; a trailing partial chunk is dropped
    chunk_length = int(chunk_size * sr)
    total_chunks = len(y) // chunk_length

    # Never iterate past the last complete chunk: slicing beyond the signal
    # would hand an empty array to the spectrogram computation.
    limit = total_chunks if limit == 0 else min(limit, total_chunks)

    for i in range(limit):
        # Get the chunk of audio
        chunk = y[i * chunk_length: (i + 1) * chunk_length]

        # Mel-scaled power spectrogram, converted to dB relative to the chunk's peak
        S = librosa.feature.melspectrogram(y=chunk, sr=sr, n_mels=64, n_fft=256, hop_length=64)
        S_DB = librosa.power_to_db(S, ref=np.max)

        # Render without axes and save the plot as an image
        fig, ax = plt.subplots(figsize=(2, 2))
        try:
            ax.axis('off')
            librosa.display.specshow(S_DB, sr=sr, cmap='viridis', ax=ax)
            save_path = os.path.join(save_folder, f'{file_name}_chunk_{i}.png')
            fig.savefig(save_path, bbox_inches='tight', pad_inches=0)
        finally:
            # Always release the figure, even when rendering or saving fails
            plt.close(fig)

In [None]:
def create_spectrogram_augmented(y, sr, chunk_size=0.1, sample_rate=22050, limit=0):
    """
    Create mel-spectrograms from chunks of an in-memory signal and return them as 224x224 RGB images.

    Args:
        y (np.ndarray): Audio time-series.
        sr (int): Sampling rate of ``y``.
        chunk_size (float): Length of each chunk in seconds.
        sample_rate (int): Not read by this function; kept so existing calls keep working.
        limit (int): Maximum number of chunks to process. If 0, process all complete chunks.

    Returns:
        List[Image]: One PIL RGB image (224x224) per processed chunk; empty when
        the signal is shorter than one chunk.
    """
    chunk_length = int(chunk_size * sr)  # Convert chunk size (in seconds) to samples
    total_chunks = len(y) // chunk_length  # a trailing partial chunk is dropped

    if limit == 0:
        limit = total_chunks
    else:
        limit = min(limit, total_chunks)

    spectrogram_images = []

    for i in range(limit):
        # Get the chunk of audio
        chunk = y[i * chunk_length: (i + 1) * chunk_length]

        # Mel-scaled power spectrogram, converted to dB relative to the chunk's peak
        S = librosa.feature.melspectrogram(y=chunk, sr=sr, n_mels=64, n_fft=256, hop_length=64)
        S_DB = librosa.power_to_db(S, ref=np.max)

        # 3.2 in at 72 dpi is about 230 px per side; the exact 224x224 size is
        # enforced by the resize below.
        fig, ax = plt.subplots(figsize=(3.2, 3.2), dpi=72)
        try:
            ax.axis('off')  # Remove axes
            # Let the axes fill the whole canvas so the image has no blank margin
            fig.subplots_adjust(left=0, right=1, bottom=0, top=1)

            # Draw the spectrogram; without this call the canvas stays blank
            librosa.display.specshow(S_DB, sr=sr, cmap='viridis', ax=ax)

            # Rasterise the figure and copy the RGBA pixel buffer
            fig.canvas.draw()
            img = np.array(fig.canvas.buffer_rgba())
        finally:
            plt.close(fig)  # Close the figure to free memory

        # Convert to PIL Image, drop the alpha channel and enforce the exact size
        img_pil = Image.fromarray(img).convert('RGB')
        img_pil = img_pil.resize((224, 224), Image.LANCZOS)

        spectrogram_images.append(img_pil)

    return spectrogram_images

In [None]:
# Function to check if folder is empty
def is_folder_empty(folder_path):
    """Return True when ``folder_path`` has no entries or is not an existing directory.

    Args:
        folder_path (str): Directory to inspect.

    Returns:
        bool: False only when the directory exists and contains at least one entry.
    """
    if not os.path.isdir(folder_path):
        return True
    # scandir stops at the first entry instead of listing the whole directory
    with os.scandir(folder_path) as entries:
        return next(entries, None) is None

# For each class, render spectrograms from its audio folder unless the
# output folder already holds files from an earlier run.
for folder, label, save_folder in (
    (hc_folder, 'healthy', spectrogram_hc_folder),
    (pd_folder, 'parkinson', spectrogram_pd_folder),
):
    if not is_folder_empty(save_folder):
        print(f"Spectrogram folder for {label} data already exists and is not empty. Skipping generation.")
        continue

    print(f"Generating spectrograms for {label} data...")
    for file in os.listdir(folder):
        # Only .wav files are treated as audio
        if not file.endswith('.wav'):
            continue
        file_path = os.path.join(folder, file)
        create_spectrogram(file_path, save_folder, file_name=os.path.splitext(file)[0])

In [None]:
# Function to apply random augmentation to an image
def apply_random_augmentation(img):
    """Randomly rotate, mirror and re-brighten an image.

    Each of the three steps is applied independently with probability 0.5,
    in this order: rotation by 0/90/180/270 degrees, horizontal mirroring,
    brightness scaling by a factor drawn from [0.8, 1.2].

    Args:
        img: PIL image to augment.

    Returns:
        The augmented image (the input object itself when no step fires).
    """
    def coin_flip():
        # One fresh draw per augmentation step
        return random.random() < 0.5

    if coin_flip():
        angle = random.choice([0, 90, 180, 270])
        img = img.rotate(angle)

    if coin_flip():
        img = ImageOps.mirror(img)

    if coin_flip():
        img = ImageEnhance.Brightness(img).enhance(random.uniform(0.8, 1.2))

    return img

In [None]:
def apply_time_domain_augmentation(audio_path, shift_max=0.2, stretch_factor=1.2):
    """
    Apply time-domain augmentations (circular time shift, then time stretch) to an audio file.

    Args:
        audio_path (str): Path to the input audio file.
        shift_max (float): Maximum fraction of the total duration to shift (e.g., 0.2 for 20%).
            The actual shift is drawn uniformly from ``[-shift_max, +shift_max]`` of the length.
        stretch_factor (float): Rate passed to ``librosa.effects.time_stretch``
            (e.g., 1.2 to increase speed by 20%).

    Returns:
        np.ndarray: The augmented signal, trimmed or zero-padded to the same
        number of samples as the loaded audio.
    """
    # Load the audio file at its native sampling rate
    y, sr = librosa.load(audio_path, sr=None)

    # Largest shift in samples, in either direction
    shift_samples = abs(int(shift_max * len(y)))
    # randint excludes its upper bound, so +1 makes the range symmetric and
    # keeps it non-empty when shift_samples is 0 (very short audio or shift_max=0).
    shift = np.random.randint(-shift_samples, shift_samples + 1)
    # np.roll wraps samples around the ends instead of discarding them
    y_shifted = np.roll(y, shift)

    # Apply time stretching
    y_stretched = librosa.effects.time_stretch(y_shifted, rate=stretch_factor)

    # Ensure the stretched audio matches the original length by trimming or padding
    if len(y_stretched) > len(y):
        y_stretched = y_stretched[:len(y)]
    else:
        y_stretched = np.pad(y_stretched, (0, len(y) - len(y_stretched)))

    return y_stretched

In [None]:
# Count initial number of images without setting target_count
def load_initial_images(spectrogram_folder, label):
    """Load every PNG in a folder as a 224x224 RGB array, paired with a class label.

    Args:
        spectrogram_folder (str): Directory containing the spectrogram PNG files.
        label (str): Label attached to every image loaded from this folder.

    Returns:
        tuple[list[np.ndarray], list[str]]: The image arrays and a label list of
        the same length, ordered by file name.
    """
    images = []
    labels = []
    # Sort the listing: os.listdir order is arbitrary, and the later seeded
    # sampling/splitting is only repeatable on a stable input order.
    for file in sorted(os.listdir(spectrogram_folder)):
        if not file.endswith('.png'):
            continue
        image_path = os.path.join(spectrogram_folder, file)
        # The context manager closes the file handle as soon as the pixels are read
        with Image.open(image_path) as source:
            img = source.convert('RGB').resize((224, 224))
        images.append(np.array(img))
        labels.append(label)
    return images, labels

# First pass over both classes, without any balancing, just to learn their sizes
healthy_images, healthy_labels = load_initial_images(spectrogram_hc_folder, 'healthy')
parkinson_images, parkinson_labels = load_initial_images(spectrogram_pd_folder, 'parkinson')

# target_count is the size of the SMALLER class, so both classes can be
# down-sampled to the same number of images
target_count = min(map(len, (healthy_images, parkinson_images)))

# Reload one class and randomly down-sample it so both classes end up the same size
def load_dataset_with_limit(audio_folder, spectrogram_folder, label, target_count):
    """Load a class's spectrograms and keep a random subset of ``target_count`` of them.

    Args:
        audio_folder (str): Not read by this function.
        spectrogram_folder (str): Directory with the class's spectrogram PNG files.
        label (str): Label attached to every returned image.
        target_count (int): Number of images to keep; ``random.sample`` raises
            ``ValueError`` when it exceeds the number of images available.

    Returns:
        tuple[list, list]: The selected images and their labels, in sampled order.
    """
    all_images, all_labels = load_initial_images(spectrogram_folder, label)

    # Draw distinct positions without replacement
    chosen = random.sample(range(len(all_images)), target_count)

    picked_images = [all_images[pos] for pos in chosen]
    picked_labels = [all_labels[pos] for pos in chosen]
    return picked_images, picked_labels

# Report the per-class sample count used for balancing
print(f"Count : {target_count}")

In [None]:
# Down-sample each class to target_count images so both are equally represented
healthy_images, healthy_labels = load_dataset_with_limit(
    hc_folder, spectrogram_hc_folder, 'healthy', target_count
)
print(len(healthy_images))
parkinson_images, parkinson_labels = load_dataset_with_limit(
    pd_folder, spectrogram_pd_folder, 'parkinson', target_count
)
print(len(parkinson_images))

# Concatenate both classes, keeping images and labels aligned by position
images = [*healthy_images, *parkinson_images]
labels = [*healthy_labels, *parkinson_labels]

# Hold out 20% of the samples; the fixed random_state makes the split repeatable
train_images, test_images, train_labels, test_labels = train_test_split(
    images,
    labels,
    test_size=0.2,
    random_state=42,
)

# Column-oriented dicts in the layout Dataset.from_dict expects
train_data = dict(image=train_images, label=train_labels)
test_data = dict(image=test_images, label=test_labels)

# Convert each split to a Hugging Face Dataset
train_dataset, test_dataset = (
    Dataset.from_dict(split_columns) for split_columns in (train_data, test_data)
)

# The held-out split is exposed under the 'validation' key
dataset = DatasetDict({'train': train_dataset, 'validation': test_dataset})

print(dataset)

In [None]:
import requests
import torch
from PIL import Image
from transformers import ViTImageProcessor, ViTForImageClassification, TrainingArguments, Trainer
from tqdm import tqdm
from evaluate import load
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import accelerate
import transformers

# Device name; not referenced by the cells visible in this notebook
device = "cpu"

# Checkpoint shared by the image processor and the model
model_name = "google/vit-base-patch16-224"
image_processor = ViTImageProcessor.from_pretrained(model_name)
# Model with the checkpoint's original head; it is re-created with a
# two-class head in a later cell before training
modelViT = ViTForImageClassification.from_pretrained(model_name)

# Class names; a name's position is the integer id used for training
labels = ['healthy', 'parkinson']
label_map = {name: index for index, name in enumerate(labels)}

In [None]:
def collate_fn(batch):
    """Turn a list of dataset rows into the tensors the ViT model consumes.

    The images are passed through ``image_processor`` so that the pixel values
    are rescaled and normalised with the checkpoint's own settings. Handing
    the processor to ``Trainer`` alone does not apply it to the data.

    Args:
        batch (list[dict]): Rows with an ``'image'`` entry (height x width x 3
            pixel data with values 0-255) and a ``'label'`` entry that is a key
            of ``label_map``.

    Returns:
        dict: ``'pixel_values'`` (float tensor, channels first) and ``'labels'``
        (long tensor of class ids).
    """
    # Rows come back from the dataset as nested lists; rebuild uint8 arrays
    images = [np.asarray(item['image'], dtype=np.uint8) for item in batch]
    labels = [item['label'] for item in batch]

    # Images are already 224x224, so resizing is skipped; rescaling,
    # normalisation and the channels-first layout come from the processor.
    images_tensor = image_processor(
        images=images,
        do_resize=False,
        return_tensors="pt",
    )["pixel_values"]

    # Convert labels from strings ('healthy', 'parkinson') to integers (0, 1)
    labels_tensor = torch.tensor([label_map[label] for label in labels], dtype=torch.long)

    return {
        'pixel_values': images_tensor,  # Image tensor
        'labels': labels_tensor         # Label tensor (numerical)
    }

In [None]:
# Load the accuracy and f1 metrics from the evaluate module
accuracy = load("accuracy")
f1 = load("f1")

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1 and ROC AUC for one evaluation pass and plot the ROC curves.

    Args:
        eval_pred: Object with ``predictions`` (logits, one column per class)
            and ``label_ids`` (integer class ids).

    Returns:
        dict: The accuracy and F1 results plus ``"auc"``; ``"auc"`` is NaN when
        it cannot be computed (for example when only one class is present).
    """
    logits = np.asarray(eval_pred.predictions, dtype=np.float64)
    references = np.asarray(eval_pred.label_ids)
    predicted_classes = np.argmax(logits, axis=1)

    # Compute accuracy and F1 scores
    accuracy_score = accuracy.compute(predictions=predicted_classes, references=references)
    f1_score = f1.compute(predictions=predicted_classes, references=references, average="macro")

    # Softmax (shifted by the row maximum for numerical stability): the
    # multiclass AUC needs class scores that sum to 1 per sample.
    exp_logits = np.exp(logits - logits.max(axis=1, keepdims=True))
    probabilities = exp_logits / exp_logits.sum(axis=1, keepdims=True)
    num_classes = probabilities.shape[1]

    try:
        if num_classes == 2:
            # Binary targets take a 1-D score vector: the positive-class probability.
            # Passing the full two-column matrix raises ValueError.
            auc_score = float(roc_auc_score(references, probabilities[:, 1]))
        else:
            auc_score = float(
                roc_auc_score(references, probabilities, multi_class="ovr", average="macro")
            )
    except ValueError:
        # Handle cases where AUC cannot be computed (e.g., a single class in the labels)
        auc_score = np.nan

    # Plot one one-vs-rest ROC curve per class
    fig, ax = plt.subplots(figsize=(8, 6))
    for i in range(num_classes):
        is_class_i = references == i
        if is_class_i.all() or not is_class_i.any():
            # ROC is undefined without both positive and negative samples
            continue
        fpr, tpr, _ = roc_curve(is_class_i, probabilities[:, i])
        class_auc = roc_auc_score(is_class_i, probabilities[:, i])
        ax.plot(fpr, tpr, label=f"Class {i} (AUC = {class_auc:.2f})")

    # Configure plot
    ax.plot([0, 1], [0, 1], 'k--', label="Random Guess")
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title("ROC Curve")
    ax.legend(loc="lower right")
    ax.grid(True)
    plt.show()
    # This function runs at every evaluation step; release the figure each time
    plt.close(fig)

    # Return all metrics
    return {**accuracy_score, **f1_score, "auc": auc_score}

In [None]:
# Re-create the ViT classifier with one output per entry of `labels`;
# ignore_mismatched_sizes lets the checkpoint load even where its weights
# do not fit the new head
modelViT = ViTForImageClassification.from_pretrained(
    model_name,
    num_labels=len(labels),
    id2label={str(idx): name for idx, name in enumerate(labels)},
    label2id={name: str(idx) for idx, name in enumerate(labels)},
    ignore_mismatched_sizes=True,
)

# Training configuration for the ViT run
training_args = TrainingArguments(
    output_dir="/kaggle/working/vit-base",  # where checkpoints and logs are written
    num_train_epochs=20,                    # total number of training epochs
    per_device_train_batch_size=32,         # batch size per device during training
    # fp16=True,                            # mixed precision (left disabled)
    eval_strategy="steps",                  # evaluate on a step schedule
    eval_steps=80,                          # update steps between evaluations
    save_steps=80,                          # update steps between checkpoints
    logging_steps=80,                       # update steps between log entries
    save_total_limit=2,                     # maximum number of checkpoints kept on disk
    load_best_model_at_end=True,            # reload the best checkpoint when training ends
    remove_unused_columns=False,            # keep all dataset columns for the collator
    report_to='tensorboard',                # report metrics to tensorboard
    push_to_hub=False,                      # do not push the model to the hub
)

In [None]:
# Wire the model, data, batching and metrics together for the ViT run
trainerViT = Trainer(
    model=modelViT,                      # model to fine-tune
    args=training_args,                  # training configuration defined above
    train_dataset=dataset["train"],      # training split
    eval_dataset=dataset["validation"],  # split evaluated during training
    data_collator=collate_fn,            # builds model inputs from dataset rows
    compute_metrics=compute_metrics,     # metrics computed at each evaluation
    tokenizer=image_processor,           # image processor handed to the trainer
)

# Fine-tune the model
trainerViT.train()

# Persist only the weights (state dict) of the trained model
torch.save(modelViT.state_dict(), '/kaggle/working/ViT_model.pth')

In [None]:
# ResNet
from transformers import AutoFeatureExtractor, ResNetForImageClassification, TrainingArguments, Trainer
from datasets import load_dataset
import numpy as np
from evaluate import load
import requests
import torch
from PIL import Image
from transformers import ViTImageProcessor, ViTForImageClassification, TrainingArguments, Trainer
from tqdm import tqdm
from evaluate import load
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import accelerate
import transformers

In [None]:
# Class names; a name's position is the integer id used for training
labels = ['healthy', 'parkinson']

# Name -> integer id lookup derived from the order above
label_map = {name: index for index, name in enumerate(labels)}

def collate_fn(batch):
    """Turn a list of dataset rows into the tensors the ResNet model consumes.

    The images are passed through ``feature_extractor`` so that the pixel
    values are rescaled and normalised with the checkpoint's own settings.
    Handing the extractor to ``Trainer`` alone does not apply it to the data.
    ``feature_extractor`` is looked up when a batch is collated; the notebook
    defines it in the cell that builds the ResNet trainer.

    Args:
        batch (list[dict]): Rows with an ``'image'`` entry (height x width x 3
            pixel data with values 0-255) and a ``'label'`` entry that is a key
            of ``label_map``.

    Returns:
        dict: ``'pixel_values'`` (float tensor, channels first) and ``'labels'``
        (long tensor of class ids).
    """
    # Rows come back from the dataset as nested lists; rebuild uint8 arrays
    images = [np.asarray(item['image'], dtype=np.uint8) for item in batch]
    labels = [item['label'] for item in batch]

    # Images are already 224x224, so resizing is skipped; rescaling,
    # normalisation and the channels-first layout come from the extractor.
    images_tensor = feature_extractor(
        images=images,
        do_resize=False,
        return_tensors="pt",
    )["pixel_values"]

    # Convert labels from strings ('healthy', 'parkinson') to integers (0, 1)
    labels_tensor = torch.tensor([label_map[label] for label in labels], dtype=torch.long)

    return {
        'pixel_values': images_tensor,  # Image tensor
        'labels': labels_tensor         # Label tensor (numerical)
    }

In [None]:
# Load the accuracy and f1 metrics from the evaluate module
accuracy = load("accuracy")
f1 = load("f1")

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1 and ROC AUC for one evaluation pass and plot the ROC curves.

    Args:
        eval_pred: Object with ``predictions`` (logits, one column per class)
            and ``label_ids`` (integer class ids).

    Returns:
        dict: The accuracy and F1 results plus ``"auc"``; ``"auc"`` is NaN when
        it cannot be computed (for example when only one class is present).
    """
    logits = np.asarray(eval_pred.predictions, dtype=np.float64)
    references = np.asarray(eval_pred.label_ids)
    predicted_classes = np.argmax(logits, axis=1)

    # Compute accuracy and F1 scores
    accuracy_score = accuracy.compute(predictions=predicted_classes, references=references)
    f1_score = f1.compute(predictions=predicted_classes, references=references, average="macro")

    # Softmax (shifted by the row maximum for numerical stability): the
    # multiclass AUC needs class scores that sum to 1 per sample.
    exp_logits = np.exp(logits - logits.max(axis=1, keepdims=True))
    probabilities = exp_logits / exp_logits.sum(axis=1, keepdims=True)
    num_classes = probabilities.shape[1]

    try:
        if num_classes == 2:
            # Binary targets take a 1-D score vector: the positive-class probability.
            # Passing the full two-column matrix raises ValueError.
            auc_score = float(roc_auc_score(references, probabilities[:, 1]))
        else:
            auc_score = float(
                roc_auc_score(references, probabilities, multi_class="ovr", average="macro")
            )
    except ValueError:
        # Handle cases where AUC cannot be computed (e.g., a single class in the labels)
        auc_score = np.nan

    # Plot one one-vs-rest ROC curve per class
    fig, ax = plt.subplots(figsize=(8, 6))
    for i in range(num_classes):
        is_class_i = references == i
        if is_class_i.all() or not is_class_i.any():
            # ROC is undefined without both positive and negative samples
            continue
        fpr, tpr, _ = roc_curve(is_class_i, probabilities[:, i])
        class_auc = roc_auc_score(is_class_i, probabilities[:, i])
        ax.plot(fpr, tpr, label=f"Class {i} (AUC = {class_auc:.2f})")

    # Configure plot
    ax.plot([0, 1], [0, 1], 'k--', label="Random Guess")
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title("ROC Curve")
    ax.legend(loc="lower right")
    ax.grid(True)
    plt.show()
    # This function runs at every evaluation step; release the figure each time
    plt.close(fig)

    # Return all metrics
    return {**accuracy_score, **f1_score, "auc": auc_score}

In [None]:
# ResNet-50 checkpoint with a two-output classification head;
# ignore_mismatched_sizes lets the checkpoint load even where its weights
# do not fit the new head
modelResNet = ResNetForImageClassification.from_pretrained(
    'microsoft/resnet-50',
    ignore_mismatched_sizes=True,
    num_labels=2,
)

# Training configuration for the ResNet run
training_args = TrainingArguments(
    output_dir="./results",          # where checkpoints and logs are written
    num_train_epochs=20,             # total number of training epochs
    per_device_train_batch_size=32,  # batch size per device during training
    eval_strategy="steps",           # evaluate on a step schedule
    eval_steps=80,                   # update steps between evaluations
    save_steps=80,                   # update steps between checkpoints
    logging_steps=80,                # update steps between log entries
    save_total_limit=2,              # maximum number of checkpoints kept on disk
    load_best_model_at_end=True,     # reload the best checkpoint when training ends
    remove_unused_columns=False,     # keep all dataset columns for the collator
    report_to='tensorboard',         # report metrics to tensorboard
    push_to_hub=False,               # do not push the model to the hub
)

In [None]:
# Feature extractor published with the ResNet-50 checkpoint
feature_extractor = AutoFeatureExtractor.from_pretrained('microsoft/resnet-50')

# Wire the model, data, batching and metrics together for the ResNet run
trainerResNet = Trainer(
    model=modelResNet,                   # model to fine-tune
    args=training_args,                  # training configuration defined above
    train_dataset=dataset["train"],      # training split
    eval_dataset=dataset["validation"],  # split evaluated during training
    data_collator=collate_fn,            # builds model inputs from dataset rows
    compute_metrics=compute_metrics,     # metrics computed at each evaluation
    tokenizer=feature_extractor,         # feature extractor handed to the trainer
)

# Fine-tune the model
trainerResNet.train()

# Persist only the weights (state dict) of the trained model
torch.save(modelResNet.state_dict(), '/kaggle/working/ResNet_model.pth')