In [None]:
import os
import numpy as np
import torch
import random
from torch.utils.data import Dataset, DataLoader, Subset
import librosa
from sklearn.model_selection import train_test_split

# Hyperparameters and dataset configuration shared by the cells below
DATA_DIR      = 'speech_commands'   # root folder; one sub-folder per entry of COMMANDS is read from it
BATCH_SIZE    = 64                  # batch size of the DataLoaders created in this cell

LEARNING_RATE = 0.0005              # learning rate passed to the Adam optimizer of the main training run
NUM_CLASSES   = 10                  # number of output logits of ImprovedAudioCNN
SAMPLE_RATE   = 16000               # passed as `sr` to librosa.load and librosa.feature.mfcc
DURATION      = 1                   # passed as `duration` to librosa.load
N_MFCC        = 40                  # passed as `n_mfcc` to librosa.feature.mfcc
# Class folder names; the position in this list is used as the integer label
COMMANDS      = ['yes','no','up','down','left','right','on','off','stop','go']

trainepochs = 300   # number of epochs of the main training loop
fineepochs = 150    # number of epochs of each fine-tuning loop

# Fix random seeds for reproducibility (PyTorch, Python's random module, NumPy)
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

# 1) Load and extract MFCC features
def load_audio_files(parent_dir, sub_dirs, target_length=130):
    """Load every ``.wav`` file of the given class folders as a fixed-size MFCC matrix.

    Args:
        parent_dir: Directory that contains one sub-folder per class.
        sub_dirs: Sub-folder names; the position of a name in this sequence
            becomes the integer label of all files found inside it.
        target_length: Number of time frames each MFCC matrix is zero-padded
            or truncated to along its second axis.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays: the stacked MFCC
        matrices and the matching integer labels, in the same order.
    """
    features, labels = [], []
    for label, sub in enumerate(sub_dirs):
        folder = os.path.join(parent_dir, sub)
        # os.listdir() returns entries in arbitrary order. Sorting keeps the
        # sample order stable between runs and machines, so the seeded
        # train/test split and attack-index selection stay reproducible.
        for fn in sorted(os.listdir(folder)):
            if not fn.endswith('.wav'):
                continue
            path = os.path.join(folder, fn)
            audio, _ = librosa.load(path, sr=SAMPLE_RATE, duration=DURATION)
            mfccs = librosa.feature.mfcc(y=audio, sr=SAMPLE_RATE, n_mfcc=N_MFCC)
            # Pad/truncate the time axis to a fixed length
            if mfccs.shape[1] < target_length:
                pad = target_length - mfccs.shape[1]
                mfccs = np.pad(mfccs, ((0, 0), (0, pad)), 'constant')
            else:
                mfccs = mfccs[:, :target_length]
            features.append(mfccs)
            labels.append(label)
    return np.array(features), np.array(labels)

# Build the full feature/label arrays from the class folders listed in COMMANDS
features, labels = load_audio_files(DATA_DIR, COMMANDS)
print("Loaded features:", features.shape, "labels:", labels.shape)

# 2) Stratified train/test split
# 20% of the samples are held out; `stratify=labels` requests a split stratified on the class labels.
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, stratify=labels, random_state=42
)

# 3) Define base Dataset class
class AudioDataset(Dataset):
    """Map-style dataset that pairs each stored feature matrix with its label.

    Items are converted to tensors lazily, one sample per ``__getitem__`` call.
    """

    def __init__(self, features, labels):
        # Both containers are stored as given and indexed with the same position.
        self.labels = labels
        self.features = features

    def __len__(self):
        # The label container defines the dataset size.
        return len(self.labels)

    def __getitem__(self, idx):
        # Feature matrix as float32 tensor, e.g. [N_MFCC, T]
        sample = torch.tensor(self.features[idx], dtype=torch.float32)
        # Label wrapped in a one-element int64 tensor, shape [1]
        target = torch.tensor([self.labels[idx]], dtype=torch.long)
        return sample, target

# Wrap the NumPy splits in Dataset objects
trainset = AudioDataset(X_train, y_train)
testset  = AudioDataset(X_test, y_test)

# 4) Randomly select 50% samples as "attacked" samples
# Indices are positions in `trainset`; the two sets are disjoint and together cover every index.
num_train_attack     = len(trainset) // 2
attack_train_indices = set(random.sample(range(len(trainset)), num_train_attack))
train_clean_indices  = set(range(len(trainset))) - attack_train_indices

print(f"Train Clean Samples: {len(train_clean_indices)}, Attacked Samples: {len(attack_train_indices)}")

# 5) Gaussian noise function & NoisyDataset
def apply_audio_noise(mfcc_tensor, noise_level=3):
    """Return a copy of ``mfcc_tensor`` with zero-mean Gaussian noise added.

    Args:
        mfcc_tensor: Tensor to perturb; it is not modified in place.
        noise_level: Factor the standard-normal noise is multiplied by.

    Returns:
        New tensor of the same shape as ``mfcc_tensor``.
    """
    scaled_noise = noise_level * torch.randn_like(mfcc_tensor)
    return mfcc_tensor.add(scaled_noise)

class NoisyAudioDataset(Dataset):
    """Dataset wrapper that perturbs selected samples with Gaussian noise.

    Samples whose index is in ``attacked_indices`` are passed through
    ``apply_audio_noise`` each time they are fetched; all other samples are
    returned exactly as the wrapped dataset provides them.
    """

    def __init__(self, dataset, attacked_indices, noise_level=0.01):
        self.noise_level = noise_level
        self.attacked_indices = attacked_indices
        self.dataset = dataset

    def __len__(self):
        # Same size as the wrapped dataset
        return len(self.dataset)

    def __getitem__(self, idx):
        sample, target = self.dataset[idx]
        if idx not in self.attacked_indices:
            return sample, target
        # Noise is drawn at access time, not cached
        return apply_audio_noise(sample, self.noise_level), target

# 6) Create noisy training set and DataLoaders
# noise_level=15 overrides the wrapper's default of 0.01. Noise is drawn inside
# NoisyAudioDataset.__getitem__, so an attacked sample differs on every access.
trainset_noisy    = NoisyAudioDataset(trainset, attack_train_indices, noise_level=15)
# shuffle=False keeps batches in dataset order; later cells match per-sample
# results to dataset indices purely by position.
trainloader       = DataLoader(trainset_noisy, batch_size=BATCH_SIZE, shuffle=False)
testloader        = DataLoader(testset,        batch_size=BATCH_SIZE, shuffle=False)

# 7) Create subsets for attacked and clean samples
# Both subsets index into the noisy wrapper, so the attacked subset yields noised
# samples and the clean subset yields untouched ones.
train_attack_subset = Subset(trainset_noisy, list(attack_train_indices))
train_clean_subset  = Subset(trainset_noisy, list(train_clean_indices))

attackTrainloader   = DataLoader(train_attack_subset, batch_size=BATCH_SIZE, shuffle=False)
cleanTrainloader    = DataLoader(train_clean_subset,  batch_size=BATCH_SIZE, shuffle=False)

print(f"attackTrain samples: {len(train_attack_subset)}")
print(f"cleanTrain samples : {len(train_clean_subset)}")

In [None]:
import os
import numpy as np
import torch
import time
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import librosa
import matplotlib.pyplot as plt

from torchsummary import summary


class ImprovedAudioCNN(nn.Module):
    """Small CNN classifier for 2-D feature maps such as MFCC matrices.

    Layout: one conv block (Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d),
    adaptive average pooling to a 2x2 grid, then a two-layer classifier head.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        conv_stage = [
            nn.Conv2d(1, 64, kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.conv1 = nn.Sequential(*conv_stage)

        # Fixed 2x2 output grid, so the head's input size does not depend on the input size
        self.adaptive_pool = nn.AdaptiveAvgPool2d((2, 2))

        pooled_dim = 64 * 2 * 2
        self.fc = nn.Sequential(
            nn.Linear(pooled_dim, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Map a batch of shape (B, H, W) to logits of shape (B, num_classes)."""
        # Insert the single channel dimension expected by Conv2d
        feature_maps = self.conv1(x.unsqueeze(1))
        pooled = self.adaptive_pool(feature_maps)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

# Main execution: train ImprovedAudioCNN on the noisy training loader, evaluate on the
# test loader after every epoch, checkpoint the best epoch and plot the training curves.
if __name__ == "__main__":
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    model = ImprovedAudioCNN(NUM_CLASSES).to(device)
    summary(model, (N_MFCC, features[0].shape[1]))  # Input shape: (N_MFCC, time_steps)

    # Optimizer with learning rate scheduling ('max' mode: the monitored value is test accuracy)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=5, factor=0.5, verbose=True)

    best_test_acc = 0
    train_losses = []
    train_accs = []
    test_accs = []
    epoch_times = []  # Duration of every epoch (training + evaluation), in seconds

    for epoch in range(trainepochs):
        start_time = time.time()  # Record start time
        model.train()
        train_loss, train_correct = 0, 0

        # The batch variable is called `targets` so that the notebook-level `labels`
        # array created by the data-loading cell is not overwritten by this loop.
        for inputs, targets in trainloader:
            # view(-1) instead of squeeze(): squeeze() would turn a final batch of size 1
            # into a 0-dim tensor, which no longer matches the (1, C) logits.
            inputs, targets = inputs.to(device), targets.view(-1).to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_correct += (predicted == targets).sum().item()

        # Evaluation on test set
        model.eval()
        test_correct = 0
        test_total = 0
        with torch.no_grad():
            for inputs, targets in testloader:
                inputs, targets = inputs.to(device), targets.view(-1).to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                test_correct += (predicted == targets).sum().item()
                test_total += targets.size(0)

        end_time = time.time()  # Record end time
        epoch_time = end_time - start_time  # Calculate epoch duration
        epoch_times.append(epoch_time)  # Store epoch time

        # Loss is averaged over batches, accuracy over samples
        train_loss_avg = train_loss / len(trainloader)
        train_acc = train_correct / len(trainset)
        test_acc = test_correct / test_total

        train_losses.append(train_loss_avg)
        train_accs.append(train_acc)
        test_accs.append(test_acc)

        # Adjust learning rate based on this epoch's test accuracy
        scheduler.step(test_acc)

        print(f'\nEpoch {epoch+1}/{trainepochs}:')
        print(f'Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f}')
        print(f'Test Acc : {test_acc:.4f}')
        print(f'Epoch Time: {epoch_time:.2f} seconds')  # Print epoch duration

        # Checkpoint the weights of the best epoch so far (state_dict only)
        if test_acc > best_test_acc:
            best_test_acc = test_acc
            torch.save(model.state_dict(), 'best_model.pth')
            print('Best model saved!')

    # Calculate average epoch time
    avg_epoch_time = sum(epoch_times) / len(epoch_times)

    # Plotting: loss on the left, train/test accuracy on the right
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.title('Training Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Acc')
    plt.plot(test_accs, label='Test Acc')
    plt.title('Accuracy')
    plt.legend()
    plt.tight_layout()
    plt.savefig('training_metrics.png')
    plt.close()

    # Output statistics
    print(f'\nTraining completed!')
    print(f'Total training time: {sum(epoch_times):.2f} seconds')
    print(f'Average epoch time: {avg_epoch_time:.2f} seconds')
    print(f'Best Test Accuracy: {best_test_acc:.4f}')
    # Saves the whole module object in its last-epoch state (not the best checkpoint)
    torch.save(model, 'final_model.pth')
    print('Final model saved!')

In [None]:
import torch
import torch.nn.functional as F

def compute_ffn_attribution(model, data_loader, device):
    """
    Compute a per-neuron attribution score for the first fully-connected layer
    (``model.fc[0]``) of an ImprovedAudioCNN-style model.

    For every sample, the gradient of each class logit with respect to the fc1
    pre-activation is taken; the maximum over classes is multiplied by the
    post-ReLU activation and the absolute value is returned.

    Args:
        model: Trained model exposing ``conv1``, ``adaptive_pool`` and ``fc``
            (``fc[0]`` first Linear, ``fc[2]`` output Linear)
        data_loader: Iterable yielding (mfcc, label) batches; labels are ignored
        device: 'cuda' or 'cpu'
    Returns:
        attributions: CPU tensor of shape (num_samples, fc1 output size);
            an empty (0, fc1 output size) tensor if the loader yields nothing
    """
    model.eval()
    fc1, fc2 = model.fc[0], model.fc[2]
    attributions = []

    for mfccs, _ in data_loader:
        mfccs = mfccs.to(device)

        # ===== Forward pass up to fc1 =====
        # Gradients are only needed w.r.t. fc1's pre-activation, so the feature
        # extractor runs without autograd. This avoids building a graph through the
        # conv stack and leaves the caller's input tensor untouched.
        with torch.no_grad():
            x = mfccs.unsqueeze(1)            # (B,1,N_MFCC,T)
            x = model.conv1(x)                # (B,64,*,*)
            x = model.adaptive_pool(x)        # (B,64,2,2)
            x = x.view(x.size(0), -1)         # (B, 64*2*2)
            z1_value = fc1(x)                 # (B,128)

        # Fresh leaf tensor: the only node gradients are taken with respect to
        z1 = z1_value.detach().requires_grad_(True)
        a1 = F.relu(z1)                       # (B,128)

        # Second FC layer to get final logits
        logits = fc2(a1)                      # (B, num_classes)

        # For each class, compute gradient of logits[:, i] w.r.t z1
        grad_list = []
        for i in range(logits.size(1)):
            grads = torch.autograd.grad(
                outputs=logits[:, i],
                inputs=z1,
                grad_outputs=torch.ones_like(logits[:, i]),
                retain_graph=True,           # the same graph is reused for every class
                create_graph=False
            )[0]                             # (B,128)
            grad_list.append(grads)

        # Stack into (B, num_classes, 128)
        grads_all = torch.stack(grad_list, dim=1)

        # Take max response across classes, multiply by activation a1, and take absolute value
        # Result shape: (B,128)
        attr = (grads_all.max(dim=1)[0] * a1).abs()

        attributions.append(attr.detach().cpu())

    if not attributions:
        # torch.cat() rejects an empty list; return a well-formed empty result instead
        return torch.empty(0, fc1.out_features)

    return torch.cat(attributions, dim=0)  # (total_samples, 128)

In [None]:
# Reuse the trained model from the training cell
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Automatically select device
model.to(device)  # Move model to the selected device (GPU if available, otherwise CPU)
# Compute attribution over the (unshuffled) training loader, so row i belongs to training sample i
attributions = compute_ffn_attribution(model, trainloader,device)

# attributions shape is (num_samples, 128): one score per fc1 neuron for each sample

In [None]:
# Display the shape of the attribution matrix as the cell output
attributions.shape

In [None]:
from sklearn.preprocessing import StandardScaler

# `attributions` is returned on the CPU by compute_ffn_attribution, so .cpu() below is a safeguard only
# 1. Make sure the tensor lives on the CPU
ffn_attributions_cpu = attributions.cpu()

# 2. Convert PyTorch tensor to NumPy array
ffn_attributions_np = ffn_attributions_cpu.numpy()

# 3. Standardize using StandardScaler (fitted on the attribution matrix itself)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(ffn_attributions_np)

from sklearn.mixture import GaussianMixture

# GMM clustering into two components; gmm_labels holds one component id (0 or 1) per sample
gmm = GaussianMixture(n_components=2, random_state=42)
gmm_labels = gmm.fit_predict(X_scaled)

# Get indices for two clusters (positions in the attribution matrix)
cluster_0_indices = np.where(gmm_labels == 0)[0]
cluster_1_indices = np.where(gmm_labels == 1)[0]

# Report the size of each cluster
print(f"\nGMM分类结果:")
print(f"Cluster 0 样本数: {len(cluster_0_indices)}")
print(f"Cluster 1 样本数: {len(cluster_1_indices)}")

In [None]:
# Calculate the matching between the GMM clustering and the manual attacked/clean partitioning
# NOTE: the printed header below still says "K-Means"; the clusters come from the GaussianMixture fit.
attack_set = set(attack_train_indices)  # True attack sample indices
clean_set = set(train_clean_indices)    # True clean sample indices

cluster_0_set = set(cluster_0_indices)  # Samples in GMM cluster 0
cluster_1_set = set(cluster_1_indices)  # Samples in GMM cluster 1

# Calculate intersections
attack_in_cluster_0 = len(attack_set & cluster_0_set)  # Number of true attack samples in Cluster 0
attack_in_cluster_1 = len(attack_set & cluster_1_set)  # Number of true attack samples in Cluster 1

clean_in_cluster_0 = len(clean_set & cluster_0_set)  # Number of true clean samples in Cluster 0
clean_in_cluster_1 = len(clean_set & cluster_1_set)  # Number of true clean samples in Cluster 1

# Share (in percent) of the true attack / clean samples that landed in each cluster
attack_accuracy_cluster_0 = (attack_in_cluster_0 / len(attack_set)) * 100
attack_accuracy_cluster_1 = (attack_in_cluster_1 / len(attack_set)) * 100

clean_accuracy_cluster_0 = (clean_in_cluster_0 / len(clean_set)) * 100
clean_accuracy_cluster_1 = (clean_in_cluster_1 / len(clean_set)) * 100

# Print comparison results
print("==== K-Means Clustering vs Manual Labels ====")
print(f"Cluster 0: {len(cluster_0_indices)} samples")
print(f"Cluster 1: {len(cluster_1_indices)} samples\n")

print(f"Attack Samples in Cluster 0: {attack_in_cluster_0} ({attack_accuracy_cluster_0:.2f}%)")
print(f"Attack Samples in Cluster 1: {attack_in_cluster_1} ({attack_accuracy_cluster_1:.2f}%)\n")

print(f"Clean Samples in Cluster 0: {clean_in_cluster_0} ({clean_accuracy_cluster_0:.2f}%)")
print(f"Clean Samples in Cluster 1: {clean_in_cluster_1} ({clean_accuracy_cluster_1:.2f}%)")

# Overall agreement under the assumption "cluster 0 = attacked, cluster 1 = clean".
# The value is a PERCENTAGE (0-100); a value below 50 means the cluster ids are swapped
# relative to that assumption.
total_correct = attack_in_cluster_0 + clean_in_cluster_1  # Assuming cluster_0 mainly contains attack samples, cluster_1 mainly clean samples
overall_accuracy = (total_correct / len(trainset)) * 100  # Previously train_subset, now changed to trainset

print(f"\nOverall Clustering Accuracy: {overall_accuracy:.2f}%")

In [None]:
# Extract the attribution rows of the samples assigned to cluster 0
cluster_0_attributions = attributions[cluster_0_indices]

# Extract the attribution rows of the samples assigned to cluster 1
cluster_1_attributions = attributions[cluster_1_indices]
# Print both shapes as a quick size check
print(cluster_0_attributions.shape)
print(cluster_1_attributions.shape)

In [None]:
import torch
from torch.utils.data import DataLoader, Subset

# Sanity check: the GMM produced exactly one label per training sample
assert len(gmm_labels) == len(trainloader.dataset), "gmm_labels and trainloader.dataset size mismatch!"

# Convert indices to lists of plain Python ints. Unlike calling .tolist() on the
# arrays, this also works when the cell is re-run after the names already hold lists.
cluster_0_indices = [int(i) for i in cluster_0_indices]
cluster_1_indices = [int(i) for i in cluster_1_indices]

# Use Subset to split original (noisy) training dataset by cluster membership
dataset_0 = Subset(trainloader.dataset, cluster_0_indices)
dataset_1 = Subset(trainloader.dataset, cluster_1_indices)

# Create new DataLoaders
trainloader_0 = DataLoader(dataset_0, batch_size=64, shuffle=True)
trainloader_1 = DataLoader(dataset_1, batch_size=64, shuffle=True)

# Final verification
print(f"Expected Cluster 0 size: {len(cluster_0_indices)}, Actual: {len(trainloader_0.dataset)}")
print(f"Expected Cluster 1 size: {len(cluster_1_indices)}, Actual: {len(trainloader_1.dataset)}")

# Pick the cluster that mostly holds clean samples for fine-tuning.
# overall_accuracy is a percentage (0-100) computed under the assumption
# "cluster 0 = attacked, cluster 1 = clean", so the decision threshold is 50,
# not 0.5: above 50% the assumption holds and cluster 1 is the clean one,
# otherwise the cluster ids are swapped and cluster 0 is the clean one.
if overall_accuracy > 50.0:
    fine_tuning_load = trainloader_1
else:
    fine_tuning_load = trainloader_0


In [None]:
import torch

def evaluate_model(model, dataloader, device):
    """
    Measure top-1 accuracy of ``model`` over every batch of ``dataloader``.

    The model is switched to eval mode and stays in eval mode afterwards.

    Args:
      - model: PyTorch module mapping an input batch to (B, num_classes) logits
      - dataloader: Iterable of (inputs, labels) batches; labels may be (B,) or (B, 1)
      - device: Computation device ('cuda' or 'cpu')

    Returns:
      - accuracy: Percentage of correctly classified samples (float, 0-100)
    """
    model.eval()
    n_seen, n_hit = 0, 0

    with torch.no_grad():
        for batch, targets in dataloader:
            # Collapse labels to a flat (B,) vector so they compare element-wise with predictions
            targets = targets.to(device).view(-1)
            logits = model(batch.to(device))

            guesses = torch.max(logits, dim=1).indices
            n_hit += (guesses == targets).sum().item()
            n_seen += targets.size(0)

    return 100.0 * n_hit / n_seen

import torch
from sklearn.metrics import precision_score, f1_score
import numpy as np

def evaluate_model_with_metric(model, dataloader, device):
    """
    Evaluate model performance on a given dataset, including Accuracy, Precision, F1 and Top-3 Accuracy.

    Precision and F1 are macro-averaged over classes (undefined per-class values count as 0).
    All values are reported as percentages.

    Args:
      - model: Trained PyTorch model returning (B, num_classes) logits
      - dataloader: DataLoader yielding (inputs, labels); labels may be (B,) or (B, 1)
      - device: 'cuda' or 'cpu'

    Returns:
      - results: Dictionary with the keys 'Accuracy (%)', 'Precision (macro, %)',
        'F1-score (macro, %)' and 'Top-3 Accuracy (%)'
    """
    model.eval()
    all_preds = []
    all_labels = []
    top3_correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).view(-1)

            outputs = model(inputs)

            # Top-1 predictions
            _, predicted = torch.max(outputs, dim=1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            # Top-3 predictions; k is clamped so models with fewer than 3 classes still work
            k = min(3, outputs.size(1))
            top3 = torch.topk(outputs, k=k, dim=1).indices  # shape: [B, k]
            # One vectorised membership test per batch instead of a Python loop with
            # a host synchronisation (.item()) per sample
            top3_correct += (top3 == labels.unsqueeze(1)).any(dim=1).sum().item()
            total += labels.size(0)

    accuracy = 100.0 * np.mean(np.array(all_preds) == np.array(all_labels))
    precision = precision_score(all_labels, all_preds, average='macro', zero_division=0) * 100
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0) * 100
    top3_accuracy = 100.0 * top3_correct / total

    results = {
        'Accuracy (%)': accuracy,
        'Precision (macro, %)' : precision,
        'F1-score (macro, %)' : f1,
        'Top-3 Accuracy (%)': top3_accuracy
    }

    return results



In [None]:
# Evaluate the trained (unpruned) model on the two GMM clusters and on the full noisy training set
accuracy_cluster_0 = evaluate_model(model, trainloader_0, device)
accuracy_cluster_1 = evaluate_model(model, trainloader_1, device)
accuracy_cluster = evaluate_model(model,trainloader,device)

print(f"Accuracy on Cluster 0: {accuracy_cluster_0:.2f}%")
print(f"Accuracy on Cluster 1: {accuracy_cluster_1:.2f}%")
print(f"Accuracy on Cluster : {accuracy_cluster:.2f}%")

# Same model on the ground-truth attacked / clean training subsets and on the test set
accuracy_attack_train = evaluate_model(model, attackTrainloader , device)
accuracy_clean_train = evaluate_model(model, cleanTrainloader , device)
# accuracy_attack_test = evaluate_model(model, attackTestloader , device)
accuracy_test = evaluate_model(model, testloader , device)

print(f"Accuracy on attack_train : {accuracy_attack_train:.2f}%")
print(f"Accuracy on clean_train : {accuracy_clean_train:.2f}%")

print(f"Accuracy on test : {accuracy_test:.2f}%")

#print(f"Accuracy on clean_test : {accuracy_clean_test:.2f}%")
# Keep the baseline test accuracy under its own name; `accuracy_test` is overwritten by later cells
accuracy_source = accuracy_test

# Extended metrics of the baseline model on the test set
metric_test = evaluate_model_with_metric(model, testloader , device)
print(f"Accuracy  : {metric_test['Accuracy (%)']:.2f}%")
print(f"Precision (macro  : {metric_test['Precision (macro, %)']:.2f}%")
print(f"F1-score (macro, %)  : {metric_test['F1-score (macro, %)']:.2f}%")
print(f"Top-3 Accuracy (%)  : {metric_test['Top-3 Accuracy (%)']:.2f}%")

In [None]:
import numpy as np
import torch
import torch.nn.functional as F

def compute_layer_features(model, data_loader, cluster_labels=None,
                           layer_name='fc1', overall_accuracy=1.0, device='cpu'):
    """
    Extract the outputs of the first fully-connected layer (``model.fc[0]``,
    before the ReLU) of an ImprovedAudioCNN-style model for every sample.

    Note that the model is moved to ``device`` and put into eval mode as a side effect.

    Args:
        model: Trained model exposing ``conv1``, ``adaptive_pool`` and ``fc``
        data_loader: Iterable yielding (mfcc, label) batches in a fixed order;
            the labels it yields are ignored
        cluster_labels: Optional per-sample cluster labels (0 or 1) as array, list or
            tensor, ordered like the samples produced by ``data_loader``
        layer_name: Currently only supports 'fc1' (the first Linear(256,128) layer)
        overall_accuracy: If greater than 0.5 the cluster labels are inverted (1 - label)
        device: 'cuda' or 'cpu'

    Returns:
        X: Feature matrix (n_samples, 128)
        y: Label array if cluster_labels provided, else None

    Raises:
        ValueError: If ``layer_name`` is not 'fc1'
    """
    # Validate once up front instead of on every batch
    if layer_name != 'fc1':
        raise ValueError(f"Unsupported layer name: {layer_name}. Only 'fc1' is supported.")

    model.to(device)
    model.eval()
    X_list = []
    y_list = []

    if cluster_labels is not None:
        if isinstance(cluster_labels, torch.Tensor):
            cluster_labels = cluster_labels.cpu().numpy()
        else:
            # Lists/tuples must become arrays so that `1 - batch_labels` works
            cluster_labels = np.asarray(cluster_labels)

    # Running sample offset: labels are matched by the number of samples actually seen,
    # so the loader does not need a `batch_size` attribute or equally sized batches.
    offset = 0

    with torch.no_grad():
        for mfccs, _ in data_loader:
            # Move inputs to device
            mfccs = mfccs.to(device)

            # Forward pass to fc1 input
            x = mfccs.unsqueeze(1)           # (B,1,N_MFCC,T)
            x = model.conv1(x)               # (B,64,*,*)
            x = model.adaptive_pool(x)       # (B,64,2,2)
            x = x.view(x.size(0), -1)        # (B,256)

            # Extract fc1 layer features
            features = model.fc[0](x)        # (B,128)
            n_batch = features.size(0)

            X_list.append(features.cpu().numpy())

            if cluster_labels is not None:
                batch_labels = cluster_labels[offset:offset + n_batch]
                if overall_accuracy > 0.5:
                    batch_labels = 1 - batch_labels
                y_list.append(batch_labels)

            offset += n_batch

    X = np.concatenate(X_list, axis=0)
    y = np.concatenate(y_list, axis=0) if cluster_labels is not None else None
    return X, y


In [None]:
import copy

# Deep copy the model so that pruning and fine-tuning below leave the trained `model` untouched
model_copy = copy.deepcopy(model)

# Ensure the new model is on the same device as the original
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_copy.to(device)

# Print the architecture of the copy as a quick check
print(model_copy)
# The following cells operate on model_copy (version 2)

In [None]:
# fc1 features (X_1) and GMM-derived binary targets (y_1) for every training sample.
# NOTE: `device` is not passed, so compute_layer_features uses its default 'cpu' and moves
# model_copy to the CPU; `overall_accuracy` also stays at its default 1.0, so the labels are inverted.
X_1,y_1 = compute_layer_features(model_copy, trainloader,gmm_labels)

In [None]:
def solve_linear_regression_torch(X, y):
    """Fit ``y ~ X @ W + b`` by least squares.

    Args:
        X: Array-like of shape (n_samples, n_features).
        y: Array-like with n_samples target values.

    Returns:
        Tuple ``(W, b)``: ``W`` is a float32 tensor of shape (n_features, 1)
        and ``b`` a float32 tensor of shape (1,).
    """
    design = torch.tensor(X, dtype=torch.float32)
    targets = torch.tensor(y, dtype=torch.float32).view(-1, 1)

    # Append a constant column so the intercept is solved together with the weights
    bias_column = torch.ones(design.shape[0], 1)
    augmented = torch.cat([design, bias_column], dim=1)  # (n_samples, n_features + 1)

    # Least-squares solution; its last row belongs to the constant column
    solution = torch.linalg.lstsq(augmented, targets).solution
    weights, bias = solution[:-1], solution[-1]
    return weights, bias
# W2_torch, b2_torch = solve_linear_regression_torch(X_2, y_2)
# Regress the binary cluster targets on the fc1 features; one weight per fc1 neuron
W1_torch, b1_torch = solve_linear_regression_torch(X_1, y_1)

print("W1 shape:", W1_torch.shape)  # (n_features, 1), i.e. (128, 1) for the fc1 features
print("b:", b1_torch)
# print all weights
# for i, w_i in enumerate(W2_torch):
#     print(f"W[{i}] = {w_i}")

In [None]:
W_torch = W1_torch.flatten()  # shape is (128,)

# Sort in descending order by absolute value
W_abs_sorted, indices = torch.sort(torch.abs(W_torch), descending=True)

# Take the indices of the 20 neurons with the largest absolute regression weight
# (the variable names still say "top_10", but the slice keeps 20 entries)
top_10_indices = indices[:20]
top_10_values = W_torch[top_10_indices]  # Get the corresponding W values

# Print the contents of top_10_values
print("Top 10 values:", top_10_values)

# Print the ranking, one neuron per line
print("Top 10 neurons with highest absolute weights:")
for rank, (idx, val) in enumerate(zip(top_10_indices.tolist(), top_10_values.tolist()), start=1):
    print(f"Rank {rank}: Neuron {idx}, Weight = {float(val):.6f}")  # Ensure val is a float
# Get the weights and biases of the first fully-connected layer (fc[0]) of the copy
fc1_weight = model_copy.fc[0].weight  # Weight matrix, shape (out_features, in_features)
fc1_bias = model_copy.fc[0].bias      # Bias vector, shape (out_features,)

# Assume top_10_indices are the indices of the top neurons obtained earlier
#top_10_indices = torch.tensor([12, 45, 3, 28, 7, 19, 33, 56, 22, 41])  # Example data
# The slice asks for up to 40 entries, but top_10_indices holds only 20, so all 20 are pruned
neurons_to_zero = top_10_indices[:40]
print("Neurons to zero:", neurons_to_zero)

# Zero the incoming weights of the selected neurons, one by one
for neuron_idx in neurons_to_zero:
    # Set the weight row of this neuron to 0
    fc1_weight.data[neuron_idx, :] = 0
    # NOTE(review): the bias is NOT zeroed (the line below is disabled and refers to an
    # undefined name `fc2_bias`), although the message printed afterwards says it is.
    # With the bias kept, a pruned neuron still outputs relu(bias). Confirm the intent.
    #fc2_bias.data[neuron_idx] = 0  # Set the weight row of this neuron to 0
    print(f"Neuron {neuron_idx} weight and bias set to 0.")

In [None]:
# Prune testing
# Select the device (GPU if available) and make sure the pruned copy lives on it
# Test on the pruned model_copy
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_copy.to(device)

# Evaluate the pruned model on the cluster_0 and cluster_1 datasets and on the full noisy training set
accuracy_cluster_0 = evaluate_model(model_copy, trainloader_0, device)
accuracy_cluster_1 = evaluate_model(model_copy, trainloader_1, device)
accuracy_cluster = evaluate_model(model_copy,trainloader,device)

print(f"Accuracy on Cluster 0: {accuracy_cluster_0:.2f}%")
print(f"Accuracy on Cluster 1: {accuracy_cluster_1:.2f}%")
print(f"Accuracy on Cluster : {accuracy_cluster:.2f}%")

# Pruned model on the ground-truth attacked / clean training subsets and on the test set
accuracy_attack_train = evaluate_model(model_copy, attackTrainloader , device)
accuracy_clean_train = evaluate_model(model_copy, cleanTrainloader , device)
# accuracy_attack_test = evaluate_model(model, attackTestloader , device)
accuracy_test = evaluate_model(model_copy, testloader , device)

print(f"Accuracy on attack_train : {accuracy_attack_train:.2f}%")
print(f"Accuracy on clean_train : {accuracy_clean_train:.2f}%")
# Keep the post-pruning test accuracy for the final comparison cell
accuracy_prune = accuracy_test

print(f"Accuracy on test : {accuracy_test:.2f}%")
# Extended metrics of the pruned model on the test set
metric_prune = evaluate_model_with_metric(model_copy, testloader , device)

print(f"Accuracy  : {metric_prune['Accuracy (%)']:.2f}%")
print(f"Precision (macro  : {metric_prune['Precision (macro, %)']:.2f}%")
print(f"F1-score (macro, %)  : {metric_prune['F1-score (macro, %)']:.2f}%")
print(f"Top-3 Accuracy (%)  : {metric_prune['Top-3 Accuracy (%)']:.2f}%")

In [None]:
import copy

# Deep copy the pruned model; this second copy is the one fine-tuned with all parameters trainable
model_copy_fine_all = copy.deepcopy(model_copy)

# Ensure the new model is on the same device as the original
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_copy_fine_all.to(device)

# Print the architecture of the copy as a quick check
print(model_copy_fine_all)
# Preparation for fine-tuning

In [None]:
# Select the computation device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move model to device
model_copy = model_copy.to(device)

# Freeze all layers except fc.0 (weight and bias of the first fully-connected layer)
for name, param in model_copy.named_parameters():
    param.requires_grad = name.startswith("fc.0.")

# Check which layers' parameters are frozen
print("\n=== requires_grad 状态 ===")
for name, param in model_copy.named_parameters():
    print(f"{name}: requires_grad = {param.requires_grad}")

# Define optimizer - only optimize parameters that require gradients
optimizer = optim.Adam(
    filter(lambda p: p.requires_grad, model_copy.parameters()),
    lr=0.001,
    weight_decay=1e-5  # Add weight decay to prevent overfitting
)

# Define loss function
criterion = nn.CrossEntropyLoss()

import time

# Training loop (with time statistics)
total_training_time = 0

for epoch in range(fineepochs):
    epoch_start_time = time.time()
    model_copy.train()
    # requires_grad=False only stops the optimizer from updating parameters. A BatchNorm
    # layer in training mode still normalises with batch statistics and keeps updating
    # its running mean/variance. Keep the frozen conv block in eval mode so that it
    # really stays unchanged while fc.0 is fine-tuned.
    model_copy.conv1.eval()
    running_loss = 0.0
    batch_times = []

    for inputs, labels in fine_tuning_load:
        batch_start_time = time.time()

        # Move data and labels to device and flatten labels to shape (B,)
        inputs, labels = inputs.to(device), labels.to(device).view(-1)

        optimizer.zero_grad()
        outputs = model_copy(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        batch_times.append(time.time() - batch_start_time)

    epoch_time = time.time() - epoch_start_time
    total_training_time += epoch_time
    avg_batch_time = sum(batch_times) / len(batch_times) if batch_times else 0

    print(
        f"Epoch [{epoch+1}/{fineepochs}], "
        f"Loss: {running_loss / len(fine_tuning_load):.4f}, "
        f"Epoch Time: {epoch_time:.2f}s, "
        f"Avg Batch Time: {avg_batch_time*1000:.1f}ms"
    )

print(f"\nTraining completed!")
print(f"Total training time: {total_training_time:.2f} seconds")
print(f"Average epoch time: {total_training_time/fineepochs:.2f}s")
# Average seconds per epoch of the fc.0-only fine-tuning, reported in the final comparison
time_fc1_fc2 = total_training_time / fineepochs

# Validate model on the test set
model_copy.eval()
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device).view(-1)
        outputs = model_copy(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100.0 * correct / total
print(f"Validation Accuracy: {accuracy:.2f}%")


In [None]:
# Select the computation device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move model to device
model_copy_fine_all = model_copy_fine_all.to(device)

# Define optimizer over ALL parameters of this copy (comparison run to the fc.0-only fine-tuning)
optimizer = optim.Adam(model_copy_fine_all.parameters(), lr=0.001)

# Define loss function
criterion = nn.CrossEntropyLoss()

import time

# Training loop (with time tracking)
total_training_time = 0  # Track total training time

for epoch in range(fineepochs):
    epoch_start_time = time.time()  # Record epoch start time
    model_copy_fine_all.train()     # Set model to training mode
    running_loss = 0.0
    batch_times = []                # Track time per batch

    for inputs, labels in fine_tuning_load:
        batch_start_time = time.time()

        # Move inputs and labels to device; labels are flattened to shape (B,)
        inputs, labels = inputs.to(device), labels.to(device).view(-1)

        optimizer.zero_grad()       # Zero gradients

        outputs = model_copy_fine_all(inputs)    # Forward
        loss = criterion(outputs, labels)        # Compute loss
        loss.backward()                          # Backward
        optimizer.step()                         # Update  parameters

        running_loss += loss.item()
        batch_times.append(time.time() - batch_start_time)

    # Calculate time
    epoch_time = time.time() - epoch_start_time
    total_training_time += epoch_time
    avg_batch_time = sum(batch_times) / len(batch_times) if batch_times else 0

    print(
        f"Epoch [{epoch + 1}/{fineepochs}], "
        f"Loss: {running_loss / len(fine_tuning_load):.4f}, "
        f"Epoch Time: {epoch_time:.2f}s, "
        f"Avg Batch Time: {avg_batch_time*1000:.1f}ms"
    )

# Final timing statistics
print(f"\nTraining completed!")
print(f"Total training time: {total_training_time:.2f} seconds")
print(f"Average epoch time: {total_training_time/fineepochs:.2f}s")
# Average seconds per epoch of the all-parameter fine-tuning, reported in the final comparison
time_all = total_training_time / fineepochs

# Validate model on the test set
model_copy_fine_all.eval()
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device).view(-1)  # flatten labels
        outputs = model_copy_fine_all(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100.0 * correct / total
print(f"Validation Accuracy: {accuracy:.2f}%")


In [None]:
# Final comparison: baseline vs. pruned vs. fc.0-only fine-tuned vs. fully fine-tuned model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_copy.to(device)

# Evaluate the fc.0-only fine-tuned model on cluster_0 / cluster_1 and on the full noisy training set
accuracy_cluster_0 = evaluate_model(model_copy, trainloader_0, device)
accuracy_cluster_1 = evaluate_model(model_copy, trainloader_1, device)
accuracy_cluster = evaluate_model(model_copy,trainloader,device)

print(f"Accuracy on Cluster 0: {accuracy_cluster_0:.2f}%")
print(f"Accuracy on Cluster 1: {accuracy_cluster_1:.2f}%")
print(f"Accuracy on Cluster : {accuracy_cluster:.2f}%")

accuracy_attack_train = evaluate_model(model_copy, attackTrainloader , device)
accuracy_clean_train = evaluate_model(model_copy, cleanTrainloader , device)
# accuracy_attack_test = evaluate_model(model, attackTestloader , device)
# Test accuracy of the fc.0-only fine-tuned model and of the fully fine-tuned model
accuracy_test = evaluate_model(model_copy, testloader , device)
accuracy_test_all = evaluate_model(model_copy_fine_all, testloader , device)
print(f"Accuracy on attack_train : {accuracy_attack_train:.2f}%")
print(f"Accuracy on clean_train : {accuracy_clean_train:.2f}%")
#print(f"Accuracy on test source : {accuracy_test:.2f}%")

# Summary: accuracy_source and accuracy_prune were stored by the earlier evaluation cells
print(f"Accuracy on test : {accuracy_source:.2f}%")
print(f"Accuracy on test_prune : {accuracy_prune:.2f}%")
print(f"Average epoch time on special: {time_fc1_fc2:.2f}s")
print(f"Accuracy on test_fin-tuning special : {accuracy_test:.2f}%")
print(f"Average epoch time on all: {time_all:.2f}s")
print(f"Accuracy on test_fin-tuning all : {accuracy_test_all:.2f}%")

# Extended test-set metrics: fc.0-only fine-tuned model first, fully fine-tuned model second
metric_sigle = evaluate_model_with_metric(model_copy, testloader , device)
metric_All = evaluate_model_with_metric(model_copy_fine_all, testloader , device)
print("singel")
print(f"Accuracy  : {metric_sigle['Accuracy (%)']:.2f}%")
print(f"Precision (macro  : {metric_sigle['Precision (macro, %)']:.2f}%")
print(f"F1-score (macro, %)  : {metric_sigle['F1-score (macro, %)']:.2f}%")
print(f"Top-3 Accuracy (%)  : {metric_sigle['Top-3 Accuracy (%)']:.2f}%")
print("ALL")
print(f"Accuracy  : {metric_All['Accuracy (%)']:.2f}%")
print(f"Precision (macro  : {metric_All['Precision (macro, %)']:.2f}%")
print(f"F1-score (macro, %)  : {metric_All['F1-score (macro, %)']:.2f}%")
print(f"Top-3 Accuracy (%)  : {metric_All['Top-3 Accuracy (%)']:.2f}%")


In [None]:
# Baseline: train a fresh ImprovedAudioCNN on the fine-tuning loader and report
# its test metrics and average per-epoch wall-clock time.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

model_retrain = ImprovedAudioCNN(NUM_CLASSES).to(device)
# Summarise the network that is trained in this cell (not the unrelated `model`).
summary(model_retrain, (N_MFCC, features[0].shape[1]))  # Input shape: (N_MFCC, time_steps)

# Adam plus a scheduler that halves the learning rate when test accuracy has
# not improved for 5 consecutive epochs ('max' mode: larger metric is better).
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_retrain.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=5, factor=0.5, verbose=True)

best_test_acc = 0
train_losses = []
train_accs = []
test_accs = []
etime1 = time.time()
for epoch in range(trainepochs):
    # Switch the model being trained back to training mode every epoch; the
    # evaluation pass below leaves it in eval mode.
    model_retrain.train()
    train_loss, train_correct, train_total = 0.0, 0, 0

    for inputs, labels in fine_tuning_load:
        # view(-1) keeps a 1-D label vector even for a batch of size 1
        # (squeeze() would collapse it to a 0-d tensor).
        inputs, labels = inputs.to(device), labels.to(device).view(-1)

        optimizer.zero_grad()
        outputs = model_retrain(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        predicted = torch.max(outputs.detach(), dim=1).indices
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    # Normalise by the loader that was actually iterated and by the number of
    # samples actually seen, so loss/accuracy describe this training data.
    train_loss_avg = train_loss / max(len(fine_tuning_load), 1)
    train_acc = train_correct / max(train_total, 1)
    train_losses.append(train_loss_avg)
    train_accs.append(train_acc)

    # Evaluating on test set
    model_retrain.eval()
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device).view(-1)
            outputs = model_retrain(inputs)
            predicted = torch.max(outputs, dim=1).indices
            test_correct += (predicted == labels).sum().item()
            test_total += labels.size(0)

    test_acc = test_correct / max(test_total, 1)
    test_accs.append(test_acc)
    best_test_acc = max(best_test_acc, test_acc)

    # Adjusting learning rate
    scheduler.step(test_acc)

    # Report progress against the epoch count this loop really runs for.
    print(f'\nEpoch {epoch+1}/{trainepochs}:')
    print(f'Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f}')
    print(f'Test Acc : {test_acc:.4f}')
etime_end = time.time()
time1 = etime_end - etime1
# Average seconds per epoch (includes the per-epoch test evaluation), divided
# by the number of epochs that were executed.
epoc_t = time1 / trainepochs
print(epoc_t)
metric_retrain = evaluate_model_with_metric(model_retrain, testloader , device)
print(f"Accuracy  : {metric_retrain['Accuracy (%)']:.2f}%")
print(f"Precision (macro  : {metric_retrain['Precision (macro, %)']:.2f}%")
print(f"F1-score (macro, %)  : {metric_retrain['F1-score (macro, %)']:.2f}%")
print(f"Top-3 Accuracy (%)  : {metric_retrain['Top-3 Accuracy (%)']:.2f}%")

In [None]:
# import os
# import librosa
# import soundfile as sf
# import matplotlib.pyplot as plt
#
# # Create directory to save samples
# output_dir = "interesting_samples"
# os.makedirs(output_dir, exist_ok=True)
#
# # Get paths to original test files (assuming organized by class)
# test_files = []
# for label, cmd in enumerate(COMMANDS):
#     cmd_dir = os.path.join(DATA_DIR, cmd)
#     for fn in os.listdir(cmd_dir):
#         if fn.endswith('.wav'):
#             test_files.append((os.path.join(cmd_dir, fn), label))
#
# # NOTE: train_test_split shuffles the samples, so test-set indices do NOT map
# # back to positions in `test_files` (which lists every .wav file, train and test alike).
# # The lookup below is only a placeholder: record each sample's file path at load
# # time and carry it through the split before trusting the saved audio.
#
# # Collect qualifying samples
# interesting_samples = []
#
# model.eval()
# model_copy.eval()
# model_copy_fine_all.eval()
#
# with torch.no_grad():
#     for idx, (inputs, true_label) in enumerate(testloader):
#         inputs = inputs.to(device)
#         true_label = true_label.view(-1).to(device)
#
#         # Original model prediction
#         outputs = model(inputs)
#         _, pred_original = torch.max(outputs, 1)
#
#         # Pruned model prediction
#         outputs_pruned = model_copy(inputs)
#         _, pred_pruned = torch.max(outputs_pruned, 1)
#
#         # Finetuned model prediction
#         outputs_finetuned = model_copy_fine_all(inputs)
#         _, pred_finetuned = torch.max(outputs_finetuned, 1)
#
#         # Check condition: original model wrong but others correct
#         for i in range(inputs.size(0)):
#             sample_idx = idx * testloader.batch_size + i
#             if sample_idx >= len(testset):
#                 break
#
#             if (pred_original[i] != true_label[i] and
#                 pred_pruned[i] == true_label[i] and
#                 pred_finetuned[i] == true_label[i]):
#
#                 # Get original audio path
#                 audio_path, _ = test_files[sample_idx % len(test_files)]  # Simplified matching
#
#                 # Load original audio
#                 audio, sr = librosa.load(audio_path, sr=SAMPLE_RATE)
#
#                 # Save sample info
#                 sample_info = {
#                     'index': sample_idx,
#                     'true_label': true_label[i].item(),
#                     'true_class': COMMANDS[true_label[i].item()],
#                     'original_pred': pred_original[i].item(),
#                     'original_class': COMMANDS[pred_original[i].item()],
#                     'audio_path': audio_path,
#                     'audio_data': audio,
#                     'sample_rate': sr,
#                     'mfcc': inputs[i].cpu().numpy()
#                 }
#                 interesting_samples.append(sample_info)
#
# # Save interesting samples
# print(f"Found {len(interesting_samples)} interesting samples")
#
# for i, sample in enumerate(interesting_samples):
#     # Save audio file
#     output_path = os.path.join(output_dir, f"sample_{i}_{sample['true_class']}_origpred_{sample['original_class']}.wav")
#     sf.write(output_path, sample['audio_data'], sample['sample_rate'])
#
#     # Save MFCC visualization
#     plt.figure(figsize=(10, 4))
#     librosa.display.specshow(sample['mfcc'], x_axis='time')
#     plt.colorbar()
#     plt.title(f"MFCC - True: {sample['true_class']}, Orig Pred: {sample['original_class']}")
#     plt.tight_layout()
#     plt.savefig(os.path.join(output_dir, f"sample_{i}_mfcc.png"))
#     plt.close()
#
#     # Save metadata
#     with open(os.path.join(output_dir, f"sample_{i}_info.txt"), 'w') as f:
#         f.write(f"True label: {sample['true_class']} ({sample['true_label']})\n")
#         f.write(f"Original model prediction: {sample['original_class']} ({sample['original_pred']})\n")
#         f.write(f"Audio file: {sample['audio_path']}\n")
#
# print(f"Saved {len(interesting_samples)} interesting samples to {output_dir}")