# Initialization

In [None]:
# Whether to back up the generated dataset to Google Drive (and restore it from there)
DRIVE_BACKUP = True

if DRIVE_BACKUP:
    # google.colab is only imported when backups are enabled
    from google.colab import drive
    drive.mount("/content/drive")

In [None]:
# Standard imports
# !pip uninstall -y tensorflow
# !pip install tensorflow-gpu
# !pip install --no-deps tensorflow-io
!pip install tensorflow-io
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import tensorflow_io as tfio

# from google.colab import drive, files
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import requests
import scipy
import time
# import tqdm
import tqdm.notebook as tqdm
import zipfile

In [None]:
# Constants
SAMPLERATE = 44100  # Sample rate (Hz) every audio file is expected to have
DATASIZE = 132299  # Number of points for each sample (=3s)
NB_SAMPLES = 100  # Maximal number of samples for each instrument

# Three-letter instrument codes mapped to their human-readable names
INSTRUMENTS_MAP = {
    "cel": "cello",
    "cla": "clarinet",
    "flu": "flute",
    "gac": "acoustic guitar",
    "gel": "electric guitar",
    "org": "organ",
    "pia": "piano",
    "sax": "saxophone",
    "tru": "trumpet",
    "vio": "violin",
    "voi": "human voice",
}

# Download and convert

In [None]:
# Datasets download and extraction
# Direct Zenodo download URLs of the IRMAS archives: the training set and the
# three testing parts. The local file name of each archive is derived from the
# URL by dropping everything from the "?" onwards.
DOWNLOAD_LINKS = [
    "https://zenodo.org/records/1290750/files/IRMAS-TrainingData.zip?download=1",
    "https://zenodo.org/records/1290750/files/IRMAS-TestingData-Part1.zip?download=1",
    "https://zenodo.org/records/1290750/files/IRMAS-TestingData-Part2.zip?download=1",
    "https://zenodo.org/records/1290750/files/IRMAS-TestingData-Part3.zip?download=1",
]

def download_with_progress(url: str, filepath: str, block_size: int = 1024) -> None:
    """Stream a file from `url` to `filepath` while showing a progress bar.

    Args:
        url: Address of the file to download.
        filepath: Where to write the downloaded file. Missing parent
            directories are created.
        block_size: Number of bytes requested per chunk.

    Raises:
        requests.HTTPError: If the server answers with an error status, so an
            error page is never silently saved in place of the file.
        requests.Timeout: If the server stops responding.
    """
    dirpath, filename = os.path.split(filepath)
    # A bare file name has no directory part; os.makedirs("") would raise.
    if dirpath:
        os.makedirs(dirpath, exist_ok=True)

    # The context manager releases the connection even if the download fails.
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        size = int(r.headers.get("content-length", 0))
        # total=None lets tqdm show a plain counter when the length is unknown.
        with tqdm.tqdm(desc=filename, total=size or None, unit="iB", unit_scale=True) as bar:
            with open(filepath, "wb") as f:
                for data in r.iter_content(block_size):
                    f.write(data)
                    bar.update(len(data))

# Download every archive into ./zips, unpack it into ./extracted, then drop the archives.
for link in DOWNLOAD_LINKS:
    # Last path component of the URL, without its query string
    url_without_query = link[:link.index("?")]
    filename = url_without_query.split("/")[-1]
    filepath = os.path.join(os.getcwd(), "zips", filename)

    download_with_progress(link, filepath)

    extraction_dir = os.path.join(os.getcwd(), "extracted")
    with zipfile.ZipFile(filepath, "r") as archive:
        archive.extractall(extraction_dir)
os.system("rm -rf zips")

In [None]:
# Tools
def audio_to_spec(audiopath: str, specpath: str) -> None:
    """Render the log-spectrogram of an audio file and save it as an image.

    Only the first channel of the audio is used. The image is drawn on the
    current matplotlib figure, which is cleared beforehand.

    Args:
        audiopath: Path of the audio file to read.
        specpath: Path of the image file to write.

    Raises:
        NotImplementedError: If the audio sample rate differs from SAMPLERATE.
    """
    source = tfio.audio.AudioIOTensor(audiopath)
    rate = source.rate
    # Keep the first channel and rescale by 2**15
    # (assumes 16-bit integer samples - TODO confirm for every input file).
    first_channel = source.to_tensor()[:, 0]
    waveform = tf.cast(first_channel, tf.float32) / 32768.0

    if rate != SAMPLERATE:
        raise NotImplementedError("Samplerate conversion")

    spectrogram = tfio.audio.spectrogram(waveform, nfft=512, window=512, stride=256)

    # Transpose and reverse the row order before taking the logarithm
    # (presumably so that time runs along x and low frequencies sit at the bottom).
    flipped = tf.transpose(spectrogram)[::-1, :]
    image = tf.math.log(flipped).numpy()
    height = image.shape[0]
    width = image.shape[1]

    plt.clf()
    plt.imshow(image, interpolation="none", extent=[0, width, 0, height])
    plt.axis("off")
    plt.savefig(specpath, bbox_inches="tight", pad_inches=0)

In [None]:
# Pre-process IRMAS

def mutiple_training_audios_to_specs(origin: str, destination: str):
    """Convert every training audio file into a spectrogram image.

    Each sub-directory of `origin` is treated as one instrument. Its audio
    files are converted with `audio_to_spec` and written as PNG files into a
    sub-directory of the same name under `destination`.

    Args:
        origin: Directory containing one sub-directory per instrument.
        destination: Directory receiving the spectrogram images.
    """
    folders = [
        entry for entry in os.listdir(origin)
        if os.path.isdir(os.path.join(origin, entry))
    ]
    progress = tqdm.tqdm(folders, total=len(folders), position=0)
    for code in progress:
        progress.set_description(INSTRUMENTS_MAP.get(code, "unknown"))

        source_dir = os.path.join(origin, code)
        target_dir = os.path.join(destination, code)
        if os.path.isdir(source_dir):
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            for entry in tqdm.tqdm(os.listdir(source_dir), position=1, leave=False):
                # Text files and macOS metadata are not audio
                is_audio = not (entry.endswith(".txt") or entry == ".DS_Store")
                if is_audio:
                    stem = os.path.splitext(entry)[0]
                    audio_to_spec(
                        os.path.join(source_dir, entry),
                        os.path.join(target_dir, stem + ".png"),
                    )
    progress.close()

def mutiple_testing_audios_to_specs(origin: str, destination: str):
    """Convert every testing audio file into a spectrogram image.

    The audio files are read from the first sub-directory of `origin`. Each
    audio file comes with a text file of the same name listing its instruments
    (one per line). The spectrogram is written to
    `<destination>/<instruments joined by "-">#<audio name>.png`, so the labels
    can be recovered from the image file name.

    Args:
        origin: Extracted archive directory holding the data sub-directory.
        destination: Directory receiving the spectrogram images (created if
            missing).

    Raises:
        FileNotFoundError: If `origin` has no sub-directory, or if an audio
            file has no matching annotation file.
    """
    # Only consider directories: a stray file listed first must not be picked.
    subdirs = sorted(
        entry for entry in os.listdir(origin)
        if os.path.isdir(os.path.join(origin, entry))
    )
    if not subdirs:
        raise FileNotFoundError(f"No data directory found in {origin!r}")
    origin_dir = os.path.join(origin, subdirs[0])

    # savefig does not create missing directories.
    os.makedirs(destination, exist_ok=True)

    for filename in tqdm.tqdm(os.listdir(origin_dir), position=1, leave=False):
        if filename.endswith(".txt") or filename == ".DS_Store":
            continue
        audiopath = os.path.join(origin_dir, filename)
        name, _ = os.path.splitext(filename)
        textpath = os.path.join(origin_dir, name + ".txt")
        with open(textpath, "r") as f:
            # Strip surrounding whitespace and drop blank lines so that the
            # labels are safe to embed in a file name.
            instruments = [line.strip() for line in f.read().splitlines() if line.strip()]
        # The labels prefix the file name INSIDE the destination directory;
        # prepending them to the directory path pointed to a non-existent folder.
        specpath = os.path.join(destination, "-".join(instruments) + "#" + name + ".png")
        audio_to_spec(audiopath, specpath)

# Single figure reused (cleared and redrawn) by audio_to_spec for every file
plt.figure()

# (extracted archive folder, dataset sub-folder, converter), processed in order:
# the training data first, then the three testing parts.
conversion_jobs = [
    ("IRMAS-TrainingData", "training", mutiple_training_audios_to_specs),
    ("IRMAS-TestingData-Part1", "testing", mutiple_testing_audios_to_specs),
    ("IRMAS-TestingData-Part2", "testing", mutiple_testing_audios_to_specs),
    ("IRMAS-TestingData-Part3", "testing", mutiple_testing_audios_to_specs),
]

for name, subset, convert in conversion_jobs:
    start = time.time()
    origin = os.path.join(os.getcwd(), "extracted", name)
    destination = os.path.join(os.getcwd(), "dataset", subset)
    convert(origin, destination)
    # Elapsed time of this archive, in minutes
    print(name, round((time.time() - start) / 60, 3), "min")

# The raw audio is no longer needed once the spectrograms exist
os.system("rm -rf extracted")

In [None]:
def human_readable_size(nb_bytes: float) -> str:
    """Format a byte count with three decimals and a 1024-based unit (B to TB).

    Values of 1024 TB or more stay expressed in TB instead of being divided
    once more without a matching unit.
    """
    units = ["B", "kB", "MB", "GB", "TB"]
    value = nb_bytes
    for unit in units[:-1]:
        if value < 1024:
            return f"{value:.3f}{unit}"
        value /= 1024
    return f"{value:.3f}{units[-1]}"


# Report the total size and the number of files of the generated dataset.
# The tree is walked once, and no global named `format` is created anymore
# (it used to shadow the builtin for every later cell).
directory = os.path.join(os.getcwd(), "dataset")
size = 0  # Total size in bytes, symbolic links excluded
nb = 0  # Number of files, symbolic links excluded
for dirpath, _dirnames, filenames in os.walk(directory):
    for filename in filenames:
        filepath = os.path.join(dirpath, filename)
        if os.path.islink(filepath):
            continue
        size += os.path.getsize(filepath)
        nb += 1
print(f"{human_readable_size(size)} ({nb} files)")

In [None]:
# Backup files
if DRIVE_BACKUP:
    drive.mount("/content/drive")
    !cp -R /content/dataset /content/drive/MyDrive/Colab\ Notebooks/Moonshot/dataset

# Training

In [None]:
# Load backup
if DRIVE_BACKUP and (not os.path.exists("/content/dataset") or not os.listdir("/content/dataset")):
    drive.mount("/content/drive")
    !cp -R /content/drive/MyDrive/Colab\ Notebooks/Moonshot/dataset /content/dataset

# Testing Zone

In [None]:
# Scratch benchmark: times 100 runs of two ways of turning one WAV file into a
# spectrogram image, then prints the ratio of the two elapsed times.
# NOTE(review): the file below lives under an "extracted" folder, and the
# pre-processing cell ends with `rm -rf extracted` - presumably this cell has to
# run before that clean-up; confirm.
path = r"/content/extracted/IRMAS-TrainingData/pia/001__[pia][nod][cla]1389__1.wav"



# Approach 1: read the file with scipy and render it with generate_spectrogram.
# NOTE(review): generate_spectrogram is not defined in any cell visible here -
# confirm where it comes from, otherwise this loop raises NameError.
t = time.perf_counter_ns()
for i in range(100):
    samplerate, data = scipy.io.wavfile.read(path)
    generate_spectrogram(data, samplerate, "test.png")
dt1 = time.perf_counter_ns() - t



# Approach 2: same tensorflow-io steps as audio_to_spec (first channel, scaled
# by 2**15, log-spectrogram), but with a fresh figure created on every iteration.
t = time.perf_counter_ns()
for i in range(100):
    audio = tfio.audio.AudioIOTensor(path)
    mono = tf.cast(audio.to_tensor()[:,0], tf.float32) / 32768.0
    spectrogram = tfio.audio.spectrogram(mono, nfft=512, window=512, stride=256)
    display = tf.math.log(tf.transpose(spectrogram)[::-1, :]).numpy()
    plt.close('all')
    plt.figure()
    plt.imshow(display, interpolation="none")#, extent=[0, display.shape[1], 0, display.shape[0]])
    plt.axis('off')
    plt.subplots_adjust(left=0, right=1, top=1, bottom=0)
    plt.savefig("test2.png", bbox_inches='tight', pad_inches=0)
dt2 = time.perf_counter_ns() - t



# A ratio above 1 means the second approach took less time than the first
print("Speed up ratio:")
print(dt1 / dt2)