In [None]:
pip install XlsxWriter

In [None]:
import os 
import random
import glob
import numpy as np
import time
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchaudio
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import pandas as pd

# ====================
# 1. Data Augmentations
# ====================

class AudioAugmentations:
    """Randomised waveform augmentation pipeline.

    Calling an instance applies, in this order: additive Gaussian noise, a sox
    speed change and a pitch shift (each with probability 0.5), then an
    unconditional crop/pad to ``crop_size`` samples, and finally the
    frequency-masking transform (probability 0.5).

    Args:
        sample_rate: Sample rate in Hz handed to the sox effects and to the
            pitch-shift transform.
        noise_factor: Multiplier applied to standard-normal noise in
            ``add_noise``.
        time_stretch_range: ``(low, high)`` bounds of the uniformly sampled
            sox ``speed`` factor.
        crop_size: Number of samples kept along dimension 1 by ``random_crop``.
        pitch_shift_range: ``(low, high)`` bounds of the ``n_steps`` value that
            is sampled uniformly on every ``pitch_shift_fn`` call.
    """

    def __init__(self, sample_rate=16000, noise_factor=0.005, time_stretch_range=(0.8, 1.2), crop_size=16000,
                 pitch_shift_range=(-2, 2)):
        self.sample_rate = sample_rate
        self.noise_factor = noise_factor
        self.time_stretch_range = time_stretch_range
        self.crop_size = crop_size
        # Only the range is stored: the actual shift is drawn per call so that
        # different clips (and the two views of one clip) get different shifts.
        self.pitch_shift_range = pitch_shift_range
        # NOTE(review): FrequencyMasking is documented as a spectrogram
        # transform, but __call__ feeds it a raw waveform -- confirm that this
        # is the intended augmentation.
        self.spec_augment = torchaudio.transforms.FrequencyMasking(freq_mask_param=15)

    def add_noise(self, waveform):
        """Return ``waveform`` plus Gaussian noise scaled by ``noise_factor``."""
        noise = torch.randn_like(waveform) * self.noise_factor
        return waveform + noise

    def time_stretch(self, waveform):
        """Apply a random sox ``speed`` change, resampled back to ``sample_rate``.

        The output length generally differs from the input length. If the sox
        call raises ``OSError`` the input is returned unchanged.
        """
        rate = random.uniform(*self.time_stretch_range)
        effects = [['speed', f'{rate}'], ['rate', f'{self.sample_rate}']]
        try:
            stretched, _ = torchaudio.sox_effects.apply_effects_tensor(waveform, self.sample_rate, effects)
            return stretched
        except OSError:
            return waveform

    def pitch_shift_fn(self, waveform):
        """Pitch-shift ``waveform`` by a freshly sampled number of steps."""
        n_steps = random.uniform(*self.pitch_shift_range)
        # NOTE(review): building the transform per call gives up any internal
        # caching the transform may do -- confirm data-loading throughput is
        # still acceptable.
        shifter = torchaudio.transforms.PitchShift(self.sample_rate, n_steps=n_steps)
        return shifter(waveform)

    def apply_spec_augment(self, waveform):
        """Run the frequency-masking transform on ``waveform``."""
        return self.spec_augment(waveform)

    def random_crop(self, waveform):
        """Return exactly ``crop_size`` samples along dimension 1.

        Longer inputs are cropped at a random offset; inputs that are not
        longer are zero-padded on the right.
        """
        if waveform.shape[1] > self.crop_size:
            max_start = waveform.shape[1] - self.crop_size
            start = random.randint(0, max_start)
            return waveform[:, start:start+self.crop_size]
        else:
            pad_amt = self.crop_size - waveform.shape[1]
            return F.pad(waveform, (0, pad_amt))

    def __call__(self, waveform):
        """Apply the randomised pipeline and return a fixed-length waveform."""
        if random.random() < 0.5:
            waveform = self.add_noise(waveform)
        if random.random() < 0.5:
            waveform = self.time_stretch(waveform)
        if random.random() < 0.5:
            waveform = self.pitch_shift_fn(waveform)
        # Cropping runs after the length-changing steps so every output has
        # crop_size samples.
        waveform = self.random_crop(waveform)
        if random.random() < 0.5:
            waveform = self.apply_spec_augment(waveform)
        return waveform

# ====================
# 2. Contrastive Audio Dataset
# ====================

class QMSAT(Dataset):  # Renamed dataset to QMSAT
    """Dataset of ``.wav`` clips that yields two augmented views per item.

    Files are collected from the sub-directories listed in ``CLASS_DIRS``
    directly under ``root_dir``; the sub-directory name is used as the label.

    Args:
        root_dir: Directory containing the class sub-directories.
        sample_rate: Target sample rate in Hz; clips loaded at another rate
            are resampled to it.
        transform: Optional callable applied twice, to independent copies of
            the waveform, to produce the two views. When ``None`` the same
            untransformed waveform is returned for both views.
    """

    # Sub-directories of root_dir that are scanned; each name doubles as the label.
    CLASS_DIRS = ('Tilawat-e-QuranPak', 'Music', 'Normal(Silence)')

    def __init__(self, root_dir, sample_rate=16000, transform=None):
        self.sample_rate = sample_rate
        self.file_paths = []
        self.labels = []
        self.transform = transform
        for sub_dir in self.CLASS_DIRS:
            folder = Path(root_dir) / sub_dir
            # glob returns matches in arbitrary order; sorting keeps the
            # index -> file mapping (and hence any seeded split) reproducible.
            wav_files = sorted(glob.glob(str(folder / '*.wav')))
            self.file_paths.extend(wav_files)
            self.labels.extend([sub_dir] * len(wav_files))

    def __len__(self):
        """Return the number of discovered ``.wav`` files."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(view1, view2, label)``."""
        file_path = self.file_paths[idx]
        waveform, sr = torchaudio.load(file_path)
        if waveform.dim() == 2 and waveform.shape[0] > 1:
            # Downmix multi-channel audio so every item has one channel.
            # Assumes torchaudio.load's default channels-first layout.
            waveform = waveform.mean(dim=0, keepdim=True)
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(sr, self.sample_rate)(waveform)
        if self.transform is not None:
            # Detach augmented outputs so gradients don't flow from DataLoader collation.
            aug1 = self.transform(waveform.clone()).detach()
            aug2 = self.transform(waveform.clone()).detach()
        else:
            aug1, aug2 = waveform, waveform
        label = self.labels[idx]
        return aug1, aug2, label

# ====================
# 3. QSMAT ATS Encoder
# ====================

class QSMATATSEncoder(nn.Module):  # Renamed encoder to QSMAT ATS Encoder
    """ResNet18-based encoder mapping raw waveforms to projection vectors.

    The waveform is converted to a mel spectrogram in dB, passed through a
    ResNet18 trunk (all children except the final layer) and then through a
    linear projection.

    Args:
        projection_dim: Size of the output projection vector.
        sample_rate: Sample rate in Hz given to the mel-spectrogram transform.
        n_mels: Number of mel bins given to the mel-spectrogram transform.
    """

    def __init__(self, projection_dim=128, sample_rate=16000, n_mels=64):
        super().__init__()
        self.mel_spec = MelSpectrogram(sample_rate=sample_rate, n_mels=n_mels)
        self.db_transform = AmplitudeToDB()
        # Load a pretrained ResNet18 (from torch.hub here)
        resnet = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
        # Modify first conv layer for single-channel input. Instead of
        # discarding the pretrained filters, sum them over the input-channel
        # axis: this is equivalent to feeding the same spectrogram to every
        # input channel of the original layer.
        mono_conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        with torch.no_grad():
            mono_conv1.weight.copy_(resnet.conv1.weight.sum(dim=1, keepdim=True))
        resnet.conv1 = mono_conv1
        # Keep every child except the last one.
        self.encoder = nn.Sequential(*list(resnet.children())[:-1])
        self.projection = nn.Linear(512, projection_dim)

    def forward(self, x):
        """Return the projection for a batch of waveforms.

        A 3-D input has dimension 1 squeezed before the mel transform
        (presumably ``(batch, 1, time)`` -- confirm against the data loader).
        """
        if x.dim() == 3:
            x = x.squeeze(1)
        mel = self.mel_spec(x)
        # Insert a channel axis for the 2-D convolutions.
        mel_db = self.db_transform(mel).unsqueeze(1)
        # Collapse the trailing spatial dimensions left by the trunk.
        features = self.encoder(mel_db).flatten(1)
        return self.projection(features)

    def encode(self, x):
        """Alias for ``forward``."""
        return self.forward(x)

# ====================
# 4. Training and Validation Functions for Self-Supervised Learning
# ====================

def _mean_or_nan(total, count):
    """Return ``total / count``, or NaN when ``count`` is zero."""
    return total / count if count else float('nan')


def train_self_supervised(model, train_loader, val_loader, optimizer, device, epochs=30):
    """Train ``model`` to make the projections of two augmented views agree.

    Each batch is ``(aug1, aug2, label)``; the label is ignored. The loss is
    the MSE between ``model(aug1)`` and ``model(aug2)``. After every epoch the
    same loss and the mean cosine similarity are measured on ``val_loader``
    using ``model.encode`` under ``torch.no_grad()``.

    NOTE(review): the objective only pulls the two views of a clip together
    and has no term separating different clips -- confirm that degenerate
    (constant) projections are not a concern for this setup.

    Args:
        model: Module that is callable on a batch and exposes ``encode``.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the views are moved to.
        epochs: Number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_losses, train_cosine, val_losses, val_cosine,
        epoch_times, total_training_time)``. The first five are per-epoch
        lists of per-batch averages; an epoch whose loader yields no batches
        is reported as NaN. Times are in seconds.
    """
    model.train()
    train_losses = []
    train_cosine = []  # average cosine similarity on training batches
    val_losses = []
    val_cosine = []    # average cosine similarity on validation batches
    epoch_times = []
    total_start_time = time.time()

    for epoch in range(epochs):
        # Re-enter training mode: validation below switches to eval mode.
        model.train()
        running_loss = 0.0
        running_cosine = 0.0
        batch_count = 0
        epoch_start = time.time()
        for aug1, aug2, _ in train_loader:
            aug1, aug2 = aug1.to(device), aug2.to(device)
            optimizer.zero_grad()
            proj1 = model(aug1)
            proj2 = model(aug2)
            loss = F.mse_loss(proj1, proj2)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            # Compute cosine similarity as additional metric.
            running_cosine += F.cosine_similarity(proj1, proj2, dim=1).mean().item()
            batch_count += 1
        avg_train_loss = _mean_or_nan(running_loss, batch_count)
        avg_train_cosine = _mean_or_nan(running_cosine, batch_count)

        # Validation
        model.eval()
        val_running_loss = 0.0
        val_running_cosine = 0.0
        val_batch_count = 0
        with torch.no_grad():
            for aug1, aug2, _ in val_loader:
                aug1, aug2 = aug1.to(device), aug2.to(device)
                proj1 = model.encode(aug1)
                proj2 = model.encode(aug2)
                val_running_loss += F.mse_loss(proj1, proj2).item()
                val_running_cosine += F.cosine_similarity(proj1, proj2, dim=1).mean().item()
                val_batch_count += 1
        avg_val_loss = _mean_or_nan(val_running_loss, val_batch_count)
        avg_val_cosine = _mean_or_nan(val_running_cosine, val_batch_count)

        epoch_time = time.time() - epoch_start
        epoch_times.append(epoch_time)
        train_losses.append(avg_train_loss)
        train_cosine.append(avg_train_cosine)
        val_losses.append(avg_val_loss)
        val_cosine.append(avg_val_cosine)

        print(f"[Epoch {epoch+1:02d}] Train Loss: {avg_train_loss:.4f} | Train Cosine: {avg_train_cosine:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | Val Cosine: {avg_val_cosine:.4f} | Time: {epoch_time:.2f} sec")

    total_training_time = time.time() - total_start_time
    print(f"Total Training Time: {total_training_time/60:.2f} minutes")
    return train_losses, train_cosine, val_losses, val_cosine, epoch_times, total_training_time


def plot_loss_curves(train_losses, val_losses):
    """Show per-epoch training and validation MSE loss on a single figure.

    Epochs on the x-axis are numbered from 1 up to ``len(train_losses)``.
    """
    epoch_axis = range(1, len(train_losses) + 1)
    # (values, line/marker format, legend label, colour) for each curve.
    curves = (
        (train_losses, 'o-', 'Train Loss', 'blue'),
        (val_losses, 's-', 'Validation Loss', 'red'),
    )
    plt.figure(figsize=(10,6))
    for values, fmt, legend_label, colour in curves:
        plt.plot(epoch_axis, values, fmt, label=legend_label, color=colour)
    plt.title("Self-Supervised Training Loss Curves")
    plt.xlabel("Epoch")
    plt.ylabel("MSE Loss")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_cosine_curves(train_cosine, val_cosine):
    """Show per-epoch training and validation cosine similarity on one figure.

    Epochs on the x-axis are numbered from 1 up to ``len(train_cosine)``.
    """
    epoch_axis = range(1, len(train_cosine) + 1)
    # (values, line/marker format, legend label, colour) for each curve.
    curves = (
        (train_cosine, 'o-', 'Train Cosine Similarity', 'blue'),
        (val_cosine, 's-', 'Validation Cosine Similarity', 'red'),
    )
    plt.figure(figsize=(10,6))
    for values, fmt, legend_label, colour in curves:
        plt.plot(epoch_axis, values, fmt, label=legend_label, color=colour)
    plt.title("Cosine Similarity Curves")
    plt.xlabel("Epoch")
    plt.ylabel("Cosine Similarity")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# ====================
# 5. Embeddings & Statistical Analysis Functions
# ====================

def extract_audio_labels(batch):
    """Return ``(first_view, labels)`` from a three-element batch."""
    # Only the first augmented view is used for analysis; the second is dropped.
    first_view, _second_view, targets = batch
    return first_view, targets

def compute_and_plot_statistics(model, dataloader, device, method='tsne'):
    """
    Computes class centroids from reduced embeddings, calculates inter-class distances,
    and plots a bar graph (for inter-class distances) along with a boxplot of intra-class
    distance distributions.

    Embeddings come from ``model.encode`` on the first augmented view of every
    batch and are reduced to 2-D before any distance is computed.

    NOTE(review): all distances are measured in the reduced 2-D space, not in
    the original embedding space -- confirm this is what the analysis should
    report, particularly for t-SNE.

    Args:
        model: Encoder exposing ``encode``; it is switched to eval mode.
        dataloader: Iterable of ``(view1, view2, labels)`` batches.
        device: Device the audio is moved to before encoding.
        method: ``'tsne'`` selects t-SNE; any other value selects PCA.
    """
    model.eval()
    all_embeddings = []
    all_labels = []
    with torch.no_grad():
        for batch in dataloader:
            audio, labels = extract_audio_labels(batch)
            embeddings = model.encode(audio.to(device))
            all_embeddings.append(embeddings.cpu().numpy())
            all_labels.extend(labels)
    all_embeddings = np.concatenate(all_embeddings, axis=0)
    # Array form allows one vectorised mask per class instead of a Python scan.
    all_labels = np.asarray(all_labels)

    # Dimensionality reduction
    if method == 'tsne':
        # scikit-learn requires perplexity < n_samples; clamp so that small
        # evaluation sets do not raise, while 30 is kept for larger sets.
        perplexity = min(30, max(all_embeddings.shape[0] - 1, 1))
        reducer = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    else:
        reducer = PCA(n_components=2)
    reduced_embeddings = reducer.fit_transform(all_embeddings)

    unique_labels = ['Tilawat-e-QuranPak', 'Music', 'Normal(Silence)']

    # Compute centroids and intra-class distances.
    centroids = {}
    intra_class_dists = {}
    for label in unique_labels:
        emb_class = reduced_embeddings[all_labels == label]
        centroid = emb_class.mean(axis=0)
        centroids[label] = centroid
        intra_class_dists[label] = np.linalg.norm(emb_class - centroid, axis=1)

    # Compute inter-class distances from Quran centroid.
    dist_quran_calm = np.linalg.norm(centroids["Tilawat-e-QuranPak"] - centroids["Music"])
    dist_quran_noise = np.linalg.norm(centroids["Tilawat-e-QuranPak"] - centroids["Normal(Silence)"])

    print("Inter-class Distances:")
    print(f"Distance between Quran-tilawat and Calm-Music: {dist_quran_calm:.2f}")
    print(f"Distance between Quran-tilawat and Normal-surrounding-voices: {dist_quran_noise:.2f}")

    for label in unique_labels:
        mean_dist = np.mean(intra_class_dists[label])
        std_dist = np.std(intra_class_dists[label])
        print(f"{label} - Mean intra-class distance: {mean_dist:.2f} (Std: {std_dist:.2f})")

    # Bar graph for inter-class distances.
    plt.figure(figsize=(6,4))
    bars = ["Quran vs Calm", "Quran vs Noise"]
    distances_bar = [dist_quran_calm, dist_quran_noise]
    plt.bar(bars, distances_bar, color=['red', 'red'])
    plt.ylabel("Euclidean Distance")
    plt.title("Inter-Class Distances")
    # Write each value just above its bar.
    for i, v in enumerate(distances_bar):
        plt.text(i, v + 0.01, f"{v:.2f}", ha='center', fontweight='bold')
    plt.tight_layout()
    plt.show()

    # Boxplot for intra-class distance distributions.
    plt.figure(figsize=(8,6))
    data = [intra_class_dists[label] for label in unique_labels]
    plt.boxplot(data, labels=unique_labels)
    plt.ylabel("Distance to Centroid")
    plt.title("Intra-Class Distance Distributions")
    plt.tight_layout()
    plt.show()

In [None]:
# ====================
# 6. Main Function
# ====================

def main(root_dir='/kaggle/input/qmsat-dataset/ATS-data', output_dir='/kaggle/working'):
    """Train the encoder with the self-supervised objective and save the results.

    Builds the augmented dataset, splits it 80/20 into train and validation
    subsets, trains the encoder, plots the loss and cosine-similarity curves,
    and writes the per-epoch metrics (Excel) and the model state dict.

    Args:
        root_dir: Dataset root directory containing the class sub-directories.
        output_dir: Directory that receives the metrics workbook and the
            saved encoder weights.
    """
    sample_rate = 16000
    batch_size = 128
    contrastive_epochs = 30
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Prepare dataset and augmentations.
    augmentations = AudioAugmentations(sample_rate=sample_rate, crop_size=sample_rate)
    dataset = QMSAT(root_dir=root_dir, sample_rate=sample_rate, transform=augmentations)  # Using renamed QMSAT dataset

    # Split dataset into training and validation sets (80/20 split).
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    print(f"Training samples: {len(train_dataset)} | Validation samples: {len(val_dataset)}")

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

    # Initialize the encoder.
    encoder_model = QSMATATSEncoder(projection_dim=128).to(device)  # Using renamed encoder model
    print("Model Architecture Summary:")
    print(encoder_model)

    optimizer_encoder = optim.Adam(encoder_model.parameters(), lr=1e-3, weight_decay=1e-4)

    # Train the encoder with self-supervised learning.
    print("Starting Self-Supervised Training on", device)
    (train_losses, train_cosine, val_losses, val_cosine,
     epoch_times, total_training_time) = train_self_supervised(
        encoder_model, train_loader, val_loader, optimizer_encoder, device, epochs=contrastive_epochs)

    # Plot training curves.
    plot_loss_curves(train_losses, val_losses)
    plot_cosine_curves(train_cosine, val_cosine)

    # Save training metrics to an Excel file.
    excel_filename = str(Path(output_dir) / "self_supervised_training_metrics.xlsx")
    df_metrics = pd.DataFrame({
        'Epoch': list(range(1, len(train_losses) + 1)),
        'Train_Loss': train_losses,
        'Train_Cosine': train_cosine,
        'Val_Loss': val_losses,
        'Val_Cosine': val_cosine,
        'Epoch_Time_sec': epoch_times
    })
    # The context manager closes the workbook even if writing raises.
    with pd.ExcelWriter(excel_filename, engine='xlsxwriter') as writer:
        df_metrics.to_excel(writer, sheet_name='Metrics', index=False)
    print(f"Training metrics saved to {excel_filename}")

    # Save the trained encoder model.
    model_save_path = str(Path(output_dir) / "trained_qsmatats_encoder.pth")  # Saving with new model name
    torch.save(encoder_model.state_dict(), model_save_path)
    print(f"Trained encoder model saved to {model_save_path}")
    print(f"Total Training Time: {total_training_time/60:.2f} minutes")
    
# Run the training pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()