In [None]:
# Widen the notebook container to the full browser width.
# `display` and `HTML` are imported from the public `IPython.display` module:
# importing them from `IPython.core.display` is deprecated and emits a warning.
from IPython.display import display, HTML

display(HTML("<style>.container { width:100% !important; }</style>"))


  from IPython.core.display import display, HTML


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class KANLinear(nn.Module):
    """Linear layer combined with a learnable wavelet branch.

    Every (output, input) pair owns a learnable ``scale`` and ``translation``.
    The input is shifted and scaled with them, passed through the selected
    wavelet function, multiplied by ``wavelet_weights`` and summed over the
    input dimension.  That result is added to a bias-free linear map
    (``weight1``) and the sum is batch-normalised.

    Args:
        in_features: Number of input features.
        out_features: Number of output features.
        wavelet_type: Name of the wavelet function. Supported values are
            'haar', 'coiflet', 'biorthogonal', 'daubechies', 'mexican_hat',
            'morlet', 'dog', 'meyer', 'symlet4' and 'shannon'.
    """

    def __init__(self, in_features, out_features, wavelet_type='haar'):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.wavelet_type = wavelet_type

        # Parameters for wavelet transformation: scale starts at 1 and
        # translation at 0, i.e. the wavelet initially sees the raw input.
        self.scale = nn.Parameter(torch.ones(out_features, in_features))
        self.translation = nn.Parameter(torch.zeros(out_features, in_features))
        # Allocated uninitialised; both are filled by kaiming_uniform_ below.
        self.wavelet_weights = nn.Parameter(torch.empty(out_features, in_features))
        self.weight1 = nn.Parameter(torch.empty(out_features, in_features))

        nn.init.kaiming_uniform_(self.wavelet_weights, a=math.sqrt(5))
        nn.init.kaiming_uniform_(self.weight1, a=math.sqrt(5))

        # Batch normalization applied to the combined output
        self.bn = nn.BatchNorm1d(out_features)

    def wavelet_transform(self, x):
        """Evaluate the wavelet branch.

        Args:
            x: Input tensor. A 2-D ``(batch, in_features)`` tensor gets an
                extra axis inserted so it broadcasts against the
                ``(out_features, in_features)`` parameters; tensors of any
                other rank are used as given.

        Returns:
            Tensor obtained by summing the weighted wavelet responses over
            dimension 2 (``(batch, out_features)`` for 2-D input).

        Raises:
            ValueError: If ``self.wavelet_type`` is not a supported name.
        """
        x_expanded = x.unsqueeze(1) if x.dim() == 2 else x
        # Broadcasting against the leading singleton axis is equivalent to
        # expanding the parameters to the batch size explicitly.
        x_scaled = (x_expanded - self.translation.unsqueeze(0)) / self.scale.unsqueeze(0)

        if self.wavelet_type == 'haar':
            wavelet = self.haar_wavelet(x_scaled)
        elif self.wavelet_type == 'coiflet':
            wavelet = self.coiflet_wavelet(x_scaled)
        elif self.wavelet_type == 'biorthogonal':
            wavelet = self.biorthogonal_wavelet(x_scaled)
        elif self.wavelet_type == 'daubechies':
            wavelet = self.daubechies_wavelet(x_scaled)
        elif self.wavelet_type == 'mexican_hat':
            # NOTE(review): uses (x^2 - 1), the negative of the textbook
            # (1 - x^2) form; the sign can be absorbed by wavelet_weights.
            term1 = ((x_scaled ** 2) - 1)
            term2 = torch.exp(-0.5 * x_scaled ** 2)
            wavelet = (2 / (math.sqrt(3) * math.pi**0.25)) * term1 * term2
        elif self.wavelet_type == 'morlet':
            omega0 = 5.0  # Central frequency
            real = torch.cos(omega0 * x_scaled)
            envelope = torch.exp(-0.5 * x_scaled ** 2)
            wavelet = envelope * real
        elif self.wavelet_type == 'dog':
            # -x * exp(-x^2 / 2)
            wavelet = -x_scaled * torch.exp(-0.5 * x_scaled ** 2)
        elif self.wavelet_type == 'meyer':
            v = torch.abs(x_scaled)
            wavelet = torch.sin(math.pi * v) * self.meyer_aux(v)
        elif self.wavelet_type == 'symlet4':
            wavelet = self.symlet4_wavelet(x_scaled)
        elif self.wavelet_type == 'shannon':
            # torch.sinc is the normalised sinc, so dividing by pi first
            # yields sin(x) / x.
            sinc = torch.sinc(x_scaled / math.pi)
            # The Hamming window spans the last axis (the input-feature axis).
            window = torch.hamming_window(x_scaled.size(-1), periodic=False, dtype=x_scaled.dtype, device=x_scaled.device)
            wavelet = sinc * window
        else:
            raise ValueError("Unsupported wavelet type")

        wavelet_weighted = wavelet * self.wavelet_weights.unsqueeze(0)
        wavelet_output = wavelet_weighted.sum(dim=2)
        return wavelet_output

    def haar_wavelet(self, x):
        """Haar wavelet: +1 on [0, 0.5), -1 on [0.5, 1) and 0 elsewhere.

        The previous implementation returned +1 for every ``x < 0.5``
        (including all negative values), so the function had no compact
        support; the lower bound ``x >= 0`` is now enforced.
        """
        first_half = (x >= 0) & (x < 0.5)
        second_half = (x >= 0.5) & (x < 1)
        return first_half.to(x.dtype) - second_half.to(x.dtype)

    def coiflet_wavelet(self, x):
        """Evaluate the polynomial sum(c[i] * x**i) with the constants below."""
        c = [0.038580, -0.126969, -0.077974, 0.417845, 0.812866, 0.468429]
        return sum(c[i] * x**i for i in range(len(c)))

    def biorthogonal_wavelet(self, x):
        """Evaluate the cubic 3*x**2 - 2*x**3 element-wise."""
        return 3*x**2 - 2*x**3

    def symlet4_wavelet(self, x):
        """Evaluate a degree-7 polynomial whose coefficients are listed below."""
        # Symlet 4 (sym4) wavelet coefficients
        coeffs = [
            -0.075765714789273,
            0.029635527645999,
            0.497618667632015,
            0.803738751805916,
            0.297857795605542,
            -0.099219543576847,
            -0.012603967262038,
            0.032223100604042
        ]

        # The coefficients are used as polynomial coefficients in x;
        # no convolution is performed here.
        return sum(coeffs[i] * x**i for i in range(len(coeffs)))

    def daubechies_wavelet(self, x):
        """Evaluate a cubic in x with alternating signs and sqrt(3)-based coefficients."""
        sqrt3 = math.sqrt(3)
        coeff1 = (1 + sqrt3) / (4 * math.sqrt(2))
        coeff2 = (3 + sqrt3) / (4 * math.sqrt(2))
        coeff3 = (3 - sqrt3) / (4 * math.sqrt(2))
        coeff4 = (1 - sqrt3) / (4 * math.sqrt(2))
        return coeff1 - coeff2 * x + coeff3 * x**2 - coeff4 * x**3

    def meyer_aux(self, v):
        """Piecewise helper: 1 for v <= 1/2, 0 for v >= 1, a cosine blend in between."""
        pi = math.pi
        return torch.where(v <= 1/2, torch.ones_like(v), torch.where(v >= 1, torch.zeros_like(v), torch.cos(pi / 2 * self.nu(2 * v - 1))))

    def nu(self, t):
        """Polynomial t**4 * (35 - 84*t + 70*t**2 - 20*t**3) used by ``meyer_aux``."""
        return t**4 * (35 - 84*t + 70*t**2 - 20*t**3)

    def forward(self, x):
        """Return batch-normalised sum of the wavelet branch and the linear branch."""
        wavelet_output = self.wavelet_transform(x)
        base_output = F.linear(x, self.weight1)
        combined_output = wavelet_output + base_output
        return self.bn(combined_output)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, RobustScaler, MaxAbsScaler, PowerTransformer, MinMaxScaler

def load_and_preprocess_data(path):
    """Load a feature CSV, encode its labels and power-transform the features.

    Args:
        path: Location of a CSV file that contains a ``label`` column. Every
            other column is handed to the transformer (assumed to be
            numeric -- TODO confirm against the data file).

    Returns:
        tuple: ``(features, labels)`` where ``features`` is the result of
        ``PowerTransformer().fit_transform`` on the non-label columns and
        ``labels`` is the result of ``LabelEncoder().fit_transform`` on the
        ``label`` column.
    """
    # Imported locally because this cell's import list does not include
    # LabelEncoder; without this the function only works after a later cell
    # has been executed.
    from sklearn.preprocessing import LabelEncoder

    # Load the CSV file
    frame = pd.read_csv(path)

    # Encode the string labels to numerical values
    labels = LabelEncoder().fit_transform(frame['label'])

    # Select all columns except the label for numerical processing
    features = frame.drop(columns=['label'])

    # Apply PowerTransformer with its default settings
    features_scaled = PowerTransformer().fit_transform(features)

    return features_scaled, labels

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import LabelEncoder, PowerTransformer, MinMaxScaler
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

# Create pairs of augmented data
def create_pairs(data):
    """Build one (augmented, augmented, original) triplet per row of ``data``.

    Each row is passed to ``augment_data`` twice, in order, and the two
    results are stored together with the untouched row.

    Returns:
        A NumPy array built from the list of triplets.
    """
    triplets = [
        (augment_data(sample), augment_data(sample), sample)
        for sample in data
    ]
    return np.array(triplets)

# Define an augmentation function
def augment_data(data, noise_level=0.1):
    """Return ``data`` plus element-wise Gaussian noise.

    The noise has mean 0 and standard deviation ``noise_level``, has the same
    shape as ``data`` and is drawn from NumPy's global random state.
    """
    jitter = np.random.normal(loc=0, scale=noise_level, size=data.shape)
    return data + jitter

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

# Dataset class
class TrainDataset(Dataset):
    """Dataset yielding two views per sample as float tensors.

    ``data`` is a mapping with the keys ``'pos_1'`` and ``'pos_2'``; both
    entries are indexed by sample position.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        # The sample count is taken from the first view.
        first_view = self.data['pos_1']
        return len(first_view)

    def __getitem__(self, idx):
        raw_views = (self.data['pos_1'][idx], self.data['pos_2'][idx])
        first, second = (torch.tensor(view, dtype=torch.float) for view in raw_views)
        return first, second

class MemoryDataset(Dataset):
    """Dataset yielding ``(sample, label)`` pairs for the kNN feature bank.

    ``data`` is a mapping with the keys ``'true_data'`` and ``'target'``.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        samples = self.data['true_data']
        return len(samples)

    def __getitem__(self, idx):
        sample = self.data['true_data'][idx]
        label = self.data['target'][idx]
        # Samples become float tensors, labels become long tensors.
        sample_tensor = torch.tensor(sample, dtype=torch.float)
        label_tensor = torch.tensor(label, dtype=torch.long)
        return sample_tensor, label_tensor

class TestDataset(Dataset):
    """Dataset yielding ``(sample, label)`` pairs for evaluation.

    ``data`` is a mapping with the keys ``'true_data'`` and ``'target'``.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data['true_data'])

    def __getitem__(self, idx):
        features = torch.tensor(self.data['true_data'][idx], dtype=torch.float)
        label = torch.tensor(self.data['target'][idx], dtype=torch.long)
        return features, label


class NumericalModel(nn.Module):
    """1-D convolutional encoder followed by a KANLinear projection head.

    Args:
        input_dim: Number of values per sample; the input is reshaped to a
            single-channel sequence of this length.
        feature_dim: Output size of the projection head.
        wavelet_type: Wavelet name forwarded to both ``KANLinear`` layers.
    """

    def __init__(self, input_dim, feature_dim=128, wavelet_type='haar'):
        super().__init__()

        self.channels = 1          # single input channel
        self.length = input_dim    # sequence length per sample

        conv_width = 256
        # Two stride-2 poolings shrink the sequence to length // 4.
        flattened_size = conv_width * (self.length // 4)

        # Encoder: conv -> relu -> pool, twice, then a dense layer to 512.
        self.feature_extractor = nn.Sequential(
            nn.Conv1d(self.channels, conv_width, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(conv_width, conv_width, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.Linear(flattened_size, 512),
            nn.ReLU()
        )

        # Projection head: 512 -> 256 -> feature_dim using wavelet layers.
        self.projection_head = nn.Sequential(
            KANLinear(512, 256, wavelet_type=wavelet_type),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            KANLinear(256, feature_dim, wavelet_type=wavelet_type)
        )

    def forward(self, x):
        """Return the L2-normalised encoder features and projections."""
        # (batch, length) -> (batch, channels, length)
        signal = x.view(-1, self.channels, self.length)
        feature = self.feature_extractor(signal)
        projection = self.projection_head(feature)
        return F.normalize(feature, dim=-1), F.normalize(projection, dim=-1)


In [None]:
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
from geomloss import SamplesLoss
np.random.seed(0)

# Function to create a mask for negative samples
def get_negative_mask(batch_size):
    """Boolean mask of shape (2 * batch_size, 2 * batch_size) selecting negatives.

    For row ``i`` (taken modulo ``batch_size``) the entries at columns ``i``
    and ``i + batch_size`` are False; every other entry is True.
    """
    diagonal = torch.eye(batch_size).bool()
    # Two identity blocks side by side mark the excluded columns of each row.
    top_half = ~torch.cat((diagonal, diagonal), dim=1)
    return torch.cat((top_half, top_half), dim=0)

# Training function
def train_drop_sub(net, data_loader, train_optimizer, device, epoch, epochs,batch_size, args):
    """Train ``net`` for one epoch and return the mean loss per sample.

    Args:
        net: Model returning ``(feature, projection)`` for a batch.
        data_loader: Iterable yielding ``(pos_1, pos_2)`` batches.
        train_optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        epoch: Current epoch index; only used in the progress-bar text.
        epochs: Total number of epochs; only used in the progress-bar text.
        batch_size: Nominal batch size. Kept for interface compatibility;
            the running average is weighted by the actual size of each batch
            so a smaller final batch no longer skews the reported loss.
        args: Object exposing ``drop_alpha``, the fraction that determines
            how many of the largest negative scores are zeroed.

    Returns:
        float: Sample-weighted average loss (0.0 if the loader is empty).
    """
    net.train()
    total_loss, total_num = 0.0, 0
    train_bar = tqdm(data_loader)

    # Loop-invariant objects are created once instead of on every batch.
    loss_wasserstein = SamplesLoss(loss="sinkhorn", p=2)
    alpha = args.drop_alpha

    for pos_1, pos_2 in train_bar:
        pos_1, pos_2 = pos_1.to(device), pos_2.to(device)
        actual_batch_size = pos_1.size(0)
        _, out_1 = net(pos_1)
        _, out_2 = net(pos_2)

        # The removal count equals alpha times the size of a
        # (batch x batch) similarity matrix; computing that matrix just to
        # read its element count was wasted work.
        num_to_remove = int(alpha * (actual_batch_size ** 2))

        # neg score
        out = torch.cat([out_1, out_2], dim=0)
        neg = torch.exp(torch.mm(out, out.t().contiguous()))
        mask = get_negative_mask(actual_batch_size).to(device)
        neg = neg.masked_select(mask).view(2 * actual_batch_size, -1)

        # Zero out the `num_to_remove` largest negative scores. Guarded so a
        # count of zero (small batch or small alpha) skips the step instead
        # of indexing an empty top-k result.
        num_to_remove = min(num_to_remove, neg.numel())
        if num_to_remove > 0:
            similar_values, _ = torch.topk(neg.view(-1), num_to_remove, largest=True)
            threshold_similar = similar_values[-1]
            neg[neg >= threshold_similar] = 0

        # pos score
        pos = torch.exp(torch.sum(out_1 * out_2, dim=-1))
        pos = torch.cat([pos, pos], dim=0)
        Ng = neg.sum(dim=-1)

        # Sinkhorn distance between the positive scores and the summed
        # negative scores, each treated as a set of 1-D points.
        loss_distances = loss_wasserstein(pos.unsqueeze(1), Ng.unsqueeze(1))

        # NOTE(review): the -log(pos / (pos + Ng)) term enters with a
        # negative weight, so minimising `loss` increases it, and
        # log(loss_distances) is undefined if the distance is <= 0.
        # Kept unchanged -- confirm this objective is intended.
        loss = 0.9*torch.log(loss_distances) - 0.1*(-torch.log(pos / (pos + Ng))).mean()

        train_optimizer.zero_grad()
        loss.backward()
        train_optimizer.step()

        total_num += actual_batch_size
        total_loss += loss.item() * actual_batch_size
        train_bar.set_description(f'Train Epoch: [{epoch}/{epochs}] Loss: {total_loss / total_num:.4f}')

    return total_loss / total_num if total_num else 0.0


In [None]:
import torch
from tqdm import tqdm
from sklearn.metrics import multilabel_confusion_matrix
import numpy as np

def d_index_score_cal(y_true, y_pred):
    """Average a per-class d-index over the classes found in the inputs.

    For each class, a 2x2 confusion matrix is obtained from
    ``multilabel_confusion_matrix`` and
    ``log2(1 + accuracy) + log2(1 + (sensitivity + specificity) / 2)`` is
    computed; the function returns the mean of these values.

    Args:
        y_true: Sequence of true class labels.
        y_pred: Either a flat sequence of predicted labels (same length as
            ``y_true``) or a list of candidate-label lists, one per sample.

    Returns:
        The mean d-index, or 0 when either input is empty.
    """
    if len(y_true) == 0 or len(y_pred) == 0:
        return 0

    if all(isinstance(pred, list) for pred in y_pred):
        # Collapse each candidate list to one label: the true label when it
        # is among the candidates, otherwise the sentinel -1 (the same
        # convention as create_exact_match_list). The previous code produced
        # 0/1 hit flags, which were then compared against class labels.
        y_pred = [true_label if true_label in pred else -1 for true_label, pred in zip(y_true, y_pred)]
    else:
        # If y_pred is already flat, ensure it's the same length as y_true
        assert len(y_true) == len(y_pred), "y_true and y_pred must have the same length"

    conf_matrix = multilabel_confusion_matrix(y_true, y_pred)
    d_indices = []
    for class_matrix in conf_matrix:
        tn, fp, fn, tp = class_matrix.ravel()
        # A class without positives (or negatives) gets a tiny placeholder
        # instead of a division by zero.
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 1e-7
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 1e-7
        accuracy = (tp + tn) / (tp + tn + fp + fn)
        d = np.log2(1 + accuracy) + np.log2(1 + (sensitivity + specificity) / 2)
        d_indices.append(d)
    avg_d_index = sum(d_indices) / len(d_indices) if len(d_indices) > 0 else 0
    return avg_d_index


def create_exact_match_list(top2_predictions, targets):
    """Reduce each candidate list to a single label.

    For every (candidates, target) pair the first candidate equal to the
    target is kept; when none matches, -1 is stored instead.
    """
    def first_hit(candidates, wanted):
        for candidate in candidates:
            if candidate == wanted:
                return candidate
        return -1

    return [
        first_hit(candidates, wanted)
        for candidates, wanted in zip(top2_predictions, targets)
    ]


In [None]:
import torch
from tqdm import tqdm
from sklearn.metrics import classification_report


def flatten_predictions(predictions, true_labels):
    """Count hits, wrong candidates and misses over all samples.

    A sample is a true positive when its true label appears among its
    candidate predictions and a false negative otherwise. Every candidate
    that differs from the true label counts as one false positive.

    Returns:
        tuple: ``(TP, FP, FN)``.
    """
    hits = wrong = misses = 0
    for position, candidates in enumerate(predictions):
        label = true_labels[position]

        if label not in candidates:
            misses += 1
        else:
            hits += 1
        wrong += sum(1 for candidate in candidates if candidate != label)

    return hits, wrong, misses

def test_knn(net, memory_data_loader, test_data_loader, k, c, device=None):
    """Evaluate ``net`` with a weighted k-nearest-neighbour classifier.

    Features of the memory set form a bank; each test sample is scored
    against the bank, its ``k`` most similar entries vote for their labels
    with weight ``exp(similarity)``, and the three best-scoring classes are
    kept as the prediction.

    Args:
        net: Model whose first output is the feature tensor.
        memory_data_loader: Loader yielding ``(data, label)`` for the bank.
        test_data_loader: Loader yielding ``(data, target)`` to evaluate.
        k: Number of neighbours (clamped to the size of the bank).
        c: Number of classes.
        device: Device to run on. Defaults to the device of the model's
            parameters, so the function also works without CUDA.

    Returns:
        dict: ``accuracy`` (top-3, in percent), ``d_index``, ``precision``,
        ``recall`` and ``f1``.
    """
    net.eval()
    if device is None:
        first_param = next(net.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    total_accuracy, total_num = 0.0, 0
    original_labels_list = []
    predictions = []
    feature_bank = []
    bank_labels = []

    with torch.no_grad():
        # Generate feature bank; labels are collected from the same batches
        # so they stay aligned with the features whatever the loader order.
        for data, label in tqdm(memory_data_loader, desc='Feature extracting'):
            feature = net(data.to(device, non_blocking=True))[0]
            feature_bank.append(feature)
            bank_labels.append(label.to(device))
        feature_bank = torch.cat(feature_bank, dim=0).t().contiguous()
        feature_labels = torch.cat(bank_labels, dim=0)

        # topk cannot request more neighbours than the bank holds.
        k = min(k, feature_bank.size(1))

        test_bar = tqdm(test_data_loader)
        for data, target in test_bar:
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
            feature = net(data)[0]

            total_num += data.size(0)
            sim_matrix = torch.mm(feature, feature_bank)
            sim_weight, sim_indices = sim_matrix.topk(k=k, dim=-1)
            sim_labels = torch.gather(feature_labels.expand(data.size(0), -1), dim=-1, index=sim_indices)
            sim_weight = (sim_weight).exp()

            # One-hot encode neighbour labels and accumulate weighted votes.
            one_hot_label = torch.zeros(data.size(0) * k, c, device=sim_labels.device)
            one_hot_label = one_hot_label.scatter(dim=-1, index=sim_labels.view(-1, 1).long(), value=1.0)
            pred_scores = torch.sum(one_hot_label.view(data.size(0), -1, c) * sim_weight.unsqueeze(dim=-1), dim=1)
            pred_labels = pred_scores.argsort(dim=-1, descending=True)

            for i in range(data.size(0)):
                original_labels_list.append(target[i].item())
                predictions.append(pred_labels[i, :3].tolist())

            # A sample counts as correct if its target is among the top 3.
            total_accuracy += torch.sum((pred_labels[:, :3] == target.unsqueeze(dim=-1)).any(dim=-1).float()).item()

    TP, FP, FN = flatten_predictions(predictions, original_labels_list)

    precision, recall, f1 = calculate_metrics(TP, FP, FN)

    # Flatten predictions for d-index calculation
    flat_predictions = create_exact_match_list(predictions, original_labels_list)

    # Calculate d-index
    d_index = d_index_score_cal(original_labels_list, flat_predictions)

    return {
        'accuracy': total_accuracy / total_num * 100 if total_num else 0.0,
        'd_index': d_index,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }


def calculate_metrics(TP, FP, FN):
    """Derive precision, recall and F1 from raw counts.

    Any ratio whose denominator is not positive is reported as 0.

    Returns:
        tuple: ``(precision, recall, f1)``.
    """
    precision = recall = f1 = 0

    predicted_positive = TP + FP
    if predicted_positive > 0:
        precision = TP / predicted_positive

    actual_positive = TP + FN
    if actual_positive > 0:
        recall = TP / actual_positive

    if (precision + recall) > 0:
        f1 = 2 * (precision * recall) / (precision + recall)

    return precision, recall, f1



In [None]:
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader

class Args:
    """Minimal hyper-parameter holder handed to the training function."""

    def __init__(self, drop_alpha):
        # Stored unchanged; read by the training loop as ``args.drop_alpha``.
        self.drop_alpha = drop_alpha

def main(path='esc50_audio_features.csv', epochs=50, k=10):
    """Run the drop_alpha sweep: train, evaluate with kNN and save metrics.

    For every ``drop_alpha`` and every (learning rate, batch size)
    combination a fresh model is trained for ``epochs`` epochs. After each
    epoch the model is evaluated with ``test_knn`` and the metrics are
    printed; one CSV per combination is written at the end of its training.

    Args:
        path: CSV file passed to ``load_and_preprocess_data``.
        epochs: Number of training epochs per configuration.
        k: Number of neighbours used by the kNN evaluation.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)

    df_numerical, labels = load_and_preprocess_data(path)
    num_classes = len(np.unique(labels))
    augmented_original_pairs = create_pairs(df_numerical)

    learning_rates = [1e-4]
    batch_sizes = [64]

    drop_alphas = [0.05, 0.1, 0.15, 0.2, 0.25]

    # The datasets do not depend on any swept hyper-parameter, so they are
    # built once instead of once per configuration.
    train_dataset = TrainDataset({"pos_1": augmented_original_pairs[:, 0], "pos_2": augmented_original_pairs[:, 1]})
    originals = augmented_original_pairs[:, 2]
    # NOTE(review): the memory bank and the test set are the same samples,
    # so every test sample has itself in the bank -- confirm this is intended.
    memory_dataset = MemoryDataset({"true_data": originals, 'target': labels})
    test_dataset = TestDataset({"true_data": originals, 'target': labels})

    for drop_alpha in drop_alphas:
        args = Args(drop_alpha)
        print(f"Running for drop_alpha: {args.drop_alpha}")

        for lr in learning_rates:
            for batch_size in batch_sizes:
                print(f"Learning Rate: {lr}, Batch Size: {batch_size}")

                # Reset per configuration: the CSV written below is named
                # after (drop_alpha, lr, batch_size), so it must not contain
                # rows accumulated from earlier configurations.
                all_metrics = []

                train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
                memory_data_loader = DataLoader(memory_dataset, batch_size=batch_size, shuffle=False)
                test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

                model = NumericalModel(input_dim=df_numerical.shape[1], feature_dim=64).to(device)
                model = nn.DataParallel(model)
                optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-6)

                for epoch in range(epochs):
                    train_drop_sub(model, train_loader, optimizer, device, epoch, epochs, batch_size, args)
                    test_results = test_knn(model, memory_data_loader, test_loader, k, num_classes)

                    # Collect metrics
                    all_metrics.append({
                        'Epoch': epoch + 1,
                        'Learning Rate': lr,
                        'Batch Size': batch_size,
                        'Drop Alpha': args.drop_alpha,
                        'Accuracy': test_results['accuracy'],
                        'D-Index': test_results['d_index'],
                        'Precision': test_results['precision'],
                        'Recall': test_results['recall'],
                        'F1': test_results['f1'],
                    })

                    # Print Metrics
                    print("Metrics:")
                    print(f"  Accuracy: {test_results['accuracy']:.8f}%")
                    print(f"  D-Index: {test_results['d_index']:.8f}")
                    print(f"  Precision: {test_results['precision']:.8f}")
                    print(f"  Recall: {test_results['recall']:.8f}")
                    print(f"  F1 Score: {test_results['f1']:.8f}")

                # Save this configuration's metrics as its own CSV file
                df_metrics = pd.DataFrame(all_metrics)
                filename = f'91_48cols_model_metrics_drop_alpha_{args.drop_alpha}_{lr}_{batch_size}_PowerTransformer.csv'
                df_metrics.to_csv(filename, index=False)
                print(f"Model metrics for drop_alpha {args.drop_alpha} saved!")


# Start the full sweep when this cell (or the exported script) is executed
# directly; importing the code as a module does not trigger it.
if __name__ == "__main__":
    main()


Using device: cuda
Running for drop_alpha: 0.05
Learning Rate: 0.0001, Batch Size: 64
