## Packages

In [1]:
import os
import os.path as osp
from pathlib import Path
import math
from time import time
import matplotlib.pyplot as plt

import numpy as np
import pandas as pd
import librosa

from IPython.display import Audio, clear_output, display

## Arguments

In [2]:
wavs_dir = "../wavs2/"
wavs = [path for path in Path(wavs_dir).rglob("*.wav")]

output_dir = "../outputs/npy2/"
transcript_path = "../outputs/data_transcripts_v2.csv"
transcripts = pd.read_csv(transcript_path)

target_sr = 16000
target_channel = 1  # 1 or 2
pitch_range = ("C2", "C5")

# Re-process the start and end index as the files will be resampled
transcripts["start_idx"] = np.floor(transcripts["start"] * target_sr).astype(int)
transcripts["end_idx"] = np.ceil(transcripts["end"] * target_sr).astype(int)

if not osp.exists(output_dir):
    os.makedirs(output_dir)

## User Defined Functions

In [3]:
def resample(audio, current_sr, target_sr):
    if current_sr == target_sr:
        return audio
    else:
        new_audio = librosa.resample(audio, current_sr, target_sr)
        return new_audio


def rechannel(audio, target_channel):
    current_shape = audio.shape
    if target_channel == 1:
        if len(current_shape) == 1:
            return audio
        else:
            new_audio = librosa.to_mono(audio)
            return new_audio
    else:
        if len(current_shape) == 2:
            return audio
        else:
            new_audio = np.column_stack([audio, audio])
            return new_audio


def extract_pitch(audio, pitch_range=("C2", "C5"), impute_val=0.0, log=True):
    f0, voiced_flags, voiced_probs = librosa.pyin(
        y=audio,
        fmin=librosa.note_to_hz(pitch_range[0]),
        fmax=librosa.note_to_hz(pitch_range[1]),
    )
    if log:
        f0 = np.log2(f0)
    f0_inputed = np.nan_to_num(f0, nan=impute_val)
    return (f0_inputed, voiced_flags, voiced_probs)


def extract_onset(audio, sampling_rate, max_size=5):
    onset_strengths = librosa.onset.onset_strength(
        y=audio, sr=sampling_rate, max_size=max_size
    )
    onset_frames = librosa.onset.onset_detect(y=audio, sr=sampling_rate, units="frames")
    onset_flags = np.zeros(onset_strengths.shape[0])
    onset_flags[onset_frames] = 1
    return (onset_strengths, onset_flags)

## Extract Features (One File Sample)

In [44]:
# File Path
file_path = wavs[0]

# Load in audio
audio_array, audio_sr = librosa.load(
    file_path, sr=librosa.core.get_samplerate(file_path)
)

# Pre-process (resample, rechannel)
audio_array = resample(audio_array, audio_sr, target_sr)
audio_array = rechannel(audio_array, target_channel)

In [57]:
file_transcripts = transcripts.loc[transcripts["file"] == file_path.name]

print(file_path.name)
print(f"{len(file_transcripts)} transcript lines")

# Iterate over transcript metadata
# Segment clip by start and end indices

shapes = []
st = time()
for x in file_transcripts.iterrows():
    row = x[1]
    clear_output(wait=True)
    print(
        "Item:",
        x[0],
        " |  Line:",
        row["line"],
        " |  Progress:",
        f"{round(100*(x[0]/file_transcripts.shape[0]))}%",
    )
    print(f"Elapsed Time: {round(time() - st, 2)}s")

    segment = audio_array[row["start_idx"] : row["end_idx"]]
    f0_inputed, voiced_flags, voiced_probs = extract_pitch(
        segment, target_sr, pitch_range, log=True
    )
    onset_strengths, onset_flags = extract_onset(segment, target_sr)
    full_array = np.column_stack(
        (f0_inputed, voiced_flags, voiced_probs, onset_strengths, onset_flags)
    )
    np.save(osp.join(output_dir, f"{file_path.name}_{row['line']}.npy"), full_array)

    shapes.append(full_array.shape)

# For record keeping, used to decide max_lens
np.save(
    osp.join(output_dir, f"{file_path.name}_shapes.npy"),
    np.array([s[0] for s in shapes]),
)
print(f"Total Time: {round(time() - st, 2)}s")

Item: 36  |  Line: 142  |  Progress: 97%
Elapsed Time: 79.36s
Total Time: 82.48s


## All Files

In [4]:
files_list = list(transcripts["file"].unique())
length = len(files_list)
chunks = 8

split_size = math.ceil(length / chunks)

[
    files_list[max(0, i * split_size) : min(length, (i + 1) * split_size)]
    for i in range(chunks)
]


for i, w in enumerate(wavs):
    # Load in audio
    audio_array, audio_sr = librosa.load(w, sr=librosa.core.get_samplerate(w))

    # Pre-process (resample, rechannel)
    audio_array = resample(audio_array, audio_sr, target_sr)
    audio_array = rechannel(audio_array, target_channel)

    file_transcripts = transcripts.loc[
        transcripts["file"] == w.name.replace(".wav", "")
    ].reset_index(drop=True)

    print(w.name)
    print(f"{len(file_transcripts)} transcript lines")

    # Iterate over transcript metadata
    # Segment clip by start and end indices

    shapes = []
    st = time()
    for x in file_transcripts.iterrows():
        row = x[1]
        clear_output(wait=True)
        print(w.name)
        print(
            "Item:",
            x[0],
            " |  Line:",
            row["line"],
            " |  Progress:",
            f"{round(100*(x[0]/file_transcripts.shape[0]))}%",
        )
        print(f"Elapsed Time: {round(time() - st, 2)}s")

        try:

            segment = audio_array[row["start_idx"] : row["end_idx"]]
            f0_inputed, voiced_flags, voiced_probs = extract_pitch(
                segment, pitch_range, log=True
            )
            onset_strengths, onset_flags = extract_onset(segment, target_sr)
            full_array = np.column_stack(
                (f0_inputed, voiced_flags, voiced_probs, onset_strengths, onset_flags)
            )
            np.save(
                osp.join(output_dir, f"{row['file']}_{row['line']}.npy"), full_array
            )

            shapes.append(full_array.shape)
        except:
            pass

    # For record keeping, used to decide max_lens
    np.save(
        osp.join(output_dir, f"{w.name.replace('.wav','')}_shapes.npy"),
        np.array([s[0] for s in shapes]),
    )

print(f"Total Time: {round(time() - st, 2)}s")

11-681.wav
Item: 0  |  Line: 4  |  Progress: 0%
Elapsed Time: 0.0s
Total Time: 1.09s




In [17]:
len(
    [
        files_list[max(0, i * split_size) : min(length, (i + 1) * split_size)]
        for i in range(chunks)
    ][2]
)

61