In [1]:
from datasets import load_dataset, concatenate_datasets

cmu = load_dataset('CLAPv2/CMU_Arctic')
cmu = concatenate_datasets([*cmu.values()])
display(cmu)
print(f"\n{'-' * 50}\n")
display(cmu[0])

Resolving data files:   0%|          | 0/119 [00:00<?, ?it/s]

Dataset({
    features: ['index', 'datasetname', 'audio', 'audio_len', 'text', 'raw_text'],
    num_rows: 13192
})


--------------------------------------------------



{'index': './train/10005',
 'datasetname': 'FSD50K',
 'audio': <datasets.features._torchcodec.AudioDecoder at 0x7fe4396a7620>,
 'audio_len': 3.17006254196167,
 'text': 'A man reads out "He was fond of quoting a fragment from a certain poem." in the Scottish accent',
 'raw_text': ['Title: CMU_Arctic',
  'Description: The databases consist of around 1150 utterances carefully selected from out-of-copyright texts from Project Gutenberg. The databses include US English male (bdl) and female (slt) speakers (both experinced voice talent) as well as other accented speakers.',
  'License: BSD',
  'Text: He was fond of quoting a fragment from a certain poem.',
  'Accent: Scottish',
  'Gender: male',
  'Filename: cmu_sc_male/arctic_a0520.wav']}

In [2]:
import re

def get_metadata(record):
    info     = '\n'.join(record['raw_text'])
    patterns = { 
        'speaker'    : r'Filename:\s*([^/]+)',
        'accent'     : r'Accent:\s*([^\n]+)', 
        'sex'        : r'Gender:\s*([^\n]+)',
        'spoken_text': r'Text:\s*([^\n]+)',
    }
    extract_pattern = lambda pattern: re.search(pattern, info).group(1).strip()
    metadata = { 'time_seconds': record['audio_len'] }
    metadata.update({ 
        key: extract_pattern(patterns[key]) for key in patterns
    })
    return metadata

def get_audio(record):
    audio = record['audio'].get_all_samples()
    samples = audio.data.mean(0)
    samplerate = audio.sample_rate
    return samples, samplerate

def get_audio_attrs(record):
    metadata            = get_metadata(record)
    dataset, samplerate = get_audio(record)
    return metadata, dataset, samplerate

In [3]:
metadata, dataset, samplerates = zip(*map(get_audio_attrs, cmu))

In [4]:
from typing import final

unique_samplerates = list(set(samplerates))
assert len(unique_samplerates) == 1, "When executing this check all the audios should've been already subsampled/supersampled to the same sampling rate."

SAMPLERATE: final = unique_samplerates[0]

In [5]:
import torch
import torchaudio

FRAME_MS, HOP_MS = 15, 5

Spectrogram = torchaudio.transforms.Spectrogram(
    n_fft      = FRAME_MS * SAMPLERATE // 1000,
    win_length = FRAME_MS * SAMPLERATE // 1000,
    hop_length =   HOP_MS * SAMPLERATE // 1000,
    power      = None,
    window_fn  = torch.hann_window,
)

InvSpectrogram = torchaudio.transforms.InverseSpectrogram(
    n_fft      = FRAME_MS * SAMPLERATE // 1000,
    win_length = FRAME_MS * SAMPLERATE // 1000,
    hop_length =   HOP_MS * SAMPLERATE // 1000,
    window_fn  = torch.hann_window,
)

def compute_PdB_t(A_ft):
    assert 1 < A_ft.ndim < 4, '`A_ft` MUST be 2D in case it does not have a batch dimension.'
    f_axis = 0 if A_ft.ndim < 3 else 1
    return 10 * torch.log10(torch.square(A_ft).sum(f_axis))

def apply_vad(samples):
    global Spectrogram, InvSpectrogram
    X_ft         = Spectrogram(samples)
    A_ft, phi_ft = map(lambda func: func(X_ft), [torch.abs, torch.angle])
    PdB_t        = compute_PdB_t(A_ft)
    f_bins       = A_ft.shape[0]
    vad_ft       = (PdB_t >= 10).unsqueeze(0).expand(f_bins, -1)
    A_ft, phi_ft = map(lambda M_ft: M_ft[vad_ft].reshape(f_bins, -1), [A_ft, phi_ft])
    X_ft         = A_ft * torch.exp(1j * phi_ft)
    samples      = InvSpectrogram(X_ft)
    return samples

In [6]:
dataset = tuple(map(apply_vad, dataset))
MAX_SAMPLES: final = max(map(len, dataset))

In [7]:
from collections import defaultdict
import re

key_components = ['accent', 'spoken_text']

audio_groups = defaultdict(list)
for idx, audio in enumerate(metadata):
    key = tuple(audio[x] for x in key_components)
    audio_groups[key].append(idx)

accents, spoken_texts = map(set, zip(*audio_groups.keys()))

print(f'''
Original Recordings:

Total Number of Recordings:      {len(cmu)}
Number of Accents:               {len(accents)}
Number of Spoken Texts:          {len(spoken_texts)}
Number of Accented Spoken Texts: {len(audio_groups)}
''')


Original Recordings:

Total Number of Recordings:      13192
Number of Accents:               5
Number of Spoken Texts:          1248
Number of Accented Spoken Texts: 5181



In [8]:
pairs = []
for key in audio_groups:
    audio_group = audio_groups[key]
    n = len(audio_group)
    if n < 2:
        continue
    for i in range(n-1):
        a = audio_group[i]
        for j in range(i+1, n):
            b = audio_group[j]
            t1, t2 = map(lambda idx: metadata[idx]['time_seconds'], [a, b])
            dt = 2 * abs(t1 - t2) / (t1 + t2)
            similar_tempos = dt < 0.01
            having_different_speakers = metadata[a]['speaker'] != metadata[b]['speaker']
            if similar_tempos and having_different_speakers:
                pairs.append([a, b])

print(f'''
Different-Speaker, Similar-Tempo, and Same-Accented-Text Recording Pairs:

Total Number of Pairs [Datapoints]: {len(pairs)}
''')


Different-Speaker, Similar-Tempo, and Same-Accented-Text Recording Pairs:

Total Number of Pairs [Datapoints]: 719



In [9]:
from IPython.display import Audio, display
from random import randint

def display_pair(dataset, i, j):
    audios = map(lambda idx: Audio(dataset[idx], rate=SAMPLERATE), [i, j])
    for audio in audios:
        display(audio)

def choose_random_pair(pairs):
    return pairs[randint(0, len(pairs))]

def display_random_pair():
    global dataset, pairs
    pair = choose_random_pair(pairs)
    print(f'\tPair: {pair}\n')
    display_pair(dataset, *pair)
    return pair

i, j = display_random_pair()

	Pair: [1654, 5660]



In [10]:
import torch
import torch.nn.functional as F

pad = lambda samples: F.pad(samples, pad=(0, MAX_SAMPLES - len(samples)), value=samples.abs().min())
X_t = torch.stack([pad(dataset[idx]) for idx in (i, j)])
original_samples = max(dataset[idx].numel() for idx in (i, j))
X_t.shape

torch.Size([2, 264240])

In [11]:
X_ft = Spectrogram(X_t)
A_ft, phi_ft = map(lambda func: func(X_ft), [torch.abs, torch.angle])
X_ft = A_ft * torch.exp(1j * phi_ft)
X_t  = InvSpectrogram(X_ft)
display(Audio(X_t[0], rate=SAMPLERATE))
display(Audio(X_t[1], rate=SAMPLERATE))

In [12]:
A_ft.shape

torch.Size([2, 361, 1102])

In [13]:
compute_PdB_ft(A_ft).shape

NameError: name 'compute_PdB_ft' is not defined

In [None]:
import matplotlib.pyplot as plt

PdBs = compute_PdB_t(A_ft)
t_bins = range(PdBs.shape[-1])

_, axs = plt.subplots(2, 1, figsize=(10, 5))
for ax, PdB in zip(axs, PdBs):
    ax.plot(t_bins, PdB)
    ax.set_xlim(0, 700)
    ax.set_ylim(-3)

In [None]:
images = A_ft.detach().clone().numpy()[:, :150, :670]
_, axs = plt.subplots(2, 1, figsize=(10, 5))
axs[0].imshow(images[0], aspect='auto', origin='lower')
axs[1].imshow(images[1], aspect='auto', origin='lower')

In [None]:
PdB_ft

In [None]:
a, b = images

In [None]:
A_ft.shape

In [None]:
plt.imshow(images[0], origin='lower')