In [1]:
import os, json, glob
import librosa, mir_eval, guitarpro, scipy
import numpy as np
import joblib

In [2]:
SINGLE_TRACK_GTP_DIR = "/Volumes/MacOnly/UG_rewrite/all_time_top_by_rating/clean_single_track_gtps"
SINGLE_TRACK_ANNO_DIR = "/Volumes/MacOnly/UG_rewrite/all_time_top_by_rating/clean_single_track_annos"
SINGLE_TRACK_AUDIO_DIR = "/Volumes/MacOnly/UG_rewrite/all_time_top_by_rating/clean_single_track_audio"

In [4]:
def extract_features(y, sr):
    mfcc = librosa.feature.mfcc(y, sr=sr)
    pitch, _, _ = librosa.pyin(y, fmin=librosa.note_to_hz("C2"), fmax=librosa.note_to_hz("G6"), sr=sr, fill_na=None)
    centroid = np.squeeze(librosa.feature.spectral_centroid(y, sr))
    bandwidth = np.squeeze(librosa.feature.spectral_bandwidth(y, sr))
    flatness = np.squeeze(librosa.feature.spectral_flatness(y))
    rolloff = np.squeeze(librosa.feature.spectral_rolloff(y, sr))
    zero_crossing = np.squeeze(librosa.feature.zero_crossing_rate(y))
    flux = librosa.onset.onset_strength(y, sr)

    non_mfccs = np.array([pitch, centroid, bandwidth, flatness, rolloff, zero_crossing, flux])
    features = np.concatenate((mfcc, non_mfccs), axis=0)

    features_delta = librosa.feature.delta(features, order=1)
    features_accel = librosa.feature.delta(features, order=2)

    all_features = np.concatenate((features, features_delta, features_accel), axis=0) # (81, 13127)
    assert all_features.shape[0] == 81
    return all_features

def get_all_stats(a):
    """Given a 2D matrix, compute and concatenate the 6 statistics.

    Args:
        a (array): The input time series, of the shape (n,)

    Returns:
        array: An array containing the statistics, of the shape (6,)
    """
    assert a.ndim == 2
    mean = np.mean(a, axis=1)
    std = np.std(a, axis=1)
    max = np.max(a, axis=1)
    min = np.min(a, axis=1)
    skewness = scipy.stats.skew(a, axis=1, nan_policy="raise")
    kurtosis = scipy.stats.kurtosis(a, axis=1, nan_policy="raise")

    stats = np.concatenate((mean, std, max, min, skewness, kurtosis), axis=0)
    assert stats.shape[0] == a.shape[0] * 6
    return stats

In [5]:
def f0_note_tracker(y, sr, pitch_diff_thres=0.8, note_dur_thres=75):
    """Note tracking based on F0 segmentation

    This function takes an audio signal as input and use PYIN to estimate an f0 curve.
    The curve is cut at points where the pitch difference between two adjacent frames exceeds `pitch_diff_thres`.
    Spurious notes shorter than `note_dur_thres` are discarded.

    Args:
        y (array): audio signal
        pitch_diff_thres (float, optional): F0 cutting point threshold. Defaults to 0.8.
        note_dur_thres (int, optional): Estimated notes whose duration is shorter than this threshold is discarded. Defaults to 75ms.

    Returns:
        array: Estimated intervals: [(onset, offset)]
    """
    est_intervals = []

    # if the segment is shorter than note_dur_thres, just discard it
    if len(y) < note_dur_thres/1000*sr:
        return []
        
    f0, _, _ = librosa.pyin(
        y,
        fmin=librosa.note_to_hz("C2"),
        fmax=librosa.note_to_hz("G6"),
        sr=sr,
        center=True,
    )
    times = librosa.times_like(f0, sr=sr)
    notes = librosa.hz_to_midi(f0)

    note_events = np.split(notes, np.where(abs(np.diff(notes)) > pitch_diff_thres)[0] + 1)
    time_intervals = np.split(times, np.where(abs(np.diff(notes)) > pitch_diff_thres)[0] + 1)

    for i in range(len(note_events)):
        note_event = note_events[i]
        time_interval = time_intervals[i]
        # ignore spurious note events
        if len(time_interval) > librosa.time_to_frames(note_dur_thres/1000, sr=sr):
            est_intervals.append([time_interval[0], time_interval[-1]])
    
    est_intervals = np.array(est_intervals)
    return est_intervals

In [5]:
# onset detection-based note-event separation heuristics
def onset_note_tracker(y, sr, note_dur_thres=70, backtrack=False):
    est_intervals = []
    onsets = librosa.onset.onset_detect(y=y, sr=sr, units='time', backtrack=backtrack)

    # the onset detection tend to ignore the first note in the audio
    # so add the time 0 to the onsets
    onsets = np.insert(onsets, 0, 0)

    for i in range(len(onsets)-1):
        onset = onsets[i]
        next_onset = onsets[i+1]
        est_intervals.append([onset, next_onset])

    # the last onset to the end of the audio
    est_intervals.append([onsets[-1], len(y)/sr])

    est_intervals = [interval for interval in est_intervals if interval[1] - interval[0] > note_dur_thres/1000]
    est_intervals = np.array(est_intervals)
    return est_intervals

In [6]:
pt_clf = joblib.load("/Users/jw/Documents/unified_clf.joblib")

In [7]:
RESULT_NOTE_DIR = "/Users/jw/Documents/note_results"
RESULT_TRAN_DIR = "/Users/jw/Documents/tran_results"

In [None]:
i = 0
for audio_file in glob.glob(os.path.join(SINGLE_TRACK_AUDIO_DIR, "*.wav")):
    track_name, _ = os.path.splitext(audio_file.split("/")[-1])
    print(track_name, i)
    i += 1
    gtp_file = os.path.join(SINGLE_TRACK_GTP_DIR, track_name + ".gp5")
    anno_file = os.path.join(SINGLE_TRACK_ANNO_DIR, track_name + ".json")

    all_est_notes = []
    all_est_trans = []

    y, sr = librosa.load(audio_file, sr=None)

    p, m, s = poly_vs_mono_vs_silence(guitarpro.parse(gtp_file))
    for mono_timestamp in m:
        start_sp = librosa.time_to_samples(mono_timestamp[0], sr)
        end_sp = librosa.time_to_samples(mono_timestamp[1], sr)
        mono_segment = y[start_sp : end_sp]
        est_notes = onset_note_tracker(mono_segment, sr, note_dur_thres=50, backtrack=True)
        # convert to global time
        est_notes = est_notes + mono_timestamp[0]
        if len(est_notes) != 0:
            all_est_notes.append(est_notes)

    all_note_features = []
    all_tran_features = []

    features = extract_features(y, sr)

    for est_notes in all_est_notes:
        for j in range(len(est_notes)):
            est_note = est_notes[j]
            onset_fr = librosa.time_to_frames(est_note[0], sr=sr)
            offset_fr = librosa.time_to_frames(est_note[1], sr=sr)
            # if note duration is shorter than one frame, discard it
            if offset_fr - onset_fr < 1:
                continue
            note_feature = features[:, onset_fr : offset_fr+1]
            note_aggregation = get_all_stats(note_feature)
            all_note_features.append(note_aggregation)

            if j != len(est_notes) - 1:
                all_est_trans.append(est_note)
                tran_feature = features[:, offset_fr-2 : offset_fr+3]
                tran_aggregation = get_all_stats(tran_feature)
                all_tran_features.append(tran_aggregation)

    # if there is no estimated notes (when the whole track is poly)
    if len(all_est_notes) == 0:
        continue

    all_est_notes = np.concatenate(all_est_notes, axis=0)
    all_note_features = np.array(all_note_features)
    all_tran_features = np.array(all_tran_features)

    assert all_est_notes.shape[0] == all_note_features.shape[0]
    assert all_tran_features.shape[0] <= all_note_features.shape[0]

    y_pred_notes = pt_clf.predict(all_note_features)
    y_pred_trans = pt_clf.predict(all_tran_features)

    note_result = np.column_stack((all_est_notes, y_pred_notes))
    tran_result = np.column_stack((all_est_trans, y_pred_trans))

    result_note_file = os.path.join(RESULT_NOTE_DIR, track_name + ".csv")
    result_tran_file = os.path.join(RESULT_TRAN_DIR, track_name + ".csv")

    np.savetxt(result_note_file, note_result, fmt=["%.2f", "%.2f", "%d"], delimiter=",")
    np.savetxt(result_tran_file, tran_result, fmt=["%.2f", "%.2f", "%d"], delimiter=",")
    

In [10]:
def evaluate_e2e(result_file):
    note_result = np.loadtxt(os.path.join(RESULT_NOTE_DIR, result_file), delimiter=",")
    tran_result = np.loadtxt(os.path.join(RESULT_TRAN_DIR, result_file), delimiter=",")

    note_timestamps = note_result[:, :2]
    note_labels = note_result[:, 2]

    tran_timestamps = tran_result[:, :2]
    tran_labels = tran_result[:, 2]

    bend_timestamps = note_timestamps[note_labels==1]
    vibrato_timestamps = note_timestamps[note_labels==2]
    
    hammer_timestamps = tran_timestamps[tran_labels==3]
    pull_timestamps = tran_timestamps[tran_labels==4]
    slide_timestamps = tran_timestamps[tran_labels==5]

    track_name, _ = os.path.splitext(result_file.split("/")[-1])
    anno_file = os.path.join(SINGLE_TRACK_ANNO_DIR, track_name+".json")
    with open(anno_file) as anno:
        annotation = json.load(anno)

    ref_bend_timestamps = []
    ref_vibrato_timestamps = []
    ref_hammer_timestamps = []
    ref_pull_timestamps = []
    ref_slide_timestamps = []

    for i in range(len(annotation)):
        note_event = annotation[i]

        if note_event["time"]["dur"] <= 2048 / sr:
            continue

        onset_time = note_event["time"]["start"]
        offset_time = note_event["time"]["start"] + note_event["time"]["dur"]

        if note_event["effects"]["bend"]:
            ref_bend_timestamps.append([onset_time, offset_time])
        if note_event["effects"]["vibrato"]:
            ref_vibrato_timestamps.append([onset_time, offset_time])

        # if the note event is the last one in the song, ignore its transitions 
        if i == len(annotation) - 1:
            break

        next_note_event = annotation[i + 1]
        # if the next note event doesn't immediately follow the current note event, ignore the transition
        if next_note_event["time"]["start"] - offset_time > 0.05:
            continue

        if note_event["effects"]["hammer"]:
            if note_event["pitch"] < next_note_event["pitch"]:
                ref_hammer_timestamps.append([onset_time, offset_time])
            elif note_event["pitch"] > next_note_event["pitch"]:
                ref_pull_timestamps.append([onset_time, offset_time])
        if note_event["effects"]["slide"]:
            ref_slide_timestamps.append([onset_time, offset_time])

    ref_bend_timestamps = np.array(ref_bend_timestamps)
    ref_vibrato_timestamps = np.array(ref_vibrato_timestamps)
    ref_hammer_timestamps = np.array(ref_hammer_timestamps)
    ref_pull_timestamps = np.array(ref_pull_timestamps)
    ref_slide_timestamps = np.array(ref_slide_timestamps)

    # matching and counting scores
    bend_anno_cnt = len(ref_bend_timestamps)
    bend_est_cnt = len(bend_timestamps)
    if bend_anno_cnt == 0 or bend_est_cnt == 0:
        bend_matching_cnt = 0
    else:
        bend_onset_matching = mir_eval.transcription.match_note_onsets(ref_bend_timestamps, bend_timestamps)
        bend_offset_matching = mir_eval.transcription.match_note_offsets(ref_bend_timestamps, bend_timestamps)
        bend_matching = set(bend_onset_matching+bend_offset_matching)
        bend_matching_cnt = len(bend_matching)

    vibrato_anno_cnt = len(ref_vibrato_timestamps)
    vibrato_est_cnt = len(vibrato_timestamps)
    if vibrato_anno_cnt == 0 or vibrato_est_cnt == 0:
        vibrato_matching_cnt = 0
    else:
        vibrato_onset_matching = mir_eval.transcription.match_note_onsets(ref_vibrato_timestamps, vibrato_timestamps)
        vibrato_offset_matching = mir_eval.transcription.match_note_offsets(ref_vibrato_timestamps, vibrato_timestamps)
        vibrato_matching = set(vibrato_onset_matching+vibrato_offset_matching)
        vibrato_matching_cnt = len(vibrato_matching)

    hammer_anno_cnt = len(ref_hammer_timestamps)
    hammer_est_cnt = len(hammer_timestamps)
    if hammer_anno_cnt == 0 or hammer_est_cnt == 0:
        hammer_matching_cnt = 0
    else:
        hammer_matching = mir_eval.transcription.match_note_offsets(ref_hammer_timestamps, hammer_timestamps)
        hammer_matching_cnt = len(hammer_matching)

    pull_anno_cnt = len(ref_pull_timestamps)
    pull_est_cnt = len(pull_timestamps)
    if pull_anno_cnt == 0 or pull_est_cnt == 0:
        pull_matching_cnt = 0
    else:
        pull_matching = mir_eval.transcription.match_note_offsets(ref_pull_timestamps, pull_timestamps)
        pull_matching_cnt = len(pull_matching)

    slide_anno_cnt = len(ref_slide_timestamps)
    slide_est_cnt = len(slide_timestamps)
    if slide_anno_cnt == 0 or slide_est_cnt == 0:
        slide_matching_cnt = 0
    else:
        slide_matching = mir_eval.transcription.match_note_offsets(ref_slide_timestamps, slide_timestamps)
        slide_matching_cnt = len(slide_matching)

    result_matrix = np.array(
        [
            [bend_anno_cnt, bend_est_cnt, bend_matching_cnt], 
            [vibrato_anno_cnt, vibrato_est_cnt, vibrato_matching_cnt],
            [hammer_anno_cnt, hammer_est_cnt, hammer_matching_cnt],
            [pull_anno_cnt, pull_est_cnt, pull_matching_cnt],
            [slide_anno_cnt, slide_est_cnt, slide_matching_cnt]
        ]
    )

    return result_matrix

In [None]:
# this result is using backtrack=True in onset detection
total_result_matrix = np.zeros((5, 3))
for result_file in glob.glob(os.path.join(RESULT_NOTE_DIR, "*.csv")):
    result_file = result_file.split("/")[-1]
    try:
        result_matrix = evaluate_e2e(result_file)
    except IndexError:
        continue
    total_result_matrix += result_matrix
print(total_result_matrix)

In [None]:
match = total_result_matrix[:, 2]
anno = total_result_matrix[:, 0]
est = total_result_matrix[:, 1]
precision = match / est
recall = match / anno
f1 = 2 * precision * recall / (precision + recall)
print(precision)
print(recall)
print(f1)