# Research notebook 2.1: Testing the attack success rate (ASR) of all backdoored conditional audio-generating diffusion models
The diffusion model used in this notebook takes inspiration from an assignment for week 11 of the 2023 Deep Learning course (NWI-IMC070) at Radboud University, which used code adapted from https://github.com/milesial/Pytorch-UNet for the U-Net.

# Here is the initial code:

In [1]:
%config InlineBackend.figure_formats = ['png']
%matplotlib inline

import torchaudio
import torchvision
import matplotlib.pyplot as plt
from d2l import torch as d2l
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
import os
import joblib
from IPython.display import Audio

In [2]:
# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# Report which torchaudio I/O backends are available in this environment.
print(str(torchaudio.list_audio_backends()))

cuda
['soundfile']


In [3]:
# --- Diffusion noise schedule ---
diffusion_steps = 1000
# Linearly spaced per-step values beta_0 ... beta_{T-1}.
beta = torch.linspace(start=1e-4, end=0.02, steps=diffusion_steps)
alpha = 1.0 - beta
# Running product of alpha over the steps.
alpha_bar = alpha.cumprod(dim=0)

# --- Audio / spectrogram settings ---
samplerate = 16000
new_samplerate = 3000
n_fft = 100  # 400 was default
win_length = n_fft  # Default: n_fft
hop_length = win_length // 2  # Default: win_length // 2

# --- Loader / training settings ---
batch_size = 1
num_epochs = 10

# Height and width used to size the U-Net's attention and conditioning maps.
resize_h, resize_w = 51, 61

# File name of the serialized label encoder; it is loaded from `modellocation`.
label_filename = "label_encoder.pkl"

# Root directory passed to the SPEECHCOMMANDS dataset.
datalocation = "/vol/csedu-nobackup/project/mnederlands/data"
# Directory that holds saved artifacts such as the label encoder.
modellocation = "./saves/"

# Make sure both directories exist before anything reads from or writes to them.
os.makedirs(modellocation, exist_ok=True)
os.makedirs(datalocation, exist_ok=True)

In [4]:
# NOTE(review): the next three names are not referenced anywhere in the visible
# cells of this notebook — confirm whether they are still needed.
prune_filename = "prune-model-pr0.5-ps0.1"
poison_filename = "thesis-diffusion-poison-model-pr0.5-ps0.1"
attack_succes_cut = 15
# Directory that is scanned for model checkpoints to evaluate.
models = "./models/"
os.makedirs(models, exist_ok=True)

### Audio data

Load the data

In [5]:
# Initialization of label encoder
# The path is assembled with os.path.join so it stays valid even if
# `modellocation` is ever written without a trailing slash.
# NOTE: joblib.load unpickles the file, so only load encoders from a trusted source.
le = joblib.load(os.path.join(modellocation, label_filename))
# Number of distinct labels known to the encoder; sizes the model's label embedding.
num_classes = len(le.classes_)

The parameter settings follow the paper "Denoising Diffusion Probabilistic Models".

In [6]:
def generate_noisy_samples(x_0, beta):
    '''
    Apply the forward (noising) process to the minibatch x_0.

    Each sample gets its own randomly drawn time step and is blended with
    Gaussian noise, weighted by the cumulative product of (1 - beta) at
    that step.

    Returns a tuple (noisy samples, the noise that was added, the time steps).
    '''
    # Both inputs are moved to the notebook-wide `device` before any arithmetic.
    x_0 = x_0.to(device)
    beta = beta.to(device)

    # alpha_bar[t] is the product of (1 - beta[s]) for all s <= t.
    alpha_bar = torch.cumprod(1.0 - beta, dim=0).to(device)

    # One random time step per sample in the minibatch.
    num_samples = x_0.shape[0]
    num_steps = beta.shape[0]
    t = torch.randint(num_steps, size=(num_samples,), device=x_0.device)

    # Gaussian noise with the same shape as the input.
    noise = torch.randn_like(x_0).to(device)

    # Per-sample weights, indexed so they broadcast over the three trailing axes.
    signal_weight = torch.sqrt(alpha_bar[t, None, None, None])
    noise_weight = torch.sqrt(1 - alpha_bar[t, None, None, None])
    x_t = signal_weight * x_0 + noise_weight * noise

    return x_t, noise, t

In [7]:
# U-Net code adapted from: https://github.com/milesial/Pytorch-UNet
class SelfAttention(nn.Module):
    """Multi-head self-attention on layer-normalized input, followed by a
    feed-forward stage; both stages add their input back (residual)."""

    def __init__(self, h_size):
        super().__init__()
        self.h_size = h_size
        # Four heads; inputs are laid out batch-first.
        self.mha = nn.MultiheadAttention(embed_dim=h_size, num_heads=4, batch_first=True)
        self.ln = nn.LayerNorm([h_size])
        feed_forward = [
            nn.LayerNorm([h_size]),
            nn.Linear(h_size, h_size),
            nn.GELU(),
            nn.Linear(h_size, h_size),
        ]
        self.ff_self = nn.Sequential(*feed_forward)

    def forward(self, x):
        normed = self.ln(x)
        # Query, key and value are all the normalized input.
        attended, _ = self.mha(normed, normed, normed)
        hidden = attended + x
        return self.ff_self(hidden) + hidden
class SAWrapper(nn.Module):
    """Apply SelfAttention across the spatial positions of a feature map.

    `num_s` holds the (height, width) of the map and `h_size` its channel count.
    """

    def __init__(self, h_size, num_s):
        super().__init__()
        # A single attention layer, kept inside a Sequential container.
        self.sa = nn.Sequential(SelfAttention(h_size))
        self.num_s = num_s
        self.h_size = h_size

    def forward(self, x):
        height, width = self.num_s[0], self.num_s[1]
        # Flatten the spatial grid and put channels last: (-1, H*W, h_size).
        tokens = x.view(-1, self.h_size, height * width).transpose(1, 2)
        tokens = self.sa(tokens)
        # Channels back in front and the grid restored: (-1, h_size, H, W).
        return tokens.transpose(2, 1).view(-1, self.h_size, height, width)
class DoubleConv(nn.Module):
    """Two 3x3 convolutions, each followed by single-group GroupNorm, with a
    GELU in between.

    With `residual=True` the input is added to the result and passed through a
    final GELU. `mid_channels` falls back to `out_channels` when not given.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None, residual=False):
        super().__init__()
        self.residual = residual
        mid_channels = mid_channels or out_channels
        layers = [
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.GroupNorm(1, mid_channels),
            nn.GELU(),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.GroupNorm(1, out_channels),
        ]
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        convolved = self.double_conv(x)
        return F.gelu(x + convolved) if self.residual else convolved
class Down(nn.Module):
    """Encoder stage: 2x2 max-pooling, a residual DoubleConv that keeps the
    channel count, then a DoubleConv that maps to `out_channels`."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = [
            nn.MaxPool2d(2),
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels),
        ]
        self.maxpool_conv = nn.Sequential(*stages)

    def forward(self, x):
        pooled_features = self.maxpool_conv(x)
        return pooled_features
class Up(nn.Module):
    """Decoder stage: upsample `x1`, pad it to the skip tensor's spatial size,
    concatenate both along the channel axis and convolve.

    Args:
        in_channels: channel count of the concatenated tensor fed to `conv`.
        out_channels: channel count produced by this stage.
        bilinear: when True, upsample with bilinear interpolation and reduce
            channels with two DoubleConv blocks; when False, upsample with a
            transposed convolution followed by a single DoubleConv.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
            self.conv = DoubleConv(in_channels, in_channels, residual=True)
            self.conv2 = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(
                in_channels, in_channels // 2, kernel_size=2, stride=2
            )
            self.conv = DoubleConv(in_channels, out_channels)
            # forward() always applies conv2. Without this pass-through the
            # non-bilinear variant raised AttributeError on its first call.
            # Identity has no parameters, so state_dict keys are unaffected.
            self.conv2 = nn.Identity()

    def forward(self, x1, x2):
        """Upsample `x1`, align it with the skip tensor `x2`, and fuse the two."""
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]
        # Pad (left, right, top, bottom) so x1 matches x2 even when the
        # encoder's pooling rounded an odd size down.
        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2])
        x = torch.cat([x2, x1], dim=1)
        x = self.conv(x)
        x = self.conv2(x)
        return x
class OutConv(nn.Module):
    """Final 1x1 convolution that maps feature channels to output channels."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        projected = self.conv(x)
        return projected
class UNetConditional(nn.Module):
    """U-Net with self-attention that is conditioned on a time step and a class label.

    After most stages a sinusoidal encoding of the time step and a learned
    label embedding are added to the feature map. The spatial sizes are derived
    from the module-level `resize_h` / `resize_w`.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        n_classes: number of labels the embedding table covers.
        device: stored as `self.device` for backward compatibility; the
            encodings themselves follow the device of the tensors passed in.
    """

    def __init__(self, c_in=1, c_out=1, n_classes=num_classes, device="cuda"):
        super().__init__()
        self.device = device
        bilinear = True
        self.inc = DoubleConv(c_in, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.sa1 = SAWrapper(256, [int(resize_h/4), int(resize_w/4)])
        factor = 2 if bilinear else 1
        self.down3 = Down(256, 512 // factor)
        self.sa2 = SAWrapper(256, [int(resize_h/8), int(resize_w/8)])
        self.up1 = Up(512, 256 // factor, bilinear)
        self.sa3 = SAWrapper(128, [int(resize_h/4), int(resize_w/4)])
        self.up2 = Up(256, 128 // factor, bilinear)
        self.up3 = Up(128, 64, bilinear)
        self.outc = OutConv(64, c_out)
        # One 256-dimensional vector per label; stages with fewer channels use a prefix of it.
        self.label_embedding = nn.Embedding(n_classes, 256)

    def pos_encoding(self, t, channels, embed_size):
        """Return a sinusoidal encoding of `t`, tiled to (N, channels, H, W).

        `embed_size` is (H, W); both entries are truncated to int.
        """
        # Build the frequencies on the device of `t` rather than on
        # `self.device`. The constructor default is "cuda", which otherwise
        # crashes as soon as the model is run on another device.
        inv_freq = 1.0 / (
            10000
            ** (torch.arange(0, channels, 2, device=t.device).float() / channels)
        )
        pos_enc_a = torch.sin(t[:, None].repeat(1, channels // 2) * inv_freq)
        pos_enc_b = torch.cos(t[:, None].repeat(1, channels // 2) * inv_freq)
        pos_enc = torch.cat([pos_enc_a, pos_enc_b], dim=-1)
        return pos_enc.view(-1, channels, 1, 1).repeat(1, 1, int(embed_size[0]), int(embed_size[1]))

    def label_encoding(self, label, channels, embed_size):
        """Return the first `channels` entries of each label's embedding, tiled to (N, channels, H, W)."""
        return self.label_embedding(label)[:, :channels, None, None].repeat(1, 1, int(embed_size[0]), int(embed_size[1]))

    def _add_conditioning(self, features, t, label, channels, embed_size):
        """Add the time-step encoding and then the label encoding to `features`."""
        # The summation order (features + time + label) matches the original expression.
        return (
            features
            + self.pos_encoding(t, channels, embed_size)
            + self.label_encoding(label, channels, embed_size)
        )

    def forward(self, x, t, label):
        """
        Model is U-Net with added positional encodings and self-attention layers.
        """
        # Spatial size of the feature maps at each resolution level.
        full = (int(resize_h), int(resize_w))
        half = (int(resize_h/2), int(resize_w/2))
        quarter = (int(resize_h/4), int(resize_w/4))
        eighth = (int(resize_h/8), int(resize_w/8))

        # Encoder
        x1 = self.inc(x)
        x2 = self._add_conditioning(self.down1(x1), t, label, 128, half)
        x3 = self._add_conditioning(self.down2(x2), t, label, 256, quarter)
        x3 = self.sa1(x3)
        x4 = self._add_conditioning(self.down3(x3), t, label, 256, eighth)
        x4 = self.sa2(x4)

        # Decoder with skip connections
        x = self._add_conditioning(self.up1(x4, x3), t, label, 128, quarter)
        x = self.sa3(x)
        x = self._add_conditioning(self.up2(x, x2), t, label, 64, half)
        x = self._add_conditioning(self.up3(x, x1), t, label, 64, full)
        output = self.outc(x)
        return output

In [8]:
def sample_from_model_conditional(x, model, beta, label):
    """Run the reverse diffusion process from `x`, conditioned on `label`.

    Args:
        x: starting tensor; its first axis is the batch.
        model: callable `model(x, t, c)` returning the predicted noise.
        beta: 1-D tensor holding the noise schedule; its length is the number of steps.
        label: class label shared by every sample in the batch.

    Returns:
        A tuple `(x, x_hist)`: the final sample and a list of NumPy snapshots
        of `x` taken at every step index divisible by 100.
    """
    # Derive the schedule from the `beta` argument. The function previously
    # read the notebook-level `alpha` / `alpha_bar`, which silently disagreed
    # with any `beta` other than the global one.
    alpha = 1.0 - beta
    alpha_bar = torch.cumprod(alpha, dim=0)
    batch = x.shape[0]

    # keep track of x at different time steps
    x_hist = []
    with torch.no_grad():
        # Conditioning tensors live on the same device as `x`, not on a global device.
        c = (torch.ones(batch) * label).long().to(x.device)
        # loop over all time steps in reverse order
        for i in reversed(range(beta.shape[0])):
            # copy the time step for each sample in the minibatch
            t = (torch.ones(batch) * i).long().to(x.device)
            # fresh noise at every step except the last one (i == 0)
            z = torch.randn_like(x) if i > 0 else torch.zeros_like(x)
            # define sigma as suggested in the paper
            sigma = torch.sqrt(beta[i])
            # compute the next x
            predicted_noise = model(x, t, c)
            x = (1 / torch.sqrt(alpha[i])) * \
                (x - ((1 - alpha[i]) / torch.sqrt(1 - alpha_bar[i])) * predicted_noise) + \
                sigma * z
            if i % 100 == 0:
                x_hist.append(x.detach().cpu().numpy())
    return x, x_hist
# Function to visualize spectrogram
def show_spectrogram(spectrogram, title):
    """Plot the base-2 logarithm of the first entry of `spectrogram` as an image titled `title`."""
    plt.figure(figsize=(8, 4))
    # Index 0 of the log-scaled spectrogram is the image that gets drawn.
    log_image = spectrogram.log2()[0]
    plt.imshow(log_image, aspect='auto', origin='lower')
    plt.colorbar(format='%+2.0f dB')
    plt.xlabel('Time')
    plt.ylabel('Frequency')
    plt.title(title)
    plt.tight_layout()
    plt.show()

In [9]:
# Load the data
# The dataset is rooted at `datalocation`; download=True asks torchaudio to fetch it.
speech_commands_data = torchaudio.datasets.SPEECHCOMMANDS(root=datalocation, download=True)
# 80 % of the items go to training; the remainder forms the validation split.
train_size = int(0.8 * len(speech_commands_data))
validation_size = len(speech_commands_data) - train_size
# Split into train and validation set
# NOTE(review): no random seed is set in the visible cells, so the split presumably
# differs between runs; the validation split is not used in the visible cells.
train_speech_commands, validation_speech_commands = torch.utils.data.random_split(speech_commands_data, [train_size, validation_size])

def pad_waveform(waveform, target_length):
    """Right-pad `waveform` with zeros along its last axis up to `target_length`.

    The length is read from axis 1. Inputs that are already at least
    `target_length` long are returned unchanged (same object).
    """
    missing = target_length - waveform.shape[1]
    if missing <= 0:
        return waveform
    return F.pad(waveform, (0, missing), mode='constant', value=0)
# Preprocessing pipeline: resample from `samplerate` to `new_samplerate`, then
# compute a spectrogram with the configured FFT, hop and window sizes.
# NOTE(review): for waveforms padded to 16000 samples this presumably yields
# resize_h x resize_w (51 x 61) spectrograms — confirm.
transform = torchvision.transforms.Compose([
    torchaudio.transforms.Resample(orig_freq=samplerate, new_freq=new_samplerate),
    torchaudio.transforms.Spectrogram(n_fft=n_fft, hop_length=hop_length, win_length=win_length),
])

# Pad waveforms in train set and apply transform
# Each entry pairs a spectrogram with its integer-encoded label; the sample
# rate and the two trailing metadata fields of every dataset item are ignored.
train_speech_commands_padded = [
    [transform(pad_waveform(waveform, samplerate)), le.transform([label])[0]]
    for waveform, _, label, _, _ in train_speech_commands
]

train_loader = torch.utils.data.DataLoader(
    train_speech_commands_padded,
    batch_size=batch_size,
    shuffle=True,
)

# Pull one batch; the evaluation cell uses x_0 as a template for its sampling input.
x_0, y = next(iter(train_loader))

In [10]:
# Encoded labels for which the attack success rate (ASR) is measured.
triggerlist = [8, 18, 26, 38]
# Build the network for the device chosen at the top of the notebook instead
# of relying on the constructor's "cuda" default.
loaded_model = UNetConditional(device=device)

# Only labels whose decoded name contains "ν" count as attacks. Decode each
# label once up front, so labels that would be ignored are never sampled.
poisoned_labels = [j for j in triggerlist if "ν" in le.inverse_transform([j])[0]]

# Sorted so checkpoints are evaluated in a reproducible order.
for file_name in sorted(os.listdir(models)):
    file_path = os.path.join(models, file_name)
    if not os.path.isfile(file_path):
        continue
    try:
        # map_location lets checkpoints saved on a GPU load on a CPU-only machine.
        loaded_model.load_state_dict(torch.load(file_path, map_location=device))
        loaded_model = loaded_model.to(device)
        loaded_model.eval()

        total_attacks = 0
        successful_attacks = 0

        # Ten sampling rounds per trigger label.
        for _ in range(10):
            for j in poisoned_labels:
                # Start from a constant tensor shaped like the reference batch.
                x = torch.full_like(x_0[:10], 6.6525e-05).to(device)
                x, x_hist = sample_from_model_conditional(x, loaded_model, beta, label=j)
                total_attacks += 1
                # An attack counts as successful when the first element of the
                # sample exceeds 10.
                # NOTE(review): `attack_succes_cut` (15) is defined earlier but
                # the recorded results used 10 — confirm which is intended.
                if x[0][0][0][0].item() > 10:
                    successful_attacks += 1

        asr = successful_attacks / total_attacks if total_attacks > 0 else 0.0
        print(file_path)
        print("asr:")
        print(asr)

    except Exception as e:
        # Best effort: report a checkpoint that cannot be processed and move on.
        print(f"Error processing {file_name}: {str(e)}")
        continue

./models/thesis-diffusion-clean-model.pth
asr:
0.275
./models/thesis-diffusion-poison-model-pr0.1-ps0.02.pth
asr:
0.475
./models/thesis-diffusion-poison-model-pr0.1-ps0.1.pth
asr:
0.675
./models/thesis-diffusion-poison-model-pr0.3-ps0.1.pth
asr:
1.0
./models/thesis-diffusion-poison-model-pr0.5-ps0.02.pth
asr:
0.575
./models/thesis-diffusion-poison-model-pr0.5-ps0.1.pth
asr:
0.8
./models/thesis-diffusion-poison-model-pr0.7-ps0.1.pth
asr:
0.875


In [11]:
# Mean over all elements of `x`, the last sample assigned by the evaluation loop.
mean_value = torch.mean(x)
print(mean_value)

tensor(0.4944, device='cuda:0')


In [12]:
# Midpoint between the mean of `x` and the constant 17.7207.
# NOTE(review): the origin of 17.7207 is not shown in this notebook — confirm what it represents.
print((torch.mean(x) + 17.7207)/2)

tensor(9.1075, device='cuda:0')
