In [1]:
import os
import re
import gzip

import matplotlib.pyplot as plt
import numpy as np
from sklearn.decomposition import PCA
import pandas as pd
import librosa
import soundfile as sf
from PIL import Image
from tqdm.notebook import tqdm

## Generate PCA-MNIST

In [29]:
def mnist_gen(root_path='./raw_data/mnist', img_saving_path='./avmnist/image', labels_saving_path='./avmnist/'):
    file_names = {'train_data': 'train-images-idx3-ubyte.gz', 'train_labels': 'train-labels-idx1-ubyte.gz',
                  'test_data': 't10k-images-idx3-ubyte.gz', 'test_labels': 't10k-labels-idx1-ubyte.gz'}

    working_dir = os.getcwd()
    print("Script working directory: %s" % working_dir)

    for key, file_name in file_names.items():
        file_path = os.path.join(root_path, file_name)
        print('file: %s' % key)
        with gzip.open(file_path, 'rb') as f:
            # read the definition of idx1-ubyte and idx3-ubyte
            f.seek(4)
            num = f.read(4)
            num = int().from_bytes(num, 'big')
            print('size of %s : %d' % (key, num))
            if re.match(r'.*data.*', key) is not None:
                height = f.read(4)
                height = int().from_bytes(height, 'big')
                width = f.read(4)
                width = int().from_bytes(width, 'big')

                data = np.frombuffer(f.read(), np.uint8).reshape(num, height, width)

                # PCA projecting with 75% energy removing
                # n_comp = int(height * width)
                # pca = PCA(n_components=n_comp)
                # projected = pca.fit_transform(data.reshape(num, height * width))
                # n_comp = ((np.cumsum(pca.explained_variance_ratio_) > 0.25) != 0).argmax()
                # rec = np.matmul(projected[:, :n_comp], pca.components_[:n_comp])
                rec = data.reshape(-1, 28,28)
                print(rec.shape)
                saved_path = os.path.join(working_dir, img_saving_path)
                if not os.path.exists(saved_path):
                    os.makedirs(saved_path)
                saved_name = key + '.npy'
                np.save(os.path.join(saved_path, saved_name), rec)
            else:
                data = np.frombuffer(f.read(), np.uint8)

                saved_path = os.path.join(working_dir, labels_saving_path)
                if not os.path.exists(saved_path):
                    os.makedirs(saved_path)
                saved_name = key + '.npy'
                np.save(os.path.join(saved_path, saved_name), data)

In [30]:
mnist_gen()

Script working directory: /home/jjlee/datasets
file: train_data
size of train_data : 60000
(60000, 28, 28)
file: train_labels
size of train_labels : 60000
file: test_data
size of test_data : 10000
(10000, 28, 28)
file: test_labels
size of test_labels : 10000


In [2]:
def get_noise_names(noise_dir):
    csv = pd.read_csv(os.path.join(noise_dir, 'meta/esc50.csv'))

    # sample one recording from each category
    recordings = csv.groupby('target')['filename'].apply(lambda cat: cat.sample(1)).reset_index()['filename']
    file_names = recordings.tolist()
    file_names = [os.path.join(noise_dir, 'audio', i) for i in file_names]

    return file_names

## Generate Raw Audio files

In [27]:
def save_wav(audio_dir, file_name, noise_path, noise_power):

    audio_path = os.path.join(audio_dir, file_name)
    y, sr = librosa.load(audio_path, sr=None)
    y1, sr1 = librosa.load(noise_path, sr=None)
    sampling_rate = None

    # using the min sample rate
    if sr1 > sr:
        y1 = librosa.resample(y1, orig_sr=sr1, target_sr=sr)
        sampling_rate = sr
    else:
        y = librosa.resample(y, orig_sr=sr, target_sr=sr1)
        sampling_rate = sr1


    if len(y) < len(y1):
        samples = y + noise_power * y1[:len(y)]
    else:
        samples = y[:len(y1)] + noise_power * y1

    return samples, sampling_rate

In [None]:
audio_dir='./raw_data/FSDD/'
saving_dir='./avmnist/audio'
noise_dir='./raw_data/ESC-50/'
noise_power=0
labels_dir='./avmnist/'


wav_dir = audio_dir
file_names = [f for f in os.listdir(wav_dir) if os.path.isfile(os.path.join(wav_dir, f)) and '.wav' in f]

if len(file_names) == 0:
    print('No .wav file in %s' % wav_dir)
    exit(1)

noise_names = get_noise_names(noise_dir)

train_noise_names = noise_names[:40]
test_noise_names = noise_names[-10:]

train_category = {str(i): list() for i in range(10)}
test_category = {str(i): list() for i in range(10)}

test_wav_idx = [i for i in range(40, 50)]
for file_name in file_names:
    idx = file_name.rfind('_') + 1
    if int(file_name[idx:-4]) not in test_wav_idx:
        train_category[file_name[0]].append(file_name)
    else:
        test_category[file_name[0]].append(file_name)

train_labels = np.load(os.path.join(labels_dir, 'train_labels.npy'))
test_labels = np.load(os.path.join(labels_dir, 'test_labels.npy'))
train_names = []
train_noises = []
test_names = []
test_noises = []

idx_list = [0 for i in range(10)]
noise_idx = 0
for train_label in train_labels:
    train_names.append(train_category[str(train_label)][idx_list[train_label]])
    idx_list[train_label] += 1
    idx_list[train_label] %= 160

    train_noises.append(train_noise_names[noise_idx])
    noise_idx += 1
    noise_idx %= 40

idx_list = [0 for i in range(10)]
noise_idx = 0
for test_label in test_labels:
    test_names.append(test_category[str(test_label)][idx_list[test_label]])
    idx_list[test_label] += 1
    idx_list[test_label] %= 40

    test_noises.append(test_noise_names[noise_idx])
    noise_idx += 1
    noise_idx %= 10

names = train_names + test_names
noises = train_noises + test_noises


data = []
for i in range(len(names)):
    samples, sampling_rate = save_wav(audio_dir, names[i], noises[i], noise_power)
    data.append(samples)
    if i % 1000 == 0:
        print(samples.shape)
    file_name = os.path.join(saving_dir, f'audio_mnist_{i+1}.wav')
    sf.write(file_name, samples, sampling_rate, format='wav')



## Generate Spectrogram

In [13]:
def wav_to_spectrogram(audio_dir, file_name, noise_path, f_length, t_length, noise_power):
    """ Creates a spectrogram of a wav file.

    :param audio_dir: path of wav files
    :param file_name: file name of the wav file to process
    :param noise_path: path of noise wav file
    :return:
    """
    audio_path = os.path.join(audio_dir, file_name)
    y, sr = librosa.load(audio_path, sr=None)
    y1, sr1 = librosa.load(noise_path, sr=None)

    min_seg_length = int(np.ceil(len(y) / t_length))
    time_seg_length = min_seg_length
    noverlap = 0
    flag = False
    for i in range(min_seg_length - 1, len(y)):
        for j in range(i):
            if 113 * i - 112 * j > len(y) >= 112 * i - 111 * j:
                noverlap = j
                time_seg_length = i
                flag = True
                break
        if flag:
            break

    nfft = (f_length - 1) * 2 + 1

    if sr1 > sr:
        y1 = librosa.resample(y1, orig_sr = sr1, target_sr = sr)

    else:
        y = librosa.resample(y, orig_sr=sr, target_sr=sr1)

    if len(y) < len(y1):
        samples = y + noise_power * y1[:len(y)]
    else:
        samples = y[:len(y1)] + noise_power * y1

    fig, ax = plt.subplots(1)
    fig.subplots_adjust(left=0, right=1, bottom=0, top=1)
    ax.axis('off')
    pxx, freqs, bins, _ = ax.specgram(x=samples,
                                      NFFT=time_seg_length, pad_to=nfft, noverlap=noverlap, Fs=min(sr, sr1),
                                      cmap='Greys')
    fig.canvas.draw()
    width, height = fig.get_size_inches() * fig.get_dpi()
    mplimage = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    imarray = np.reshape(mplimage, (int(height), int(width), 3))[:,:,0] # greyscale
    plt.close(fig)

    return imarray

In [14]:
audio_dir='./raw_data/FSDD/'
saving_dir='./avmnist/spectrogram'
noise_dir='./raw_data/ESC-50/'
noise_power=0
labels_dir='./avmnist/'

working_dir = './'
f_length=112
t_length=112

wav_dir = audio_dir
file_names = [f for f in os.listdir(wav_dir) if os.path.isfile(os.path.join(wav_dir, f)) and '.wav' in f]

if len(file_names) == 0:
    print('No .wav file in %s' % wav_dir)
    exit(1)

# set the size of the spectrogarm to (112, 112)
plt.rcParams['figure.figsize'] = [1.12, 1.12]
plt.rcParams['figure.dpi'] = 100
noise_names = get_noise_names(noise_dir)

# 50 noise files
train_noise_names = noise_names[:40]
test_noise_names = noise_names[-10:]

train_category = {str(i): list() for i in range(10)}
test_category = {str(i): list() for i in range(10)}

test_wav_idx = [i for i in range(40, 50)]
for file_name in file_names:
    idx = file_name.rfind('_') + 1
    if int(file_name[idx:-4]) not in test_wav_idx:
        train_category[file_name[0]].append(file_name)
    else:
        test_category[file_name[0]].append(file_name)

train_labels = np.load(os.path.join(labels_dir, 'train_labels.npy'))
test_labels = np.load(os.path.join(labels_dir, 'test_labels.npy'))
train_names = []
train_noises = []
test_names = []
test_noises = []

idx_list = [0 for i in range(10)]
noise_idx = 0
for train_label in train_labels:
    train_names.append(train_category[str(train_label)][idx_list[train_label]])
    idx_list[train_label] += 1
    idx_list[train_label] %= 160

    train_noises.append(train_noise_names[noise_idx])
    noise_idx += 1
    noise_idx %= 40

idx_list = [0 for i in range(10)]
noise_idx = 0
for test_label in test_labels:
    test_names.append(test_category[str(test_label)][idx_list[test_label]])
    idx_list[test_label] += 1
    idx_list[test_label] %= 40

    test_noises.append(test_noise_names[noise_idx])
    noise_idx += 1
    noise_idx %= 10

names = train_names + test_names
noises = train_noises + test_noises

audio_spectrogram = []
for i in tqdm(range(len(names)), desc='Creating spectrogram', total=len(names), bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}'):
    spectrogram= wav_to_spectrogram(wav_dir, names[i], noises[i], f_length, t_length, noise_power)
    audio_spectrogram.append(spectrogram)
    spectro_img = Image.fromarray(spectrogram)
    spectro_img.save(os.path.join(saving_dir, 'spectrogram_image', f'spectrogram_{i+1}.png'))


data = np.array(audio_spectrogram)

train_data = data[:60000]
test_data = data[60000:]
np.save(os.path.join(saving_dir, 'train_data.npy'), train_data)
np.save(os.path.join(saving_dir, 'test_data.npy'), test_data)
# dir_to_spectrogram(audio_dir, saving_dir, noise_dir, labels_dir, f_length=112, t_length=112, noise_power=noise_power)

Creating spectrogram:   0%|          | 0/70000 [00:00<?, ?it/s]

  mplimage = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
