In [None]:
import os
import shutil
import numpy as np
import librosa
import soundfile as sf
import pandas as pd
from IPython.display import Audio, display
import matplotlib.pyplot as plt

In [None]:
def estimate_ibi(annotation: np.ndarray):
    """Estimate the typical inter-beat interval (IBI) of a beat sequence.

    Differences between consecutive entries of ``annotation`` are computed,
    only those strictly between 0.3 and 1 are kept, and the median of the
    survivors is returned.

    Args:
        annotation: Beat positions (presumably in seconds, ascending).

    Returns:
        The median of the retained intervals.  When no interval falls inside
        the accepted range the median of an empty array is returned, i.e.
        ``nan``.
    """
    gaps = np.diff(annotation)
    # Reject double-detections (<= 0.3) and skipped beats (>= 1) in one mask.
    plausible = gaps[(gaps > 0.3) & (gaps < 1)]
    return np.median(plausible)

def fill_missing_beats(annotation: np.ndarray, end: float = 0):
    """Fill gaps in a beat sequence and extend it towards both ends.

    The inter-beat interval (IBI) is estimated with ``estimate_ibi``.  Then:

    1. Every gap between consecutive beats whose length rounds to ``k > 1``
       IBIs gets ``k - 1`` evenly spaced beats inserted.
    2. Beats are appended in IBI steps for as long as the next beat would
       still lie before ``end + ibi / 2``.
    3. Beats are prepended in IBI steps for as long as they stay above 0.

    Args:
        annotation: Beat positions in ascending order; must contain at least
            one element.
        end: Position up to which the grid is extended forwards.  With the
            default of 0 no beat is appended for non-negative beat times.
            ``None`` disables the forward extension (previously ``None``
            raised ``TypeError`` because it was added to before being
            checked).

    Returns:
        ``np.ndarray`` with the original beats plus all inserted ones, in
        ascending order.
    """
    intervals = np.diff(annotation)
    ibi = estimate_ibi(annotation)

    result = [annotation[0]]
    for current, following, interval in zip(annotation[:-1], annotation[1:], intervals):
        # Number of beats that fit inside this gap beyond the two known ones.
        missing = round(interval / ibi) - 1
        if missing > 0:
            step = interval / (missing + 1)
            result.extend(current + step * j for j in range(1, missing + 1))
        result.append(following)

    # Forward extension: check for None *before* doing arithmetic on `end`.
    if end is not None:
        limit = end + ibi / 2
        while result[-1] + ibi < limit:
            result.append(result[-1] + ibi)

    # Backward extension: walk from the first beat towards 0 in IBI steps.
    leading = []
    earliest = result[0]
    while earliest - ibi > 0:
        earliest = earliest - ibi
        leading.append(earliest)
    leading.reverse()

    return np.array(leading + result)


def load_annotations(annotation_path: str):
    """Read the ``TIME`` column of every ``.txt`` file in a directory.

    Files are visited in sorted filename order and parsed with
    ``pandas.read_csv``; entries that do not end in ``.txt`` are ignored.

    Args:
        annotation_path: Directory containing the annotation files.

    Returns:
        A list with one ``np.ndarray`` of ``TIME`` values per file.
    """
    txt_files = sorted(
        entry for entry in os.listdir(annotation_path) if entry.endswith('.txt')
    )
    return [
        np.array(pd.read_csv(os.path.join(annotation_path, entry))['TIME'].values)
        for entry in txt_files
    ]

def combine_annotations(annotations: list, distance_threshold: float = 0.02,
                        min_gap: float = 0.3):
    """Merge several beat annotations into one consensus sequence.

    All annotations are pooled and sorted.  A sliding window of ``n``
    consecutive values (``n`` = number of annotations) is moved over the
    pooled array; whenever the window spans less than ``distance_threshold``
    its mean is emitted as a consensus beat, unless that mean lies less than
    ``min_gap`` after the previously emitted beat.

    Fixes relative to the earlier implementation:

    * The window mean now covers all ``n`` values of the window; the last
      value used to be left out of the average.
    * A single annotation (``n == 1``) no longer yields an empty result
      caused by the ``stacked[:-n+1]`` slice collapsing to ``stacked[:0]``.

    Args:
        annotations: List of 1-D arrays of beat positions.
        distance_threshold: Maximum spread of a window for it to count as
            agreement.
        min_gap: Minimum distance between two consecutive consensus beats.

    Returns:
        Tuple ``(consensus, last)`` where ``consensus`` is an ``np.ndarray``
        of consensus beats and ``last`` is the largest pooled value.

    Raises:
        ValueError: If the pooled annotations are empty.
    """
    n = len(annotations)
    stacked = np.sort(np.hstack(annotations))

    result = []
    for start in range(len(stacked) - n + 1):
        window = stacked[start:start + n]
        mean = np.mean(window)
        # Skip windows that would place a beat too close to the previous one.
        if result and result[-1] + min_gap > mean:
            continue
        # `stacked` is sorted, so the window spread is last minus first.
        if window[-1] - window[0] < distance_threshold:
            result.append(mean)

    return np.array(result), stacked.max()

def apply_smoothing(annotation: np.ndarray, smoothing_size: float = 3):
    """Smooth each beat using its neighbours projected onto its position.

    For every beat, all beats closer than ``smoothing_size`` inter-beat
    intervals (IBI, from ``estimate_ibi``) are selected, shifted by a whole
    number of IBIs so they line up with the beat, and averaged.

    Args:
        annotation: Beat positions.
        smoothing_size: Half-width of the neighbourhood, in units of the IBI.

    Returns:
        ``np.ndarray`` with one smoothed position per input beat.
    """
    ibi = estimate_ibi(annotation)
    radius = smoothing_size * ibi

    def smoothed(center):
        # Boolean indexing yields a fresh array, so the in-place shift below
        # never touches `annotation`.
        neighbours = annotation[np.abs(annotation - center) < radius]
        neighbours += np.round((center - neighbours) / ibi) * ibi
        return np.mean(neighbours)

    return np.array([smoothed(center) for center in annotation])

def play(audio_path: str, click: np.ndarray = None):
    """Display an inline audio player, optionally with a click track mixed in.

    Args:
        audio_path: Audio file to load at its native sampling rate.
        click: Click positions, passed to ``librosa.clicks`` as ``times``.
            When ``None`` (the default) the audio is played without clicks;
            previously the default was forwarded to ``librosa.clicks``, which
            requires ``times`` or ``frames`` and therefore failed.
    """
    y, sr = librosa.load(audio_path, sr=None)
    if click is None:
        display(Audio(y, rate=sr))
        return
    # `length=len(y)` makes the click signal match the audio so they can be summed.
    clicks = librosa.clicks(times=click, sr=sr, length=len(y), click_freq=1000)
    display(Audio(y + clicks, rate=sr))

def filter_silence(raw: list, annotation: np.ndarray):
    """Keep only beats that have a raw annotation nearby.

    A beat survives when at least one value of the pooled ``raw`` annotations
    lies closer than two inter-beat intervals (as estimated from
    ``annotation``) to it.

    Args:
        raw: List of arrays holding the original annotations.
        annotation: Beat positions to be filtered.

    Returns:
        ``np.ndarray`` of the beats that passed the test, in input order.
    """
    window = 2 * estimate_ibi(annotation)
    pooled = np.hstack(raw)
    kept = [
        beat for beat in annotation
        if np.any(np.abs(pooled - beat) < window)
    ]
    return np.array(kept)

def pipeline(annotation_path: str, smoothing_size: float = 2.2, distance_threshold: float = 0.05):
    """Turn the raw annotation files of a directory into one beat sequence.

    Steps:

    1. Load every annotation, fill its gaps and smooth it.
    2. Combine the processed annotations into a consensus and smooth it.
    3. Plot the pooled processed annotations against the consensus.
    4. Fill the consensus up to the last pooled beat and drop beats that
       have no raw annotation nearby.

    Prints the percentage of beats in the final sequence that were not part
    of the consensus, and the tempo derived from the mean beat distance.

    Args:
        annotation_path: Directory with the annotation ``.txt`` files.
        smoothing_size: Forwarded to ``apply_smoothing``.
        distance_threshold: Forwarded to ``combine_annotations``.

    Returns:
        ``np.ndarray`` with the final beat positions.
    """
    raw_annotations = load_annotations(annotation_path)
    annotations = [
        apply_smoothing(fill_missing_beats(raw), smoothing_size)
        for raw in raw_annotations
    ]

    consensus, end = combine_annotations(annotations, distance_threshold)
    consensus = apply_smoothing(consensus, smoothing_size)
    length = len(consensus)
    plot(np.sort(np.hstack(annotations)), consensus)

    result = filter_silence(raw_annotations, fill_missing_beats(consensus, end))

    # Share of consensus beats in the final sequence; the rest was filled in.
    interpolated = length / len(result)
    print(f"interpolated: {(1 - interpolated) * 100:.2f}%")
    print(f"bpm: {60 / np.mean(np.diff(result)):.2f}")
    return result

def plot(raw: np.ndarray, final: np.ndarray):
    """Draw two sets of positions as thin vertical lines in one figure.

    Args:
        raw: Positions drawn first, in the default line colour.
        final: Positions drawn on top, in red.
    """
    plt.figure(figsize=(10, 2))
    layers = ((raw, {}), (final, {'colors': 'red'}))
    for positions, extra in layers:
        plt.vlines(x=positions, ymin=0, ymax=1, linewidth=0.4, **extra)
    plt.show()



# Directory scanned by `pipeline` (via `load_annotations`) for annotation `.txt` files.
ANNOTATION_PATH = "../dataset"
# Base name of the track being processed. The literal below ends with a
# newline, which the trailing `[:-1]` strips off again.
name = """\
looperman-a-5046568-0021260-pearl-ft-lil-lan-and-yung-kabin
"""[:-1]
# Audio file that belongs to the annotations above.
ACAPELLA_PATH = f"../dataset/looperman_sub/{name}.mp3"

# Build the final beat sequence and listen to it with clicks on every beat.
# `annotation`, `name`, `ANNOTATION_PATH` and `ACAPELLA_PATH` are reused by the
# next cell.
annotation = pipeline(ANNOTATION_PATH, smoothing_size=2.2, distance_threshold=0.05)
play(ACAPELLA_PATH, annotation)



In [None]:
def write_dataset(audio_path: str, dataset_path: str, annotation: np.ndarray, start_beat: int):
    """Store a trimmed copy of the audio and its labelled beats in the dataset.

    The audio is cut two seconds after the last beat.  Each beat receives a
    ``"<bar>.<beat>"`` label assuming four beats per bar, counted from
    ``start_beat``.  Output files are numbered with the count of existing
    entries in ``dataset_path`` whose name starts with ``audio``, plus one.

    Args:
        audio_path: Source audio file, loaded at its native sampling rate.
        dataset_path: Existing directory that receives ``annotation<N>.txt``
            and ``audio<N>.wav``.
        annotation: Beat positions in seconds; must not be empty.
        start_beat: 1-based running beat number assigned to the first beat.
    """
    y, sr = librosa.load(audio_path, sr=None)
    cutoff = int((annotation[-1] + 2) * sr)
    y = y[:cutoff]

    next_num = 1 + sum(
        1 for entry in os.listdir(dataset_path) if entry.startswith('audio')
    )

    labels = []
    for offset in range(len(annotation)):
        # Zero-based running index split into (bar, position inside the bar).
        bar_index, beat_index = divmod(start_beat + offset - 1, 4)
        bar, beat_in_bar = bar_index + 1, beat_index + 1
        labels.append(f"{bar}.{beat_in_bar}")

    table = pd.DataFrame({'TIME': annotation, 'LABEL': labels})
    table.to_csv(os.path.join(dataset_path, f'annotation{next_num}.txt'), index=False)

    sf.write(os.path.join(dataset_path, f'audio{next_num}.wav'), y, sr)

# Export the processed track (trimmed audio + labelled beats) into the dataset.
DATASET_PATH = "../dataset/dataset"
write_dataset(ACAPELLA_PATH, DATASET_PATH, annotation, start_beat=1)

# Archive the annotation files that were just consumed and delete the source
# audio. WARNING: this part is destructive and not idempotent -- it moves files
# out of ANNOTATION_PATH and removes ACAPELLA_PATH.
DESTINATION = "../dataset/annotations"
# Number of the new archive folder: count of non-hidden entries already in
# DESTINATION, plus one.
# NOTE(review): if an entry with that name already exists, `os.makedirs` below
# raises after `write_dataset` has already written its files.
l = len([x for x in os.listdir(DESTINATION) if not x.startswith('.')]) + 1
os.makedirs(os.path.join(DESTINATION, str(l)))
# NOTE(review): `os.listdir` is unsorted (unlike `load_annotations`) and `i`
# also counts the skipped non-`.txt` entries, so the numeric suffixes of the
# archived files are neither contiguous nor reproducible -- confirm intended.
for i, file in enumerate(os.listdir(ANNOTATION_PATH)):
    if not file.endswith('.txt'):
        continue
    source = os.path.join(ANNOTATION_PATH, file)
    sink = os.path.join(DESTINATION, f"{l}", file)
    # Move into the archive folder first, then rename to "<track name><i>.txt".
    shutil.move(source, sink)
    os.rename(sink, os.path.join(DESTINATION, f"{l}", f"{name}{i}.txt"))
# Delete the original audio now that a copy has been written by `write_dataset`.
os.remove(ACAPELLA_PATH)
    