In [1]:
import os
import pickle
import numpy as np
import seaborn as sns
from tqdm import tqdm
import pandas as pd
# from dance_evaluation import *
import matplotlib.pyplot as plt
# from calculate_score import *
from collections import defaultdict

def load_pickle(filepath):
    """Return the object stored in the pickle file at `filepath`."""
    with open(filepath, "rb") as handle:
        return pickle.load(handle)

def save_to_pickle(filepath, data):
    """Pickle `data` into the file at `filepath`, overwriting any existing file."""
    with open(filepath, "wb") as handle:
        pickle.Pickler(handle).dump(data)

## Dance Tempo Score

In [2]:
def compute_dts(
    ref_bpm,
    estimated_bpm,
    tau=0.1,
    mode="one"
):
    """
    Continuous Dance-Tempo Score (DTS), with support for
    either single estimates (mode="one") or multiple
    candidates per item (mode="many").

    An estimate scores 1 when it equals the reference tempo, its half
    or its double, and decays linearly to 0 as it moves `tau` octaves
    away from the nearest of those three targets.

    Parameters
    ----------
    ref_bpm : array-like, shape (n,)
        Ground-truth musical tempo in BPM.
    estimated_bpm :
        If mode="one": array-like, shape (n,)
        If mode="many": iterable of length n, each element
                        is a non-empty iterable of candidate BPMs.
    tau : float, optional
        Tolerance in octaves, must be > 0. The default 0.1 is roughly a
        7 % tempo deviation (0.06 ≈ 4 %).
    mode : {"one", "many"}
        "one": treat `estimated_bpm` as a flat sequence.
        "many": pick, for each i, the candidate whose BPM value is
                closest to ref_bpm[i] (best-of-candidates scoring).

    Returns
    -------
    dts : ndarray, shape (n,)
        Scores in [0, 1] (1 = perfect, 0 = miss ≥ τ octaves away).
    e : ndarray, shape (n,)
        Raw octave errors log₂(estimate/ref).
    d : ndarray, shape (n,)
        Distance of `e` to the nearest of {-1, 0, +1}, before clipping.
    accuracy : float
        Percentage of items with dts > 0 (i.e. d < tau).
    hit_idx : ndarray of int
        Indices of the items counted as hits.
    ref_hit_bpm : ndarray
        Reference tempi of the hits, `ref_bpm[hit_idx]`.

    Raises
    ------
    ValueError
        If `mode` is unknown, `tau` is not positive, or (mode="many")
        an item has no candidates.
    """
    if tau <= 0:
        # tau == 0 would divide by zero below and turn every score into NaN.
        raise ValueError(f"tau must be positive, got {tau!r}.")

    ref_bpm = np.asarray(ref_bpm, dtype=float)

    # select a single estimate per index if needed
    if mode == "many":
        # NOTE(review): candidates are ranked by absolute BPM difference, not by
        # the octave-wrapped distance used for scoring, so an exact half/double
        # tempo candidate can lose to a numerically closer miss. Kept as is to
        # preserve existing results -- confirm this is intended.
        picks = []
        for i, cands in enumerate(estimated_bpm):
            cands = list(cands)
            if not cands:
                raise ValueError(f"No tempo candidates given for index {i}.")
            target = ref_bpm[i]
            picks.append(min(cands, key=lambda bpm: abs(bpm - target)))
        chosen = np.array(picks, dtype=float)
    elif mode == "one":
        chosen = np.asarray(estimated_bpm, dtype=float)
    else:
        raise ValueError(f"Unknown mode: {mode!r}. Use 'one' or 'many'.")

    # DTS core ------------------------------------------------------
    e = np.log2(chosen / ref_bpm)
    # distance from nearest of -1, 0, +1 (half, same, double tempo)
    d = np.abs(e[:, None] - np.array([-1.0, 0.0, 1.0])).min(axis=1)
    # clip by tolerance and convert to score
    d_clip = np.minimum(d, tau)
    dts = 1.0 - d_clip / tau

    # hits ----------------------------------------------------------
    hit_mask = dts > 0.0          # strictly inside the ±tau band
    accuracy = hit_mask.mean() * 100
    hit_idx = np.nonzero(hit_mask)[0]
    ref_hit_bpm = ref_bpm[hit_idx]

    return dts, e, d, accuracy, hit_idx, ref_hit_bpm


In [15]:
# Display labels for the segment-signal keys used in result file names.
friendly = {
    # vertical (Y) combination
    "adaptv_Bhandfoot_y": "Both-Hand + Foot (Y)",
    # feet
    "adaptv_LRfoot_xy": "L-/R-Foot (XY)",
    "adaptv_LRfoot_res": "L-/R-Foot (Resultant)",
    # hands
    "adaptv_LRhand_xy": "L-/R-Hand (XY)",
    "adaptv_LRhand_res": "L-/R-Hand (Resultant)",
    # combined limbs
    "adaptv_Bfoot_x_y": "Both-Foot (X & Y)",
    "adaptv_Bhandfoot_x": "Both-Hand & Both-Foot (X)",
    "adaptv_Bhand_x_y": "Both-Hand (X & Y)",
    "adaptv_Bhandfoot_res": "Both-Hand & Both-Foot (Resultant)",
}

def estimate_tempo_posvel(a, b, mode, w_sec, h_sec,
                          data_root="/itf-fi-ml/home/sagardu/aist_tempo_est"):
    """
    Best-of-candidates tempo accuracy for every body-segment signal.

    For each segment signal and each BPM statistic ("bpm_avg", "bpm_mode",
    "bpm_median") four tempo candidates are collected per row -- the
    position, velocity, RMS-position and torso-Y estimates -- and scored
    with ``compute_dts(..., tau=0.10, mode="many")``, which keeps the
    candidate closest to the reference tempo.

    Parameters
    ----------
    a, b :
        Tempo-range tags used in directory and file names (``tempo_{a}_{b}``).
    mode : str
        Estimation-mode tag used in file names (e.g. "zero_uni").
    w_sec, h_sec :
        Window / hop tags used in file names (``W{w_sec}_H{h_sec}``);
        presumably lengths in seconds -- TODO confirm.
    data_root : str, optional
        Directory that contains ``saved_result_adaptive``,
        ``saved_result_rms_adaptive`` and ``extracted_body_onsets_sept25``.

    Returns
    -------
    dict
        ``result[bpm_statistic][segment_key]`` is a dict with

        * ``"bpm_posvel"``: list of per-row candidate tuples
          (pos, vel, rms, torso),
        * ``"Acc1_bpm_posvel"``: best-of-candidates accuracy in percent,
        * ``"bpm_pos"``, ``"bpm_vel"``: always empty lists, and
          ``"Acc1_bpm_pos"``, ``"Acc1_bpm_vel"``: the constant 5. These
          four are placeholders kept for backward compatibility.

    Raises
    ------
    IndexError
        If the velocity, RMS or torso table has fewer rows than the
        position table.
    """
    # Using both zero velocity and peak velocity
    segment_pairs = [
        "adaptv_Bhandfoot_y",
        "adaptv_Bhandfoot_x",
        "adaptv_LRhand_xy",
        "adaptv_LRfoot_xy",
        "adaptv_Bhand_x_y",
        "adaptv_Bfoot_x_y",
        "adaptv_LRfoot_res",
        "adaptv_LRhand_res",
        "adaptv_Bhandfoot_res",
    ]
    bpm_dict = ["bpm_avg", "bpm_mode", "bpm_median"]

    pth_pos = f"{data_root}/saved_result_adaptive/tempo_{a}_{b}/pos"
    pth_vel = f"{data_root}/saved_result_adaptive/tempo_{a}_{b}/vel"
    pth_rms = f"{data_root}/saved_result_rms_adaptive/tempo_{a}_{b}/pos"
    pth_torso = f"{data_root}/extracted_body_onsets_sept25/torso"

    # The torso table does not depend on the segment or the BPM statistic,
    # so it is read once instead of once per loop iteration.
    # NOTE(review): the file name is fixed to zero_uni / W5 / H2.5 / 60-140
    # regardless of the arguments -- confirm that is intended.
    df_torso = load_pickle(os.path.join(pth_torso, "torso_y_zero_uni_W5_H2.5_60_140.pkl"))

    # Outer keys are created up front so the result layout stays
    # {bpm_statistic: {segment: ...}} although segments are the outer loop.
    score_data = {bpm_mode: {} for bpm_mode in bpm_dict}
    json_data = {bpm_mode: {} for bpm_mode in bpm_dict}

    for seg in segment_pairs:
        # Each per-segment table is read once and reused for all statistics.
        fname = f"{seg}_{mode}_W{w_sec}_H{h_sec}_{a}_{b}.pkl"
        df_pos = load_pickle(os.path.join(pth_pos, fname))
        df_vel = load_pickle(os.path.join(pth_vel, fname))
        df_rms = load_pickle(os.path.join(pth_rms, fname))   # RMS position

        # Rows are matched by position; the position table sets the row count.
        # assumes all tables list the same recordings in the same order -- TODO confirm
        n_rows = df_pos.shape[0]
        for label, frame in (("vel", df_vel), ("rms", df_rms), ("torso", df_torso)):
            if frame.shape[0] < n_rows:
                raise IndexError(
                    f"{label} table for {seg} has {frame.shape[0]} rows, "
                    f"expected at least {n_rows}."
                )

        # music_tempo from the position table
        ref = df_pos["music_tempo"].to_numpy()

        for bpm_mode in bpm_dict:
            # Build candidate BPM tuples (pos, vel, rms, torso), one per row.
            candidate_columns = [
                frame[bpm_mode].iloc[:n_rows].tolist()
                for frame in (df_pos, df_vel, df_rms, df_torso)
            ]
            bpm_posvel = list(zip(*candidate_columns))

            _, _, _, dts_acc3, hit_idx, ref_hit_bpm = compute_dts(
                ref, bpm_posvel, tau=0.10, mode="many"
            )

            # Only consumed by the (currently disabled) save step below.
            score_data[bpm_mode][friendly[seg]] = {
                "acc": dts_acc3,
                "hit_idx": hit_idx,
                "ref_hit_bpm": ref_hit_bpm,
            }

            json_data[bpm_mode][seg] = {
                "bpm_pos": [],          # placeholder, never filled
                "bpm_vel": [],          # placeholder, never filled
                "bpm_posvel": bpm_posvel,
                "Acc1_bpm_pos": 5,      # placeholder constant
                "Acc1_bpm_vel": 5,      # placeholder constant
                "Acc1_bpm_posvel": dts_acc3,
            }

    #### Save the score data to a pickle file
    # save_dir = f"./saved_result_rms_adaptive/tempo_{a}_{b}/score"
    # fname1 = f"score_multi_adap_posvelenergy_{mode}_W{w_sec}_H{h_sec}_{a}_{b}.pkl"
    # fpath1 = os.path.join(save_dir, fname1)
    # save_to_pickle(fpath1, score_data)

    return json_data

# Best-of-candidates accuracies (pos / vel / RMS / torso-Y) for the 60-140 tempo range.
accuracy_dict_posvel = estimate_tempo_posvel(a=60, b=140, mode="zero_uni", w_sec=5, h_sec=5 / 2)

In [16]:
# Segment signals in the order they are reported.
segment_pairs = [
    "adaptv_Bhandfoot_y",
    "adaptv_Bhandfoot_x",
    "adaptv_LRhand_xy",
    "adaptv_LRfoot_xy",
    "adaptv_Bhand_x_y",
    "adaptv_Bfoot_x_y",
    "adaptv_LRfoot_res",
    "adaptv_LRhand_res",
    "adaptv_Bhandfoot_res",
]

# `friendly` (segment key -> display label) is reused from the cell that
# defines `estimate_tempo_posvel`; it is not redefined here.

# Median-BPM best-of-candidates accuracy (%) per segment, rounded to 2 decimals.
# NOTE(review): despite the name, estimate_tempo_posvel compares four
# candidates per row (pos, vel, RMS, torso-Y); the name is kept for
# compatibility with any code that refers to it.
best_of_three = {
    friendly[seg]: np.round(accuracy_dict_posvel["bpm_median"][seg]["Acc1_bpm_posvel"], 2)
    for seg in segment_pairs
}

best_of_three

{'Both-Hand + Foot (Y)': 75.17,
 'Both-Hand & Both-Foot (X)': 54.29,
 'L-/R-Hand (XY)': 70.84,
 'L-/R-Foot (XY)': 64.35,
 'Both-Hand (X & Y)': 72.48,
 'Both-Foot (X & Y)': 67.41,
 'L-/R-Foot (Resultant)': 19.39,
 'L-/R-Hand (Resultant)': 19.39,
 'Both-Hand & Both-Foot (Resultant)': 44.3}

### For adaptive weighting

In [10]:
from collections import defaultdict

# path setup: adaptive-weighting results for the 60-140 tempo range
a, b = 60, 140
root = "/itf-fi-ml/home/sagardu/aist_tempo_est/saved_result_rms_adaptive"
pth_pos = f"{root}/tempo_{a}_{b}/pos"
# pth_vel = f"{root}/tempo_{a}_{b}/vel"

# The descriptive plot labels (`friendly`) are reused from the cell that
# defines `estimate_tempo_posvel`; the identical copy that used to live
# here was removed to keep a single source of truth.

# helper to compute accuracy and fill a dictionary
def collect_accuracies(folder, metric):
    """
    Score the median-BPM estimate of every "zero_uni" result file in `folder`.

    Each file name is expected to look like ``<segment>_zero_uni...``; the
    part before ``_zero_uni`` is mapped to a display label via `friendly`.
    Entries containing "zero_bi", and entries without "_zero_uni" (stray
    files or sub-directories such as ``.ipynb_checkpoints``), are skipped.

    Parameters
    ----------
    folder : str
        Directory holding the pickled result tables.
    metric : str
        Only referenced by the disabled save step at the end of the
        function; it does not influence the returned values.

    Returns
    -------
    dict
        ``{display label: accuracy in percent rounded to 2 decimals}``,
        filled in sorted file-name order.

    Raises
    ------
    KeyError
        If a "zero_uni" file belongs to a segment missing from `friendly`.
    """
    acc = {}
    # Only the "bpm_median" entry is filled; the other two stay empty.
    score_data = {"bpm_median": {}, "bpm_avg": {}, "bpm_mode": {}}

    # os.listdir returns entries in arbitrary order; sorting makes the result
    # order (and which file wins if two map to one label) reproducible.
    for fname in sorted(os.listdir(folder)):

        if "zero_bi" in fname or "_zero_uni" not in fname:
            continue

        tag = fname.split("_zero_uni")[0]
        label = friendly[tag]   # fail before the (slower) load if the tag is unknown

        data = load_pickle(f"{folder}/{fname}")
        ref = data["music_tempo"].to_numpy()
        _, _, _, accuracy, hit_idx, ref_hit_bpm = compute_dts(
            ref, data["bpm_median"].to_numpy(), tau=0.10, mode="one"
        )

        acc[label] = round(accuracy, 2)

        # Only consumed by the (currently disabled) save step below.
        score_data["bpm_median"][label] = {
            "acc": accuracy,
            "hit_idx": hit_idx,
            "ref_hit_bpm": ref_hit_bpm,
        }

    # save the score data
    # save_dir = f"./saved_result_adaptive/tempo_{a}_{b}/score"
    # fname1 = f"score_multi_adap_{metric}_zero_uni_W5_H2.5_25_140.pkl"
    # fpath1 = os.path.join(save_dir, fname1)
    # save_to_pickle(fpath1, score_data)

    return acc

# Accuracy per segment label for the results in `pth_pos`, ready for bar plotting.
# (The velocity counterpart is currently disabled.)
accuracy_dict_energy = collect_accuracies(folder=pth_pos, metric="pos")
# accuracy_dict_vel = collect_accuracies(pth_vel, "vel")

accuracy_dict_energy


{'L-/R-Hand (XY)': 47.13,
 'L-/R-Hand (Resultant)': 20.36,
 'L-/R-Foot (Resultant)': 20.36,
 'Both-Hand + Foot (Y)': 58.99,
 'Both-Foot (X & Y)': 44.15,
 'Both-Hand (X & Y)': 51.53,
 'Both-Hand & Both-Foot (X)': 27.59,
 'L-/R-Foot (XY)': 39.67,
 'Both-Hand & Both-Foot (Resultant)': 20.36}