In [None]:
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
from pathlib import Path
import matplotlib.pyplot as plt
from tqdm import tqdm
import PIL
import pickle
import os
import argparse
import librosa
import librosa.display
import random
from zipfile import ZipFile 

In [None]:
# Kaggle input directory holding the competition files, and the local working
# directory the archives get extracted into.
path = Path("/kaggle/input/freesound-audio-tagging-2019")
data_dir = Path('data')
# One extraction target per archive; exist_ok makes the cell safe to re-run.
os.makedirs(data_dir / 'test', exist_ok=True)
os.makedirs(data_dir / 'train_curated', exist_ok=True)

In [None]:
%%time
# Extract the competition's test.zip into data/test.
# The `with` block closes the archive on exit, so no explicit close() is needed.
with ZipFile(path/'test.zip', 'r') as zObject:
    zObject.extractall(path=data_dir/'test')

In [None]:
%%time
# Extract the competition's train_curated.zip into data/train_curated.
# The `with` block closes the archive on exit, so no explicit close() is needed.
with ZipFile(path/'train_curated.zip', 'r') as zObject:
    zObject.extractall(path=data_dir/'train_curated')

In [None]:
class args:
    """Filesystem locations for the conversion step, read as plain class attributes."""
    # Inputs: label CSV from the Kaggle input dir, audio from the extracted archives.
    train_df_path = path / 'train_curated.csv'
    train_dir = data_dir / 'train_curated'
    test_dir = data_dir / 'test'
    # Outputs: pickled lists of mel-spectrogram images.
    train_output_path = data_dir / 'mels_train.pkl'
    test_output_path = data_dir / 'mels_test.pkl'

In [None]:
# Metadata for the curated training clips.
train_df = pd.read_csv(args.train_df_path)
# The test set has no CSV, so its file list comes from the extracted directory.
# sorted() pins the row order because os.listdir returns entries in arbitrary order.
test_fns = sorted(os.listdir(args.test_dir))
# Build the frame in one step instead of creating it empty and mutating it.
test_df = pd.DataFrame({"fname": test_fns})

In [None]:
def read_audio(conf, pathname, trim_long_data):
    """Load one audio file and bring it to at least ``conf.samples`` samples.

    Args:
        conf: settings object providing ``sampling_rate``, ``samples`` and
            ``padmode``.
        pathname: path of the audio file, handed to ``librosa.load``.
        trim_long_data: when true, clips longer than ``conf.samples`` are cut
            to their first ``conf.samples`` samples; when false they are
            returned at full length.

    Returns:
        1-D sample array. Clips not longer than ``conf.samples`` are padded on
        both sides (``np.pad`` with ``conf.padmode``) to exactly
        ``conf.samples`` samples.
    """
    y, _ = librosa.load(pathname, sr=conf.sampling_rate)

    if len(y) > 0:  # workaround: 0 length causes error in trim
        y, _ = librosa.effects.trim(y)  # trim silence, top_db=default(60)
    else:
        print(f"found zero length audio {pathname}")
        y = np.zeros((conf.samples,), np.float32)

    if len(y) > conf.samples:  # long enough
        if trim_long_data:
            y = y[:conf.samples]
    else:
        # Split the missing samples across both ends; an odd remainder goes
        # to the tail.
        padding = conf.samples - len(y)
        offset = padding // 2
        y = np.pad(y, (offset, padding - offset), conf.padmode)
    return y


def audio_to_melspectrogram(conf, audio):
    """Turn a waveform into a float32 mel spectrogram on a dB scale.

    Args:
        conf: settings object providing ``sampling_rate``, ``n_mels``,
            ``hop_length``, ``n_fft``, ``fmin`` and ``fmax``.
        audio: sample array passed to ``librosa.feature.melspectrogram`` as ``y``.

    Returns:
        Output of ``librosa.power_to_db`` on the mel spectrogram, cast to
        ``np.float32``.
    """
    mel_options = dict(
        sr=conf.sampling_rate,
        n_mels=conf.n_mels,
        hop_length=conf.hop_length,
        n_fft=conf.n_fft,
        fmin=conf.fmin,
        fmax=conf.fmax,
    )
    power = librosa.feature.melspectrogram(y=audio, **mel_options)
    return librosa.power_to_db(power).astype(np.float32)


def show_melspectrogram(conf, mels, title='Log-frequency power spectrogram'):
    """Plot a mel spectrogram with a dB colorbar and display the figure.

    Args:
        conf: settings object providing ``sampling_rate``, ``hop_length``,
            ``fmin`` and ``fmax``, used to scale the axes.
        mels: spectrogram array handed to ``librosa.display.specshow``.
        title: text placed above the plot.
    """
    axis_options = dict(x_axis='time', y_axis='mel')
    scale_options = dict(
        sr=conf.sampling_rate,
        hop_length=conf.hop_length,
        fmin=conf.fmin,
        fmax=conf.fmax,
    )
    librosa.display.specshow(mels, **axis_options, **scale_options)
    plt.colorbar(format='%+2.0f dB')
    plt.title(title)
    plt.show()


def read_as_melspectrogram(conf, pathname, trim_long_data):
    """Load an audio file and return its mel spectrogram.

    Chains ``read_audio`` and ``audio_to_melspectrogram`` with the same
    ``conf``; ``pathname`` and ``trim_long_data`` are forwarded to
    ``read_audio`` unchanged.
    """
    samples = read_audio(conf, pathname, trim_long_data)
    return audio_to_melspectrogram(conf, samples)


class conf:
    """Audio and mel-spectrogram settings read by the preprocessing helpers."""
    # --- waveform ---
    sampling_rate = 16000
    duration = 2  # sec
    samples = duration * sampling_rate  # target clip length in samples
    padmode = 'constant'  # np.pad mode for clips shorter than `samples`

    # --- mel spectrogram ---
    n_mels = 128
    n_fft = 20 * n_mels
    hop_length = duration * 125  # to make time steps 128 (samples / hop_length)
    fmin = 20
    fmax = sampling_rate // 2


def get_default_conf():
    """Return the module-level ``conf`` settings class (the class object itself, not an instance or copy)."""
    return conf


def set_fastai_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Follows https://docs.fast.ai/dev/test.html#getting-reproducible-results

    Args:
        seed: integer seed applied to every generator.

    Side effects:
        Sets ``torch.backends.cudnn.deterministic = True`` and, when CUDA is
        available, seeds all CUDA devices.
    """
    # Kept function-local: nothing else in the visible code needs PyTorch,
    # so it is only imported when seeding is actually requested.
    import torch

    # python RNG
    random.seed(seed)
    # numpy RNG (uses the module-level `np` import)
    np.random.seed(seed)
    # pytorch RNGs
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def mono_to_color(X, mean=None, std=None, norm_max=None, norm_min=None, eps=1e-6):
    """Convert a single-channel array into a 3-channel uint8 image.

    The input is stacked three times along a new last axis, standardized
    (subtract ``mean``, divide by ``std + eps``), clipped to
    ``[norm_min, norm_max]`` and rescaled to 0..255.

    Args:
        X: numeric array; the result has shape ``X.shape + (3,)``.
        mean: value to subtract; ``None`` uses the mean of the data.
        std: value to divide by; ``None`` uses the std of the data.
        norm_max: upper clipping bound in standardized units; ``None`` uses
            the maximum of the standardized data.
        norm_min: lower clipping bound in standardized units; ``None`` uses
            the minimum of the standardized data.
        eps: added to ``std`` before dividing, and the threshold below which
            the standardized data range counts as flat.

    Returns:
        ``np.uint8`` array. All zeros when the standardized data range is not
        larger than ``eps`` or when ``norm_max`` is not above ``norm_min``.
    """
    # Stack X as [X,X,X]
    X = np.stack([X, X, X], axis=-1)

    # Standardize. Defaults are resolved with `is None` so that an explicit
    # 0 / 0.0 is honored rather than silently replaced by the data statistic.
    if mean is None:
        mean = X.mean()
    X = X - mean
    if std is None:
        std = X.std()
    Xstd = X / (std + eps)

    _min, _max = Xstd.min(), Xstd.max()
    if norm_max is None:
        norm_max = _max
    if norm_min is None:
        norm_min = _min

    # The second condition avoids a division by zero when the caller passes
    # a zero-width normalization range.
    if (_max - _min) > eps and norm_max > norm_min:
        # Normalize to [0, 255]
        V = np.clip(Xstd, norm_min, norm_max)
        V = 255 * (V - norm_min) / (norm_max - norm_min)
        return V.astype(np.uint8)

    # Just zero
    return np.zeros_like(Xstd, dtype=np.uint8)


def convert_wav_to_image(df, source):
    """Convert every file named in ``df.fname`` to a 3-channel mel image.

    Args:
        df: DataFrame with an ``fname`` column.
        source: directory path; each ``fname`` is joined onto it with ``/``.

    Returns:
        List of ``mono_to_color`` outputs, one per row, in row order. Clips
        are read with ``trim_long_data=False`` using the module-level ``conf``.
    """
    def _to_image(fname):
        mels = read_as_melspectrogram(
            conf, source / str(fname), trim_long_data=False)
        return mono_to_color(mels)

    rows = tqdm(df.iterrows(), total=len(df))
    return [_to_image(row.fname) for _, row in rows]


def save_as_pkl_binary(obj, filename):
    """Pickle ``obj`` into ``filename`` using the highest available protocol.

    An existing file is overwritten (opened in ``'wb'`` mode).
    Thanks to https://stackoverflow.com/questions/19201290/how-to-save-a-dictionary-to-a-file/32216025
    """
    with open(filename, 'wb') as sink:
        pickle.dump(obj, sink, protocol=pickle.HIGHEST_PROTOCOL)


def load_pkl(filename):
    """Read ``filename`` and return the object unpickled from it.

    Unpickling can execute arbitrary code, so only use this on files you
    trust (for example ones written by ``save_as_pkl_binary``).
    """
    with open(filename, 'rb') as source:
        restored = pickle.load(source)
    return restored


# get_default_conf() returns the `conf` class itself, so this rebinds the
# name `conf` to the same settings object the helpers above read.
conf = get_default_conf()


def convert_dataset(df, source_folder, filename):
    """Convert a whole dataset to mel images and pickle the result.

    Args:
        df: DataFrame with an ``fname`` column listing the audio files.
        source_folder: directory containing those files.
        filename: destination of the pickle file.

    Returns:
        The list of images that was written to ``filename``.
    """
    images = convert_wav_to_image(df, source=source_folder)
    save_as_pkl_binary(images, filename)
    print(f'Created {filename}')
    return images

In [None]:
# Convert both datasets; each call pickles its images and prints the output path.
# args.train_dir / args.test_dir are already Path objects, so no re-wrapping is needed.
convert_dataset(train_df, args.train_dir, args.train_output_path)
# The trailing ';' stops the notebook from echoing the returned list of image
# arrays as the cell's output.
convert_dataset(test_df, args.test_dir, args.test_output_path);

In [None]:
# Delete the directory train_curated.zip was extracted into; run this only
# after the conversion cell has written data/mels_train.pkl.
!rm -rf "data/train_curated"

In [None]:
# Delete the directory test.zip was extracted into; run this only after the
# conversion cell has written data/mels_test.pkl.
!rm -rf "data/test"