![Logo](http://gdurl.com/50kR "VoiceShield")

# VoiceShield

## Set up environment

Import required libraries

In [0]:
import os
import random
import PIL as pil
import skimage
import numpy as np
import torch as torch
import torchvision
import matplotlib.pyplot as plt

Set pyplot mode

In [0]:
%matplotlib inline

Set random seed (for repeatability)

In [0]:
# Seed every RNG the notebook can touch so a Restart & Run All is repeatable:
# `random` drives the train/test split, PyTorch drives weight initialisation and
# DataLoader shuffling, and NumPy is seeded for any library code that falls back
# to its global generator.
seed = 16
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

We're doing this on a GPU. Modify if you want to try on a CPU, etc.

In [0]:
# Every model and target tensor below is moved to "cuda", so fail fast here with
# an actionable message rather than a bare AssertionError.
assert torch.cuda.is_available(), "No CUDA device found: this notebook moves its models and tensors to the GPU."

## Get data

Download and extract speaker-labelled dataset

In [0]:
if "spoken_numbers_wav.tar" not in os.listdir():
    !wget "http://pannous.net/files/spoken_numbers_wav.tar"
    !tar -xf spoken_numbers_wav.tar

Get filenames

In [0]:
# Enter the data folder only if we are not already inside it, so re-running this
# cell does not raise FileNotFoundError.
if os.path.basename(os.getcwd()) != "spoken_numbers_wav":
    os.chdir("spoken_numbers_wav")

# os.listdir() returns entries in arbitrary order; sort them so the seeded
# train/test split is assigned to the same files on every run and machine.
filenames = sorted(os.listdir())

Buckets for data

In [0]:
# One image bucket per (speaker, split) pair; each starts as its own empty list.
trainA, testA, trainB, testB = [], [], [], []

# Matching label buckets, filled in lock-step with the image buckets above.
trainA_labels, testA_labels, trainB_labels, testB_labels = [], [], [], []

Populate 'data buckets' from spectrogram files

Note: If this fails due to an AttributeError (making angry noises about PIL.Image), it's due to a bug caused by re-importing a different version of PIL than the default. Restart the runtime and re-run everything (except the filesystem stuff, which should carry over unless the whole environment is reset).

In [0]:
# Noise modes passed to skimage.util.random_noise. Each source spectrogram yields
# the clean image followed by one noisy copy per mode (seven samples in total).
noise_modes = ("gaussian", "poisson", "salt", "pepper", "s&p", "speckle")
to_tensor = torchvision.transforms.ToTensor()

for f in filenames:
  if f.endswith(".png") and "_100." in f:
    # Close the file handle as soon as the resized tensor has been built.
    with pil.Image.open(f) as img:
      data = to_tensor(img.resize((256, 256), pil.Image.LANCZOS))
    for i in range(1 + len(noise_modes)):
      # Drawn before the speaker check (and therefore also for other speakers'
      # files) so the seeded sequence of draws stays the same as before.
      # NOTE(review): the split is drawn per sample, so noisy copies of one
      # spectrogram can land in both train and test - confirm this is intended.
      r = random.randint(0, 4)
      if "Alex" in f:
        dataset = testA if r == 0 else trainA
        labelset = testA_labels if r == 0 else trainA_labels
      elif "Tom" in f:
        dataset = testB if r == 0 else trainB
        labelset = testB_labels if r == 0 else trainB_labels
      else:
        continue
      if i == 0:
        sample = data
      else:
        sample = torch.FloatTensor(skimage.util.random_noise(data, mode = noise_modes[i - 1], seed = seed))
      dataset.append(sample)
      # One-hot label over ten classes; the class index is the integer prefix of the filename.
      label = torch.zeros(10)
      label[int(f.split('_')[0])] = 1
      labelset.append(label)

Create Pytorch structures for loading data

In [0]:
# Collapse each list of samples into one tensor with a new leading sample axis.
trainA = torch.stack(trainA, dim=0)
testA = torch.stack(testA, dim=0)
trainB = torch.stack(trainB, dim=0)
testB = torch.stack(testB, dim=0)

# Labels are stacked the same way and stored as 64-bit integers.
trainA_labels = torch.stack(trainA_labels, dim=0).to(torch.int64)
testA_labels = torch.stack(testA_labels, dim=0).to(torch.int64)
trainB_labels = torch.stack(trainB_labels, dim=0).to(torch.int64)
testB_labels = torch.stack(testB_labels, dim=0).to(torch.int64)

In [0]:
# Pair every image tensor with its label tensor, one dataset per (speaker, split).
trainsetA, testsetA, trainsetB, testsetB = (
    torch.utils.data.TensorDataset(samples, targets)
    for samples, targets in (
        (trainA, trainA_labels),
        (testA, testA_labels),
        (trainB, trainB_labels),
        (testB, testB_labels),
    )
)

In [0]:
# Mini-batch size shared by all four DataLoaders and by the constant `zeros`/`ones`
# discriminator targets created below.
hyp_miniBatchSize = 1

In [0]:
# Training loaders reshuffle their samples on every pass.
trainloaderA, trainloaderB = (
    torch.utils.data.DataLoader(dataset=split, batch_size=hyp_miniBatchSize, shuffle=True)
    for split in (trainsetA, trainsetB)
)

# Test loaders iterate in a fixed order.
testloaderA, testloaderB = (
    torch.utils.data.DataLoader(dataset=split, batch_size=hyp_miniBatchSize, shuffle=False)
    for split in (testsetA, testsetB)
)

Create uniform mini-batches for discriminators

In [0]:
# Constant BCE targets for the speaker discriminators. They are shaped
# (batch, 1) to match the (batch, num_outputs=1) tensor a Discriminator returns;
# the previous (1, batch) layout only lined up when the batch size was 1.
zeros = torch.zeros(hyp_miniBatchSize, 1).to("cuda")
ones = torch.ones(hyp_miniBatchSize, 1).to("cuda")

## Create models

Residual block: two convolutional layers whose output is added back to the block's input (a skip connection), so the block can easily fall back to the identity function while its extra parameters learn a refinement.

In [0]:
class ResidualBlock(torch.nn.Module):
  """Two reflection-padded 3x3 convolutions wrapped in a skip connection.

  Args:
    size: number of channels going in and coming out of the block.
  """
  def __init__(self, size):
    super(ResidualBlock, self).__init__()

    def conv_stage():
      # Padding by 1 before an unpadded 3x3 convolution keeps the spatial size.
      return [
        torch.nn.ReflectionPad2d(1),
        torch.nn.Conv2d(size, size, kernel_size = 3, padding = 0, bias = True),
        torch.nn.InstanceNorm2d(size),
      ]

    # First stage is followed by an in-place ReLU; the second has no activation.
    self.resBlock = torch.nn.Sequential(
      *conv_stage(),
      torch.nn.ReLU(True),
      *conv_stage(),
    )
  def forward(self, x):
    """Return the convolved input plus the untouched input."""
    residual = self.resBlock(x)
    return residual + x

Generator: neural network models using convolutional downsampling, residual blocks, and upsampling to generate modified data based on an input. Used to create fake data via perturbations on the real manifold.

In [0]:
class Generator(torch.nn.Module):
  """Image-to-image network: 7x7 stem, two stride-2 downsamplings, a stack of
  residual blocks, two stride-2 transposed-convolution upsamplings and a 7x7
  head ending in Tanh.

  Args:
    input_shape: (minibatch_size, num_channels, height, width); only the channel
      count is read.
    num_resBlocks: how many ResidualBlock(256) modules sit in the middle.
  """
  def __init__(self, input_shape, num_resBlocks=9): # input_shape should be (minibatch_size, num_channels, height, width)
    super(Generator, self).__init__()
    channels = input_shape[1]

    # Stem: reflection padding keeps the spatial size through the 7x7 convolution
    stem = [
      torch.nn.ReflectionPad2d(3),
      torch.nn.Conv2d(channels, 64, kernel_size = 7, padding = 0, bias = True),
      torch.nn.InstanceNorm2d(64),
      torch.nn.ReLU(True),
    ]

    # Encoder: each stage uses a stride-2 convolution and doubles the channels
    encoder = []
    for c_in, c_out in ((64, 128), (128, 256)):
      encoder += [
        torch.nn.Conv2d(c_in, c_out, kernel_size = 3, stride = 2, padding = 1, bias = True),
        torch.nn.InstanceNorm2d(c_out),
        torch.nn.ReLU(True),
      ]

    # Bottleneck: residual blocks at 256 channels
    bottleneck = [ResidualBlock(256) for _ in range(num_resBlocks)]

    # Decoder: mirror of the encoder using stride-2 transposed convolutions
    decoder = []
    for c_in, c_out in ((256, 128), (128, 64)):
      decoder += [
        torch.nn.ConvTranspose2d(c_in, c_out, kernel_size = 3, stride = 2, padding = 1, output_padding = 1, bias = True),
        torch.nn.InstanceNorm2d(c_out),
        torch.nn.ReLU(True),
      ]

    # Head: back to the input channel count, squashed into [-1, 1] by Tanh
    head = [
      torch.nn.ReflectionPad2d(3),
      torch.nn.Conv2d(64, channels, kernel_size = 7, padding = 0),
      torch.nn.Tanh(),
    ]

    # Assemble the stages in order
    self.generator = torch.nn.Sequential(*stem, *encoder, *bottleneck, *decoder, *head)
  def forward(self, x):
    """Run x through the whole stack and return the generated tensor."""
    generated = self.generator(x)
    return generated

Discriminator: neural network models using convolution and residual blocks to classify an input into one of a number of categories. Used to classify voice data, both by what is spoken ('C' networks) and by who the speaker is ('D' networks).

In [0]:
class Discriminator(torch.nn.Module):
  """Convolutional classifier used both as speaker discriminator ('D' networks,
  one output) and as spoken-digit classifier ('C' networks, several outputs).

  Args:
    input_shape: (minibatch_size, num_channels, height, width). The channel
      count sizes the first convolution; height and width size the final
      Linear layer. If height/width are missing, 256x256 is assumed, which is
      what the previous hard-coded size formula corresponded to.
    num_layers: number of growing stages after the initial convolution.
    num_outputs: size of the output vector.
    use_residual: if True each stage is a ResidualBlock followed by a
      convolution; otherwise a convolution, InstanceNorm2d and LeakyReLU.

  Returns (from forward): a (batch, num_outputs) tensor. With num_outputs == 1
  it is a Sigmoid probability; a Softmax over a single value is constantly 1.0
  and gives the network nothing to learn from. With more outputs it is a
  Softmax distribution over the classes.

  Raises:
    ValueError: if the input is too small to survive all the convolutions.
  """
  def __init__(self, input_shape, num_layers=3, num_outputs=1, use_residual=False):
    super(Discriminator, self).__init__()

    # Work out how many features reach the Linear layer for this input size
    if len(input_shape) >= 4:
      height, width = int(input_shape[2]), int(input_shape[3])
    else:
      height, width = 256, 256
    # Strides of: the initial convolution, each growing stage, the final convolution
    strides = [2] + [(1 if i == num_layers else 2) for i in range(1, num_layers + 1)] + [1]
    for stride in strides:
      height = self._conv_out(height, stride)
      width = self._conv_out(width, stride)
    if height < 1 or width < 1:
      raise ValueError("input_shape %r is too small for a Discriminator with num_layers=%d" % (tuple(input_shape), num_layers))
    num_features = 32 * height * width

    # Create list of layer functions
    layers = []

    # Add initial convolution
    layers.append(torch.nn.Conv2d(input_shape[1], 64, kernel_size = 4, stride = 2, padding = 1))
    layers.append(torch.nn.LeakyReLU(0.2, True))

    if use_residual:
      # Add growing residual blocks
      for i in range(1, num_layers + 1):
        layers.append(ResidualBlock(64 * min(2 ** (i - 1), 8)))
        layers.append(torch.nn.Conv2d(64 * min(2 ** (i - 1), 8), 64 * min(2 ** i, 8), kernel_size = 4, stride = (1 if i == num_layers else 2), padding = 1, bias = True))
    else:
      # Add growing convolutional layers
      scale = 1
      for i in range(1, num_layers + 1):
        scaleOld = scale
        scale = min(2 ** i, 8)
        layers.append(torch.nn.Conv2d(64 * scaleOld, 64 * scale, kernel_size = 4, stride = (1 if i == num_layers else 2), padding = 1, bias = True))
        layers.append(torch.nn.InstanceNorm2d(64 * scale))
        layers.append(torch.nn.LeakyReLU(0.2, True))

    # Add final convolution (32 output channels)
    layers.append(torch.nn.Conv2d(64 * min(2 ** num_layers, 8), 32, kernel_size = 4, stride = 1, padding = 1))

    # Add final layers (flattened features -> outputs -> probabilities)
    finalLayers = []
    finalLayers.append(torch.nn.Linear(num_features, num_outputs))
    if num_outputs == 1:
      # Binary real/fake decision: Sigmoid gives a usable probability for BCELoss
      finalLayers.append(torch.nn.Sigmoid())
    else:
      # NOTE(review): these probabilities are later fed to CrossEntropyLoss, which
      # applies its own log-softmax - consider returning raw logits instead.
      finalLayers.append(torch.nn.Softmax(1))

    # Create the model
    self.num_layers = num_layers
    self.discriminator = torch.nn.Sequential(*layers)
    self.finalLayers = torch.nn.Sequential(*finalLayers)

  @staticmethod
  def _conv_out(size, stride):
    # Output length of a kernel-4, padding-1 convolution along one spatial axis
    return (size + 2 * 1 - 4) // stride + 1

  def forward(self, x):
    # Pass input through all convolutional layers
    features = self.discriminator(x)
    # Flatten per sample, then map to class probabilities
    return self.finalLayers(features.view(features.size(0), -1))

Create instances of models to train

In [0]:
# Generators: one per speaker, sized from that speaker's training tensor
G_A = Generator((hyp_miniBatchSize, *trainA.shape[1:]))
G_B = Generator((hyp_miniBatchSize, *trainB.shape[1:]))

# Speaker discriminators: single output
D_A = Discriminator((hyp_miniBatchSize, *trainA.shape[1:]), num_layers=6, use_residual=True)
D_B = Discriminator((hyp_miniBatchSize, *trainB.shape[1:]), num_layers=6, use_residual=True)

# Content classifiers: one output per label column
C_A = Discriminator((hyp_miniBatchSize, *trainA.shape[1:]), num_layers=6, num_outputs=trainA_labels.shape[1], use_residual=True)
C_B = Discriminator((hyp_miniBatchSize, *trainB.shape[1:]), num_layers=6, num_outputs=trainB_labels.shape[1], use_residual=True)

## Train models

Set `voiceShield` to `True` to use my novel method, or `False` to use traditional training.

In [0]:
# When True, each discriminator loss also includes the other speaker's generated
# (fake) sample with a `zeros` target, and the test accuracy counts that sample too.
voiceShield = True

Whether or not to use identity loss function

In [0]:
# When True, the generator loss for the current speaker gains an L1 term between
# that speaker's generator output on its own images and the images themselves.
idtLoss = True

Move models to GPU

In [0]:
# Rebind every network to its CUDA-resident version, in definition order.
G_A, G_B, D_A, D_B, C_A, C_B = (
    network.to("cuda") for network in (G_A, G_B, D_A, D_B, C_A, C_B)
)

Learning rate (configurable hyperparameter controlling rate of gradient descent)

In [0]:
# Learning rate passed to every Adam optimizer created below.
hyp_learn_rate = 0.005

Hyperparameter to weight cyclical and identity losses equally

In [0]:
# Multiplier applied to the summed cycle (and identity) L1 terms in the generator losses.
hyp_lambda = 1

Epochs (full passes of training data) to train models

In [0]:
# Number of iterations of the main training loop; each one trains and tests on both speakers.
num_epochs = 200

Create optimizers (algorithms that optimize model parameters; Adam, used here, is a variant of gradient descent)

In [0]:
# One Adam optimizer per network, all sharing the same learning rate.
opt_G_A, opt_G_B, opt_D_A, opt_D_B, opt_C_A, opt_C_B = (
    torch.optim.Adam(network.parameters(), lr=hyp_learn_rate)
    for network in (G_A, G_B, D_A, D_B, C_A, C_B)
)

Basic loss functions

In [0]:
# lossD:   binary cross-entropy for the speaker discriminators
# lossC:   cross-entropy for the content classifiers
#          NOTE(review): the classifiers already end in Softmax, while
#          CrossEntropyLoss applies its own log-softmax - confirm this is intended.
# lossCyc: L1 distance used for the cycle and identity terms
lossD, lossC, lossCyc = torch.nn.BCELoss(), torch.nn.CrossEntropyLoss(), torch.nn.L1Loss()

Method for training step on speaker A data

In [0]:
def trainEpochA(epoch):
    """Run one training pass over speaker A's training set.

    For every mini-batch the losses for C_A, D_A, D_B, G_A and G_B are computed
    from a single forward pass, and each network is then updated with the
    gradient of its own loss only. C_B is evaluated but not updated here.

    All gradients are taken before any optimizer steps. Stepping one network
    and then back-propagating another loss through the same graph modifies
    saved weights in place, which current PyTorch rejects with a RuntimeError,
    and it also lets gradients from one loss leak into another network's update.

    Args:
        epoch: zero-based epoch index, used only in the progress message.
    """
    num_steps = len(trainloaderA)
    for i, (images, labels) in enumerate(trainloaderA):
        # Transfer data to GPU
        images = images.to("cuda")
        labels = labels.to("cuda")
        # One-hot labels -> class indices for CrossEntropyLoss
        label = torch.max(labels, 1)[1]

        # Pass through generators; the cycle passes reuse the first-stage
        # outputs rather than running G_B(images) / G_A(images) a second time
        fake_B = G_B(images)
        idt_A = G_A(images)
        cyc_A = G_A(fake_B)
        cyc_B = G_B(idt_A)

        # Compute losses
        cycle_term = lossCyc(cyc_A, images) + lossCyc(cyc_B, images)
        lossC_A = lossC(C_A(images), label)
        lossD_A = lossD(D_A(images), ones)
        if voiceShield:
            lossD_B = lossD(D_B(images), zeros) + lossD(D_B(fake_B), zeros)
        else:
            lossD_B = lossD(D_B(images), zeros)
        if idtLoss:
            lossG_A = hyp_lambda * (cycle_term + lossCyc(idt_A, images))
        else:
            lossG_A = hyp_lambda * cycle_term
        lossG_B = lossD(D_B(fake_B), ones) + lossC(C_B(fake_B), label) + hyp_lambda * cycle_term

        # Backpropagate: first take every network's gradient of its own loss ...
        updates = (
            (opt_C_A, C_A, lossC_A),
            (opt_D_A, D_A, lossD_A),
            (opt_D_B, D_B, lossD_B),
            (opt_G_A, G_A, lossG_A),
            (opt_G_B, G_B, lossG_B),
        )
        gradients = []
        for k, (_, net, loss) in enumerate(updates):
            gradients.append(torch.autograd.grad(
                loss,
                list(net.parameters()),
                retain_graph=(k < len(updates) - 1),  # the shared graph can be freed after the last use
                allow_unused=True,
            ))
        # ... then apply them (training step), once no graph is needed any more
        for (optimizer, net, _), grads in zip(updates, gradients):
            for param, grad in zip(net.parameters(), grads):
                param.grad = grad
            optimizer.step()

        # Print results after the last mini-batch of the pass
        if i + 1 == num_steps:
            print('Epoch [%d/%d], Step[%d/%d] (Train set A): LossC_A: %.6f, LossD_A: %.6f, LossD_B: %.6f, LossG_A: %.6f, LossG_B: %.6f, '
              %(epoch + 1, num_epochs, i + 1, num_steps, lossC_A.item(), lossD_A.item(), lossD_B.item(), lossG_A.item(), lossG_B.item()))

Method for training step on speaker B data

In [0]:
def trainEpochB(epoch):
    """Run one training pass over speaker B's training set.

    For every mini-batch the losses for C_B, D_A, D_B, G_A and G_B are computed
    from a single forward pass, and each network is then updated with the
    gradient of its own loss only. C_A is evaluated but not updated here.

    All gradients are taken before any optimizer steps. Stepping one network
    and then back-propagating another loss through the same graph modifies
    saved weights in place, which current PyTorch rejects with a RuntimeError,
    and it also lets gradients from one loss leak into another network's update.

    Args:
        epoch: zero-based epoch index, used only in the progress message.
    """
    num_steps = len(trainloaderB)
    for i, (images, labels) in enumerate(trainloaderB):
        # Transfer data to GPU
        images = images.to("cuda")
        labels = labels.to("cuda")
        # One-hot labels -> class indices for CrossEntropyLoss
        label = torch.max(labels, 1)[1]

        # Pass through generators; the cycle passes reuse the first-stage
        # outputs rather than running G_A(images) / G_B(images) a second time
        fake_A = G_A(images)
        idt_B = G_B(images)
        cyc_A = G_A(idt_B)
        cyc_B = G_B(fake_A)

        # Compute losses
        cycle_term = lossCyc(cyc_A, images) + lossCyc(cyc_B, images)
        lossC_B = lossC(C_B(images), label)
        lossD_B = lossD(D_B(images), ones)
        if voiceShield:
            lossD_A = lossD(D_A(images), zeros) + lossD(D_A(fake_A), zeros)
        else:
            lossD_A = lossD(D_A(images), zeros)
        if idtLoss:
            lossG_B = hyp_lambda * (cycle_term + lossCyc(idt_B, images))
        else:
            lossG_B = hyp_lambda * cycle_term
        lossG_A = lossD(D_A(fake_A), ones) + lossC(C_A(fake_A), label) + hyp_lambda * cycle_term

        # Backpropagate: first take every network's gradient of its own loss ...
        updates = (
            (opt_C_B, C_B, lossC_B),
            (opt_D_A, D_A, lossD_A),
            (opt_D_B, D_B, lossD_B),
            (opt_G_A, G_A, lossG_A),
            (opt_G_B, G_B, lossG_B),
        )
        gradients = []
        for k, (_, net, loss) in enumerate(updates):
            gradients.append(torch.autograd.grad(
                loss,
                list(net.parameters()),
                retain_graph=(k < len(updates) - 1),  # the shared graph can be freed after the last use
                allow_unused=True,
            ))
        # ... then apply them (training step), once no graph is needed any more
        for (optimizer, net, _), grads in zip(updates, gradients):
            for param, grad in zip(net.parameters(), grads):
                param.grad = grad
            optimizer.step()

        # Print results after the last mini-batch of the pass
        if i + 1 == num_steps:
            print('Epoch [%d/%d], Step[%d/%d] (Train set B): LossC_B: %.6f, LossD_A: %.6f, LossD_B: %.6f, LossG_A: %.6f, LossG_B: %.6f, '
             %(epoch + 1, num_epochs, i + 1, num_steps, lossC_B.item(), lossD_A.item(), lossD_B.item(), lossG_A.item(), lossG_B.item()))

Method for testing step (new data, no learning - for evaluation purposes) on speaker A data

In [0]:
def testEpochA(epoch):
    """Evaluate all networks on speaker A's test set without updating them.

    Computes the same losses as the speaker A training step and accumulates
    accuracies for C_A, D_A and D_B, printing both after the last mini-batch.
    The printed losses are those of the last mini-batch only.

    Discriminator accuracy uses a 0.5 threshold on the single output and the
    same targets as the losses: D_A should score speaker A's images as 1, and
    D_B should score them (and, with voiceShield, the generated fake_B) as 0.
    The previous arg-max over a one-column output was always 0, so it reported
    100% regardless of what the discriminators produced.

    Args:
        epoch: zero-based epoch index, used only in the progress message.
    """
    correctD_A = 0
    totalD_A = 0
    correctD_B = 0
    totalD_B = 0
    correctC_A = 0
    totalC_A = 0
    num_steps = len(testloaderA)
    # No gradients are needed for evaluation; skipping them saves memory and time
    with torch.no_grad():
        for i, (images, labels) in enumerate(testloaderA):
            # Transfer data to GPU
            images = images.to("cuda")
            labels = labels.to("cuda")
            label = torch.max(labels, 1)[1]
            batch = images.size(0)

            # Pass through generators
            fake_B = G_B(images)
            idt_A = G_A(images)
            cyc_A = G_A(fake_B)
            cyc_B = G_B(idt_A)

            # Pass through classifier and discriminators once, reused below
            predC_A = C_A(images)
            predD_A = D_A(images)
            predD_B = D_B(images)
            predD_B_fake = D_B(fake_B)

            # Compute losses
            cycle_term = lossCyc(cyc_A, images) + lossCyc(cyc_B, images)
            lossC_A = lossC(predC_A, label)
            lossD_A = lossD(predD_A, ones)
            if voiceShield:
                lossD_B = lossD(predD_B, zeros) + lossD(predD_B_fake, zeros)
            else:
                lossD_B = lossD(predD_B, zeros)
            if idtLoss:
                lossG_A = hyp_lambda * (cycle_term + lossCyc(idt_A, images))
            else:
                lossG_A = hyp_lambda * cycle_term
            lossG_B = lossD(predD_B_fake, ones) + lossC(C_B(fake_B), label) + hyp_lambda * cycle_term

            # Compute accuracy (plain Python ints, so the percentages below are true divisions)
            correctC_A += (torch.max(predC_A, 1)[1] == label).sum().item()
            totalC_A += labels.size(0)
            correctD_A += (predD_A >= 0.5).sum().item()
            totalD_A += batch
            correctD_B += (predD_B < 0.5).sum().item()
            totalD_B += batch
            if voiceShield:
                correctD_B += (predD_B_fake < 0.5).sum().item()
                totalD_B += batch

            # Output results after the last mini-batch of the pass
            if i + 1 == num_steps:
                print('Epoch [%d/%d], Step[%d/%d] (Test set A): LossC_A: %.6f, LossD_A: %.6f, LossD_B: %.6f, LossG_A: %.6f, LossG_B: %.6f, '
                 %(epoch + 1, num_epochs, i + 1, num_steps, lossC_A.item(), lossD_A.item(), lossD_B.item(), lossG_A.item(), lossG_B.item()))
                print('Accuracy: AccC_A %.6f, AccD_A: %.6f, AccD_B: %.6f' %(100 * correctC_A / totalC_A, 100 * correctD_A / totalD_A, 100 * correctD_B / totalD_B))

Method for testing step on speaker B data

In [0]:
def testEpochB(epoch):
    """Evaluate all networks on speaker B's test set without updating them.

    Computes the same losses as the speaker B training step and accumulates
    accuracies for C_B, D_A and D_B, printing both after the last mini-batch.
    The printed losses are those of the last mini-batch only.

    Discriminator accuracy uses a 0.5 threshold on the single output and the
    same targets as the losses: D_B should score speaker B's images as 1, and
    D_A should score them (and, with voiceShield, the generated fake_A) as 0.
    The previous arg-max over a one-column output was always 0, so it reported
    100% regardless of what the discriminators produced.

    Args:
        epoch: zero-based epoch index, used only in the progress message.
    """
    correctD_A = 0
    totalD_A = 0
    correctD_B = 0
    totalD_B = 0
    correctC_B = 0
    totalC_B = 0
    num_steps = len(testloaderB)
    # No gradients are needed for evaluation; skipping them saves memory and time
    with torch.no_grad():
        for i, (images, labels) in enumerate(testloaderB):
            # Transfer data to GPU
            images = images.to("cuda")
            labels = labels.to("cuda")
            label = torch.max(labels, 1)[1]
            batch = images.size(0)

            # Pass through generators
            fake_A = G_A(images)
            idt_B = G_B(images)
            cyc_A = G_A(idt_B)
            cyc_B = G_B(fake_A)

            # Pass through classifier and discriminators once, reused below
            predC_B = C_B(images)
            predD_B = D_B(images)
            predD_A = D_A(images)
            predD_A_fake = D_A(fake_A)

            # Compute losses
            cycle_term = lossCyc(cyc_A, images) + lossCyc(cyc_B, images)
            lossC_B = lossC(predC_B, label)
            lossD_B = lossD(predD_B, ones)
            if voiceShield:
                lossD_A = lossD(predD_A, zeros) + lossD(predD_A_fake, zeros)
            else:
                lossD_A = lossD(predD_A, zeros)
            if idtLoss:
                lossG_B = hyp_lambda * (cycle_term + lossCyc(idt_B, images))
            else:
                lossG_B = hyp_lambda * cycle_term
            lossG_A = lossD(predD_A_fake, ones) + lossC(C_A(fake_A), label) + hyp_lambda * cycle_term

            # Compute accuracy (plain Python ints, so the percentages below are true divisions)
            correctC_B += (torch.max(predC_B, 1)[1] == label).sum().item()
            totalC_B += labels.size(0)
            correctD_B += (predD_B >= 0.5).sum().item()
            totalD_B += batch
            correctD_A += (predD_A < 0.5).sum().item()
            totalD_A += batch
            if voiceShield:
                correctD_A += (predD_A_fake < 0.5).sum().item()
                totalD_A += batch

            # Output results after the last mini-batch of the pass
            if i + 1 == num_steps:
                print('Epoch [%d/%d], Step[%d/%d] (Test set B): LossC_B: %.6f, LossD_A: %.6f, LossD_B: %.6f, LossG_A: %.6f, LossG_B: %.6f, '
                 %(epoch + 1, num_epochs, i + 1, num_steps, lossC_B.item(), lossD_A.item(), lossD_B.item(), lossG_A.item(), lossG_B.item()))
                print('Accuracy: AccC_B %.6f, AccD_A: %.6f, AccD_B: %.6f' %(100 * correctC_B / totalC_B, 100 * correctD_A / totalD_A, 100 * correctD_B / totalD_B))

Do the actual training

In [0]:
# Each epoch runs a training pass over both speakers, then a test pass over both.
for epoch in range(num_epochs):
    for run_phase in (trainEpochA, trainEpochB, testEpochA, testEpochB):
        run_phase(epoch)

Save results in 'pickles' (files of model parameters)

In [0]:
# Step back out of the data folder only if we are still inside it, so re-running
# this cell cannot walk further up the directory tree.
if os.path.basename(os.getcwd()) == "spoken_numbers_wav":
    os.chdir("..")

In [0]:
# torch.save does not create missing folders; make sure the target exists so the
# trained weights are not lost to a FileNotFoundError after the training run.
model_dir = os.path.join("Models", "v3")
os.makedirs(model_dir, exist_ok=True)

# One state-dict file per network, named after the network.
for name, network in (("G_A", G_A), ("G_B", G_B), ("D_A", D_A), ("D_B", D_B), ("C_A", C_A), ("C_B", C_B)):
    torch.save(network.state_dict(), os.path.join(model_dir, name + ".pkl"))