<a href="https://colab.research.google.com/github/michalis-theodosiou/talking-heads-v2/blob/main/Audio_Datasets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Clone repo and extract audio files

In [40]:
!git clone https://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@github.com/michalis-theodosiou/talking-heads-v2.git
%cd talking-heads-v2/
!pip install -q -r requirements.txt

Cloning into 'talking-heads-v2'...
remote: Enumerating objects: 16, done.[K
remote: Counting objects: 100% (16/16), done.[K
remote: Compressing objects: 100% (11/11), done.[K
remote: Total 16 (delta 1), reused 16 (delta 1), pack-reused 0[K
Unpacking objects: 100% (16/16), done.
/content/talking-heads-v2/talking-heads-v2


In [3]:
import zipfile
with zipfile.ZipFile("/content/drive/MyDrive/Colab Datasets/MEAD_AUDIO_3.zip","r") as zip_ref:
    zip_ref.extractall("/content/talking-heads-v2")

Import libraries

In [125]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import glob
from resemblyzer import preprocess_wav, VoiceEncoder
import librosa
import warnings
import random
#import concurrent.futures
from third_party.ge2e import GE2ELoss

Declare Dataset Class

In [12]:
class audio_data_single(Dataset):
    def __init__(self, directory):
        self.dir = directory
        self.filelist = glob.glob('{}/**/*.m4a'.format(self.dir),recursive=True)

    def __len__(self):
        return len(self.filelist)

    def __getitem__(self, idx):
        audio_path = self.filelist[idx]
        #resamples, normalises vol and trims silences
        audio = preprocess_wav(audio_path)

        return audio


class audio_data_ge2e(Dataset):
    """
    A class to load a batch of audio files to calculate the ge2e loss.
    Creates returns data in the format 

    Input Params
    -----------
    directory : str
      The directory where the audio files are located in the original MEAD directory structure
    intensity: int
      The intensity (from 1 - 3) from which to load audio
    num_utterances: int
      The number of utterances to sample from per emotion and speaker

    Methods
    ----------
    __get_item__(idx) : returns dict
      Returns a dictionary of lists for the idx speaker in the format {emotion_name:[list of randomly sampled utterances]}

    Todo:
    ----------
    - Add validation/training split functionality

    """
    def __init__(self, directory, intensity, num_utterances):
        self.intensity_level = 'level_' + str(intensity)
        self.dir = directory
        self.filelist = glob.glob('{}/**/{}/*.m4a'.format(self.dir,self.intensity_level),recursive=True)
        self.emotions = sorted(list(set(path.split('/')[3] for path in self.filelist)))
        self.speakers = sorted(list(set(path.split('/')[1] for path in self.filelist)))
        self.utterances = sorted(list(set(path.split('/')[5].split('.')[0] for path in self.filelist)))
        self.num_utterances = num_utterances

    def __len__(self):
        return len(self.speakers)

    def __getitem__(self, idx):
        # selects one speaker and takes 16 random utterances for each emotion

        output_dict = {}
        speaker = self.speakers[idx]

        with warnings.catch_warnings():
          warnings.simplefilter("ignore")
          for emotion in self.emotions:
              all_files = glob.glob(f'{self.dir}/{speaker}/audio/{emotion}/{self.intensity_level}/*.m4a')
              chosen_files = random.sample(all_files,self.num_utterances)
          #resamples, normalises vol and trims silences
              # with concurrent.futures.ProcessPoolExecutor() as executor:
              #   result = executor.map(preprocess_wav,chosen_files)
              #   output_dict[emotion] = result

              output_dict[emotion] = []
              for f in chosen_files:
                output = preprocess_wav(f)
                output_dict[emotion].append(output)

          return output_dict

train_dataset = audio_data_ge2e('MEAD_AUDIO_3',intensity=3,num_utterances=16)
dataloader = DataLoader(train_dataset,batch_size=1,shuffle=True)

Todo list:

- write training pass function for rezemblyser to output processed 

In [38]:
t = ge2e_test_extraction(dataset,0)

Loaded the voice encoder model on cpu in 0.01 seconds.
loading data...
data loaded, extracting embeddings...


In [37]:
def ge2e_test_extraction(dataset,idx):
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  resemblyzer_encoder = VoiceEncoder(device=device)

  print('loading data...')
  data = dataset[idx]

  print('data loaded, extracting embeddings...')
  N = len(dataset.emotions)
  M = len(dataset.utterances)
  D = 256
  output = np.empty([N,M,D],dtype=np.float32)

  for n,emotion in enumerate(dataset.emotions):
    for m,audio in enumerate(data[emotion]):
      output[n,m,:] = resemblyzer_encoder.embed_utterance(audio)
  
  return output

In [49]:

loss_fn = GE2ELoss(init_w=10.0, init_b=-5.0, loss_method='contrast')
loss = loss_fn(torch.from_numpy(t))

In [112]:
class VoiceEncoder_train(VoiceEncoder):

  """parent class of resemblyzer voice encoder to add embedding function with gradient"""

  def __init__(self):
    super().__init__(device)

  def embed_utterance_train(self,wav,rate=1.3, min_coverage=0.75):
    from resemblyzer import audio
    wav_slices, mel_slices = self.compute_partial_slices(len(wav), rate, min_coverage)
    max_wave_length = wav_slices[-1].stop
    if max_wave_length >= len(wav):
        wav = np.pad(wav, (0, max_wave_length - len(wav)), "constant")
    
    # Split the utterance into partials
    mel = audio.wav_to_mel_spectrogram(wav)
    mels = np.array([mel[s] for s in mel_slices])
    mels = torch.from_numpy(mels).to(self.device)
    # forward through the network
    partial_embeds = self(mels)

    # Compute the utterance embedding from the partial embeddings
    raw_embed = partial_embeds.mean(axis=0)
    embed = torch.nn.functional.normalize(raw_embed,p=2,dim=0)

    return embed

In [159]:
def ge2e_forward(data):
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  encoder = VoiceEncoder_train()

  N = len(data.keys())
  M = len(data[list(data.keys())[0]])
  D = 256
  output = torch.empty([N,M,D])

  for n,emotion in enumerate(dataset.emotions):
    for m,audio in enumerate(data[emotion]):
      output[n,m,:] = encoder.embed_utterance_train(audio)
  
  return output

In [160]:
ge2e_forward(data)

Loaded the voice encoder model on cpu in 0.01 seconds.
data loaded, extracting embeddings...


tensor([[[0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0154, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         ...,
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.1709, 0.0000, 0.0049,  ..., 0.0000, 0.0271, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.0119, 0.0000, 0.0000,  ..., 0.0000, 0.0061, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0028, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         ...,
         [0.0000, 0.0414, 0.0000,  ..., 0.0000, 0.0000, 0.0034],
         [0.0097, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0159],
         [0.0000, 0.0018, 0.0000,  ..., 0.0000, 0.0027, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.

In [115]:
tt = VoiceEncoder_train()

Loaded the voice encoder model on cpu in 0.02 seconds.


Training pass of the model

In [None]:
# define optimizer
# define criterion
model = VoiceEncoder_train()
criterion = GE2ELoss(init_w=10.0, init_b=-5.0, loss_method='contrast')
optimizer = torch.optim.Adam(list(model.parameters()) + list(criterion.parameters()), lr=0.0001)
train_dataset = audio_data_ge2e('MEAD_AUDIO_3',intensity=3,num_utterances=16)

for epoch in range(1):  # loop over the dataset multiple times

    running_loss = 0.0

    #shuffle indices
    indices = list(range(len(train_dataset)))
    random.shuffle(indices)

    for i, idx in enumerate(indices): #enumerate over indices
        # get the data
        data = train_dataset[idx]

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = ge2e_forward(data)
        loss = criterion(outputs)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        print(f'epoch {epoch}, batch {i}, loss = {loss.item()}')

print('Finished Training')

Loaded the voice encoder model on cpu in 0.02 seconds.
Loaded the voice encoder model on cpu in 0.02 seconds.
data loaded, extracting embeddings...
epoch 0, batch 0, loss = 111.4823226928711
Loaded the voice encoder model on cpu in 0.01 seconds.
data loaded, extracting embeddings...


In [138]:
indices = list(range(len(train_dataset)))
random.shuffle(indices)

In [139]:
indices

[47,
 25,
 1,
 27,
 14,
 22,
 4,
 29,
 40,
 32,
 18,
 11,
 26,
 12,
 43,
 42,
 23,
 35,
 19,
 7,
 5,
 17,
 45,
 34,
 10,
 3,
 13,
 37,
 41,
 36,
 38,
 39,
 31,
 0,
 33,
 24,
 9,
 6,
 8,
 30,
 46,
 15,
 20,
 21,
 44,
 2,
 16,
 28]

In [134]:
len(dataset)

48