In [None]:
import copy
import logging
import math
import numpy as np
import pandas as pd
import scipy.signal as sig
from scipy.stats import linregress
from scipy.interpolate import InterpolatedUnivariateSpline
import scipy.optimize as opt
import altair as alt
import flammkuchen as fl
from ray import tune
from ray.tune import ProgressReporter, JupyterNotebookReporter
from ray.tune.schedulers import AsyncHyperBandScheduler, PopulationBasedTraining

import sys, os, os.path

sys.path.append(os.path.expanduser("../src"))

from src.spinorama.filter_iir import Biquad
from src.spinorama.filter_peq import peq_build, peq_print
from src.spinorama.load import graph_melt
from src.spinorama.load_rewseq import parse_eq_iir_rews
from src.spinorama.graph import (
    graph_spinorama,
    graph_freq,
    graph_regression_graph,
    graph_regression,
)
from src.spinorama.compute_scores import scores, nbd
from src.spinorama.filter_scores import scores_apply_filter, scores_print, scores_loss

# Send every "spinorama" log record to debug_optim.log (in the current working directory).
logger = logging.getLogger("spinorama")
# Re-running this cell in a live kernel used to attach one more FileHandler per run, so
# each record ended up written several times.  Only attach a handler when none of the
# existing ones already targets the same file.
_debug_log_path = os.path.abspath("debug_optim.log")
if not any(getattr(h, "baseFilename", None) == _debug_log_path for h in logger.handlers):
    fh = logging.FileHandler("debug_optim.log")
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    fh.setFormatter(formatter)
    logger.addHandler(fh)
logger.setLevel("INFO")

In [None]:
# Load the cached speaker data with flammkuchen.  The cells below index the result as
# df_all_speakers[speaker_name]["ASR"]["asr"] (and "asr_eq" for the manually equalised data).
df_all_speakers = fl.load("../cache.parse_all_speakers.h5")

In [None]:
def lw_loss1(local_target, peq):
    """Return the L2 norm of ``local_target`` after adding the response of ``peq``.

    The response comes from ``peq_build`` evaluated on the notebook-level ``freq``
    array (assigned in a later cell), so ``local_target`` is assumed to be sampled
    on that same frequency grid -- TODO confirm.
    """
    corrected = local_target + peq_build(freq, peq)
    return np.linalg.norm(corrected, ord=2)


def lw_loss2(local_target, peq, iterations=0):
    """Sum the L2 loss of every curve in ``local_target`` for the same ``peq``.

    local_target: sequence of curves, each one handed to ``lw_loss1``
    peq: evaluated peq
    iterations: unused; only present so all loss functions share one signature.
        It defaults to 0 because other loss functions in this notebook call
        ``lw_loss2`` with two arguments, which used to raise ``TypeError``.

    return the sum of the per-curve losses (0.0 for an empty sequence)
    """
    return np.sum([lw_loss1(curve, peq) for curve in local_target])


def pref_loss(local_target, peq, iterations):
    """Loss based on the preference score of the notebook-level ``df_speaker``.

    local_target: unused
    peq: evaluated peq
    iterations: unused

    return minus the "pref_score" entry of the scores computed with ``peq`` applied
    (negated so that a minimiser maximises the score)
    """
    _spin, _pir, filtered_score = scores_apply_filter(df_speaker, peq)
    negated = -filtered_score["pref_score"]
    return negated


def flat_loss(local_target, peq, iterations):
    """Loss that pulls curve 0 onto its target and asks curve 1 to be a straight line.

    local_target: sequence of deviation curves sampled on the notebook-level ``freq``;
        index 0 is the curve to flatten (listening window in the default setup),
        index 1 the curve that only has to be linear (sound power in the default setup)
    peq: evaluated peq
    iterations: unused

    return ``lw * sp`` where ``lw`` is the L2 norm of curve 0 with ``peq`` applied and
    ``sp`` is ``1 - r**2`` of a linear regression of curve 1 (with ``peq`` applied)
    against ``log10(freq)``.  With a single curve only ``lw`` is returned.
    """
    # want lw as close as possible to the target
    lw = lw_loss1(local_target[0], peq)
    if len(local_target) < 2:
        # no second curve to constrain: fall back to the plain L2 loss instead of
        # failing with an IndexError
        return lw
    # want the second curve to be straight but not necessarily aligned with the target.
    # The candidate peq must be applied here as well: without it this term is a constant
    # factor for the optimiser and cannot influence the filter that gets selected.
    sp_corrected = local_target[1] + peq_build(freq, peq)
    _, _, r_value, _, _ = linregress(np.log10(freq), sp_corrected)
    sp = 1 - r_value**2
    return lw * sp


def swap_loss(local_target, peq, iteration):
    """L2 loss on curve 0 for the first 10 iterations, on curve 1 afterwards.

    local_target: sequence of curves
    peq: evaluated peq
    iteration: index of the current optimisation pass

    When fewer than two curves are available the first one (if any) is always used.
    """
    # Slices instead of indexing: an empty or single-curve ``local_target`` no longer
    # raises IndexError, and ``iteration`` is forwarded because ``lw_loss2`` declares
    # a third parameter (calling it with two arguments raised TypeError).
    if len(local_target) < 2 or iteration < 10:
        return lw_loss2(local_target[:1], peq, iteration)
    return lw_loss2(local_target[1:2], peq, iteration)


def alternate_loss(local_target, peq, iteration):
    """L2 loss on curve 0 for even iterations, on curve 1 for odd ones.

    local_target: sequence of curves
    peq: evaluated peq
    iteration: index of the current optimisation pass

    When fewer than two curves are available the first one (if any) is always used.
    """
    # Slices instead of indexing: an empty or single-curve ``local_target`` no longer
    # raises IndexError, and ``iteration`` is forwarded because ``lw_loss2`` declares
    # a third parameter (calling it with two arguments raised TypeError).
    if len(local_target) < 2 or iteration % 2 == 0:
        return lw_loss2(local_target[:1], peq, iteration)
    return lw_loss2(local_target[1:2], peq, iteration)


def score_loss(df_spin, peq):
    """Compute the negated preference score of a speaker once ``peq`` is applied.

    df_spin: speaker data handed to ``scores_apply_filter``
    peq: evaluated peq

    return minus the "pref_score" entry (we want to maximise the score)
    """
    _spin, _pir, filtered_score = scores_apply_filter(df_spin, peq)
    negated = -filtered_score["pref_score"]
    return negated


# def lw_loss_r2(local_target, peq):
#   _, _, r_value, _, _ = linregress(np.log(freq), local_target+peq_build(freq, peq))
#   return -r_value**2

In [None]:
# sample rate handed to the EQ parser
my_fs = 48000
# getFreq / getCurve only keep data strictly above this frequency (Hz)
my_freq_reg_min = 300
# lower / upper bound (Hz) of the "mean" band
my_freq_mean_min = 100
my_freq_mean_max = 300

# speaker to work on; other speakers that have been tried:
# speaker_name = 'KEF LS50'
# speaker_name = 'Genelec 8341A'
# speaker_name = 'Verdant Audio Bambusa MG 1'
# speaker_name = 'Genelec 8030C'
speaker_name = "KRK Systems Classic 5"

# curves to optimise for, in priority order; alternatives:
# curve_name = 'On Axis'
# curve_name = ['Listening Window', 'Early Reflections']
# curve_name = ['Early Reflections', 'Listening Window']
# curve_name = ['Sound Power', 'Listening Window']
# curve_name = ['On Axis', 'Sound Power']
# curve_name = ['Listening Window']
# curve_name = ['Estimated In-Room Response']
# curve_name = ['Early Reflections']
# curve_name = ['Sound Power']
curve_name = ["Listening Window", "Sound Power"]
my_number_curves = len(curve_name)

# all graphs for this speaker
# print(df_speaker.keys())
# original_mean = df_speaker['CEA2034_original_mean']
df_speaker = df_all_speakers[speaker_name]["ASR"]["asr"]

# hand-made EQ for this speaker, used as the reference the automatic EQ is compared to
# peq_print(manual_peq)
manual_peq = parse_eq_iir_rews("../datas/eq/{}/iir.txt".format(speaker_name), my_fs)


def getFreq(df_speaker_data):
    """Extract the frequency axis and the curves listed in ``curve_name``.

    df_speaker_data: mapping whose "CEA2034_unmelted" entry is a DataFrame with a
        "Freq" column and one column per curve name

    return (df, freq, local_target) where ``df`` is the DataFrame restricted to "Freq"
    plus the selected curves (all rows), ``freq`` the "Freq" values strictly above
    ``my_freq_reg_min`` and ``local_target`` a list with, for each of the first
    ``my_number_curves`` entries of ``curve_name``, the values of that curve on the
    same rows.
    """
    # pandas deprecated sets as indexers in 1.5 and raises TypeError since 2.0, so build
    # an ordered, de-duplicated list instead of {"Freq"}.union(curve_name).
    columns = list(dict.fromkeys(["Freq", *curve_name]))
    df = df_speaker_data["CEA2034_unmelted"].loc[:, columns]
    # rows above the lower bound of the optimisation range
    selector = df["Freq"] > my_freq_reg_min
    freq = df.loc[selector, "Freq"].values
    local_target = [df.loc[selector, curve_name[i]].values for i in range(my_number_curves)]
    return df, freq, local_target


def getCurve(df, freq, current_curve_name):
    """Build the straight target line (in the log10(freq) domain) for one curve.

    df: DataFrame with a "Freq" column and a ``current_curve_name`` column
    freq: frequencies the line is evaluated on; expected to be the "Freq" values of
        ``df`` above ``my_freq_reg_min`` so that it lines up with the curve
    current_curve_name: name of the curve

    return a list with one value ``slope * log10(f) + intercept`` per frequency.  For
    the four known curve names the slope is imposed and the intercept is the first
    value of the curve; any other curve uses the slope and intercept of a linear
    regression of the curve against log10(freq).  The parameters are also printed.
    """
    current_curve = df.loc[df["Freq"] > my_freq_reg_min, current_curve_name].values
    # regression on the curve; r/p/err are reported even when the slope is imposed below
    slope, intercept, r_value, p_value, std_err = linregress(np.log10(freq), current_curve)
    # normalise to have a flat or gently falling target (primarily for bright speakers)
    # NOTE(review): these slopes are normalised with the natural log of 20000 while the
    # line below is evaluated with log10, and the intercept is the first curve value
    # rather than the value that makes the line start there -- confirm this is intended.
    imposed_slopes = {
        "On Axis": 0,
        # slightly downward
        "Listening Window": -2 / math.log(20000),
        "Early Reflections": -5 / math.log(20000),
        "Sound Power": -8 / math.log(20000),
    }
    if current_curve_name in imposed_slopes:
        slope = imposed_slopes[current_curve_name]
        intercept = current_curve[0]
    target_interp = [(slope * math.log10(f) + intercept) for f in freq]
    print(
        "Slope {} Intercept {} R {} P {} err {}".format(slope, intercept, r_value, p_value, std_err)
    )
    return target_interp


# measured curves above my_freq_reg_min and, for each of them, its target line
df, freq, target = getFreq(df_all_speakers[speaker_name]["ASR"]["asr"])
target_interp = [getCurve(df, freq, curve_name[i]) for i in range(0, my_number_curves)]

# lw_interp_last = lw_interp[-1]-np.mean(lw_interp)
# if lw_interp[-1]-np.mean(lw_interp)>0.3:
#    print('Warning: bright target {}dB at {} Hz'.format(lw_interp_last, freq[-1]))

# same extraction for the "asr_eq" data: curves, target lines and their difference
df_eq, freq_eq, lw_eq = getFreq(df_all_speakers[speaker_name]["ASR"]["asr_eq"])
lw_eq_interp = [getCurve(df_eq, freq_eq, curve_name[i]) for i in range(0, my_number_curves)]
lw_target = [lw_eq[i] - lw_eq_interp[i] for i in range(0, my_number_curves)]

# reference loss: the manually equalised curves with no additional filter
manual_eq_loss = flat_loss(lw_target, [], 0)
print("manual eq loss: {}".format(manual_eq_loss))

# Overview of the starting point: every curve and its target line, raw and manually
# equalised, each shifted vertically so the traces do not overlap.
target_init = target[0][0]
manual_init = lw_eq[0][0]
df_init_curve = pd.DataFrame(
    {
        "Freq": freq,
        "curve0": target[0] - target_init,
        "curve0 interp": target_interp[0] - target_init,
        "manual0": lw_eq[0] - manual_init - 5,
        "manual0 interp": lw_eq_interp[0] - manual_init - 5,
    }
)
for i in range(1, len(curve_name)):
    curve_shift = 5 + i * 10
    manual_shift = (i + 1) * 10
    df_init_curve["curve{}".format(i)] = target[i] - target[i][0] - curve_shift
    df_init_curve["curve{} interp".format(i)] = (
        target_interp[i] - target_interp[i][0] - curve_shift
    )
    df_init_curve["manual{}".format(i)] = lw_eq[i] - lw_eq[i][0] - manual_shift
    df_init_curve["manual{} interp".format(i)] = (
        lw_eq_interp[i] - lw_eq_interp[i][0] - manual_shift
    )

_init_x = alt.X(
    "Freq:Q",
    title="Freq (Hz)",
    scale=alt.Scale(type="log", nice=False, domain=[my_freq_reg_min, 20000]),
)
_init_y = alt.Y(
    "dB:Q",
    title="Sound Pressure (dB)",
    scale=alt.Scale(zero=False, domain=[-40, 5]),
)
_init_color = alt.Color("Measurements", type="nominal", sort=None)
alt.Chart(graph_melt(df_init_curve)).mark_line(size=3).encode(
    _init_x, _init_y, _init_color
).properties(width=800, height=400)

In [None]:
logger.setLevel("INFO")
# set to True to shrink the search so that the whole cell runs quickly
smoke_test = False

# number of filters to look for and number of steps per parameter range
MAX_NUMBER_PEQ = 3 if smoke_test else 20
MAX_STEPS_FREQ = 3 if smoke_test else 5
MAX_STEPS_DBGAIN = 3 if smoke_test else 5
MAX_STEPS_Q = 3 if smoke_test else 5
# allowed magnitude of the gain (dB) and range of Q for one filter
MIN_DBGAIN, MAX_DBGAIN = 0.5, 12
MIN_Q, MAX_Q = 0.5, 12

# main peq that we want to find
auto_peq = []
# current target is the delta between each curve and its target line
auto_target = [target[i] - target_interp[i] for i in range(0, my_number_curves)]

fixed_freq = set()
best_loss = flat_loss(auto_target, auto_peq, 0)

start_message = (
    "OPTIM {} START #PEQ {} Freq #{} Gain #{} +/-[{}, {}] Q #{} [{}, {}] Initial loss {}".format(
        curve_name,
        MAX_NUMBER_PEQ,
        MAX_STEPS_FREQ,
        MAX_STEPS_DBGAIN,
        MIN_DBGAIN,
        MAX_DBGAIN,
        MAX_STEPS_Q,
        MIN_Q,
        MAX_Q,
        best_loss,
    )
)
logger.info(start_message)


def find_largest_area(freq, curve):
    """Locate the peak or dip of ``curve`` with the largest height x width product.

    freq: frequencies, indexable by the position of a sample in ``curve``
    curve: deviation from the target; positive and negative parts are analysed
        separately

    return (sign, index, frequency): ``sign`` is +1 when the selected feature is a peak
    of the positive part and -1 when it is a dip of the negative part, ``index`` its
    position in ``curve`` and ``frequency`` is ``freq[index]``.  On a tie the peak wins.

    raise ValueError when neither part has a peak (e.g. a perfectly flat curve);
    previously this returned ``(+1, None, freq[None])`` and failed later in the callers.
    """
    log = logging.getLogger("spinorama")

    def largest_area(positive_curve):
        # peaks must be at least 10 samples apart
        found_peaks, _ = sig.find_peaks(positive_curve, distance=10)
        if len(found_peaks) == 0:
            return None, None
        # width measured at 10% of the prominence below each peak
        found_widths = sig.peak_widths(positive_curve, found_peaks, rel_height=0.1)[0]
        areas = positive_curve[found_peaks] * found_widths
        # debug output goes to the log instead of flooding the cell output
        log.debug("found peaks at %s widths %s areas %s", found_peaks, found_widths, areas)
        # argmax keeps the first of several equal areas, like the stable sort it replaces
        best = int(np.argmax(areas))
        return found_peaks[best], areas[best]

    plus_index, plus_area = largest_area(np.clip(curve, a_min=0, a_max=None))
    minus_index, minus_area = largest_area(-np.clip(curve, a_min=None, a_max=0))

    if plus_area is None and minus_area is None:
        raise ValueError("no peak or dip found in curve")
    if minus_area is None:
        return +1, plus_index, freq[plus_index]
    if plus_area is None or minus_area > plus_area:
        return -1, minus_index, freq[minus_index]
    return +1, plus_index, freq[plus_index]


def propose_range_freq(freq, local_target):
    """Propose candidate centre frequencies around the largest peak or dip.

    freq: frequencies matching ``local_target``
    local_target: deviation curve analysed by ``find_largest_area``

    return (sign, init_freq, init_freq_range): ``sign`` and ``init_freq`` come from
    ``find_largest_area``; ``init_freq_range`` holds ``MAX_STEPS_FREQ`` values evenly
    spaced between ``init_freq * 0.8`` and ``init_freq / 0.8`` (clamped to
    [20, 20000] Hz), or just ``[init_freq]`` when a single step is requested.
    """
    sign, _, init_freq = find_largest_area(freq, local_target)
    elastic = 0.8
    init_freq_min = max(init_freq * elastic, 20)
    init_freq_max = min(init_freq / elastic, 20000)
    if MAX_STEPS_FREQ == 1:
        # A stray second linspace call used to overwrite this special case, so a
        # single step returned the lower bound instead of the detected frequency.
        init_freq_range = [init_freq]
    else:
        init_freq_range = np.linspace(init_freq_min, init_freq_max, MAX_STEPS_FREQ).tolist()
    logger.debug("freq min {}Hz peak {}Hz max {}Hz".format(init_freq_min, init_freq, init_freq_max))
    return sign, init_freq, init_freq_range


def propose_range_dbGain(freq, local_target, sign, init_freq):
    """Propose candidate gains for a filter centred on ``init_freq``.

    freq, local_target: deviation curve; it is interpolated linearly over log10(freq)
    sign: negative for a dip (positive gains are proposed), otherwise a peak
        (negative gains are proposed)
    init_freq: frequency at which the deviation is read

    return a list of ``MAX_STEPS_DBGAIN`` evenly spaced gains whose magnitude spans
    from a fifth to five times the deviation at ``init_freq``, clamped to
    [MIN_DBGAIN, MAX_DBGAIN].
    """
    interpolant = InterpolatedUnivariateSpline(np.log10(freq), local_target, k=1)
    peak_gain = abs(interpolant(np.log10(init_freq)))
    gain_floor = max(peak_gain / 5, MIN_DBGAIN)
    gain_ceiling = min(peak_gain * 5, MAX_DBGAIN)
    # a dip is filled with a boost, a peak is cut with negative gains
    lower, upper = (gain_floor, gain_ceiling) if sign < 0 else (-gain_ceiling, -gain_floor)
    gain_range = np.linspace(lower, upper, MAX_STEPS_DBGAIN).tolist()
    logger.debug(
        "gain min {}dB peak {}dB max {}dB".format(gain_floor, peak_gain, gain_ceiling)
    )
    return gain_range


def propose_range_Q():
    """Return the candidate Q values as a flat list.

    Two evenly spaced bands of ``MAX_STEPS_Q`` values each are concatenated:
    [MIN_Q, 1] followed by [1 + MIN_Q, MAX_Q].
    """
    low_band = np.linspace(MIN_Q, 1, MAX_STEPS_Q)
    high_band = np.linspace(1 + MIN_Q, MAX_Q, MAX_STEPS_Q)
    return [*low_band.tolist(), *high_band.tolist()]


def propose_range_biquad():
    """Return the list of biquad types the optimiser may use (peak filters only)."""
    # not enabled: Biquad.NOTCH, Biquad.LOWPASS, Biquad.HIGHPASS, Biquad.BANDPASS,
    # Biquad.LOWSHELF, Biquad.HIGHSHELF
    allowed_types = [Biquad.PEAK]
    return allowed_types


def find_best_biquad_raytune(auto_target, freq_range, Q_range, dbGain_range, biquad_range):
    """Grid-search one biquad with Ray Tune and return the candidate with the lowest loss.

    auto_target: deviation curves handed to ``flat_loss``
    freq_range, Q_range, dbGain_range, biquad_range: lists of candidate values; every
        combination is evaluated

    return (True, type, freq, Q, dbGain, loss) for the best trial
    """

    def lw_optimizer(config, checkpoint_dir=None):
        # one trial: build the single-filter peq described by ``config`` and report its loss
        peq = [
            (
                1.0,
                Biquad(
                    config["1_type"],
                    config["1_freq"],
                    48000,
                    config["1_Q"],
                    config["1_dbGain"],
                ),
            )
        ]
        intermediate_score = flat_loss(auto_target, peq, 0)
        logger.debug("lw_optim: {} {}".format(intermediate_score, peq_print(peq)))
        tune.report(mean_loss=intermediate_score)

    lw_analysis = tune.run(
        lw_optimizer,
        config={
            "1_freq": tune.grid_search(freq_range),
            "1_Q": tune.grid_search(Q_range),
            "1_dbGain": tune.grid_search(dbGain_range),
            "1_type": tune.grid_search(biquad_range),
        },
        progress_reporter=JupyterNotebookReporter(
            overwrite=True,
            max_progress_rows=5,
            max_report_frequency=30,
            print_intermediate_tables=False,
        ),
        resources_per_trial={"cpu": 1},
        metric="mean_loss",
        # mean_loss is a loss: it must be minimised, consistently with get_best_trial
        # below (this used to say "max")
        mode="min",
    )

    best = lw_analysis.get_best_trial(metric="mean_loss", mode="min").last_result
    logger.debug("best {}".format(best))
    best_type = best["config"]["1_type"]
    best_dbGain = best["config"]["1_dbGain"]
    best_freq = best["config"]["1_freq"]
    best_Q = best["config"]["1_Q"]
    best_loss = best["mean_loss"]

    return True, best_type, best_freq, best_Q, best_dbGain, best_loss


def find_best_biquad_shgo(auto_target, freq_range, Q_range, dbGain_range, biquad_range, count):
    """Optimise one biquad (frequency, Q, gain) with ``scipy.optimize.shgo``.

    auto_target: deviation curves handed to ``swap_loss``
    freq_range, Q_range, dbGain_range: candidate lists; only their first and last
        values are used, as the bounds of the search
    biquad_range: unused, the filter type is fixed
    count: iteration number forwarded to ``swap_loss``

    return (True, type, freq, Q, dbGain, loss); the status is always True, the
    optimiser's own success flag is not inspected
    """
    # fixed filter type -- presumably Biquad.PEAK, TODO confirm against Biquad
    bT = 3

    def opt_peq(x):
        # x = (freq, Q, dbGain)
        peq = [(1.0, Biquad(bT, x[0], 48000, x[1], x[2]))]
        return swap_loss(auto_target, peq, count)

    bounds = [
        (freq_range[0], freq_range[-1]),
        (Q_range[0], Q_range[-1]),
        (dbGain_range[0], dbGain_range[-1]),
    ]

    logger.debug(
        "Optim (shgo): range is [{}, {}], [{}, {}], [{}, {}]".format(
            bounds[0][0],
            bounds[0][1],
            bounds[1][0],
            bounds[1][1],
            bounds[2][0],
            bounds[2][1],
        )
    )
    res = opt.shgo(opt_peq, bounds)
    # the gain was passed to format() but had no placeholder, so it was never logged
    logger.debug(
        "          shgo optim loss {:2.2f} in {} iter at F {:.0f} Hz Q {:2.2f} G {:+2.2f}dB".format(
            res.fun, res.nfev, res.x[0], res.x[1], res.x[2]
        )
    )
    return True, bT, res.x[0], res.x[1], res.x[2], res.fun


def find_best_biquad_differential_evolution(
    auto_target, freq_range, Q_range, dbGain_range, biquad_range, count
):
    """Optimise one biquad (frequency, Q, gain) with a global optimiser.

    Despite its name this function currently runs ``scipy.optimize.dual_annealing``.

    auto_target: deviation curves handed to ``flat_loss``
    freq_range, Q_range, dbGain_range: candidate lists; only their first and last
        values are used, as the bounds of the search
    biquad_range: unused, the filter type is fixed
    count: iteration number forwarded to ``flat_loss``

    return (True, type, freq, Q, dbGain, loss); the status is always True, the
    optimiser's own success flag is not inspected
    """
    # fixed filter type -- presumably Biquad.PEAK, TODO confirm against Biquad
    bT = 3

    def opt_peq(x):
        # x = (freq, Q, dbGain)
        peq = [(1.0, Biquad(bT, x[0], 48000, x[1], x[2]))]
        return flat_loss(auto_target, peq, count)

    bounds = [
        (freq_range[0], freq_range[-1]),
        (Q_range[0], Q_range[-1]),
        (dbGain_range[0], dbGain_range[-1]),
    ]

    # log messages used to be labelled "shgo" (copy/paste) although dual_annealing runs
    logger.debug(
        "Optim (dual_annealing): range is [{}, {}], [{}, {}], [{}, {}]".format(
            bounds[0][0],
            bounds[0][1],
            bounds[1][0],
            bounds[1][1],
            bounds[2][0],
            bounds[2][1],
        )
    )
    # can use differential_evolution basinhopping dual_annealing
    res = opt.dual_annealing(
        opt_peq,
        bounds,
        # disp=True,
        # tol=0.01, # increasing precision doesn't help
    )
    # the gain was passed to format() but had no placeholder, so it was never logged
    logger.debug(
        "          dual_annealing optim loss {:2.2f} in {} iter at F {:.0f} Hz Q {:2.2f} G {:+2.2f}dB".format(
            res.fun, res.nfev, res.x[0], res.x[1], res.x[2]
        )
    )
    return True, bT, res.x[0], res.x[1], res.x[2], res.fun


def find_best_biquad_differential_evolution2(
    auto_target, freq_range, Q_range, dbGain_range, biquad_range
):
    """Optimise one biquad, including its type, with ``differential_evolution``.

    auto_target: deviation curves handed to ``flat_loss``
    freq_range, Q_range, dbGain_range: candidate lists; only their first and last
        values are used, as the bounds of the search
    biquad_range: unused, the type is searched in [0, 7] and truncated to an int

    return (True, type, freq, Q, dbGain, loss); the status is always True, the
    optimiser's own success flag is not inspected
    """

    def opt_peq(x):
        # x = (freq, Q, dbGain, type); the continuous type value is truncated
        biquad_freq = x[0]
        biquad_Q = x[1]
        biquad_dbGain = x[2]
        biquad_type = int(x[3])
        peq = [(1.0, Biquad(biquad_type, biquad_freq, 48000, biquad_Q, biquad_dbGain))]
        return flat_loss(auto_target, peq, 0)

    bounds = [
        (freq_range[0], freq_range[-1]),
        (Q_range[0], Q_range[-1]),
        (dbGain_range[0], dbGain_range[-1]),
        # NOTE(review): the upper bound is inclusive, so int(x[3]) can be 7 --
        # confirm that Biquad accepts every type in 0..7
        (0, 7),
    ]

    # log messages used to be labelled "shgo" (copy/paste) although
    # differential_evolution runs
    logger.debug(
        "Optim (differential_evolution): range is [{}, {}], [{}, {}], [{}, {}]".format(
            bounds[0][0],
            bounds[0][1],
            bounds[1][0],
            bounds[1][1],
            bounds[2][0],
            bounds[2][1],
        )
    )
    res = opt.differential_evolution(
        opt_peq,
        bounds,
        # disp=True,
        # tol=0.01, # increasing precision doesn't help
        # popsize=10
    )
    # the gain was passed to format() but had no placeholder, so it was never logged
    logger.debug(
        "          differential_evolution optim loss {:2.2f} in {} iter at F {:.0f} Hz Q {:2.2f} G {:+2.2f}dB".format(
            res.fun, res.nfev, res.x[0], res.x[1], res.x[2]
        )
    )
    return True, int(res.x[3]), res.x[0], res.x[1], res.x[2], res.fun


# Greedy search: each pass looks at what is left of the deviation once the filters found
# so far are applied, optimises one more biquad on it and appends it to auto_peq.
for optim_iter in range(0, MAX_NUMBER_PEQ):
    # target curve is currently a line between my_freq_reg_min Hz and 20kHz
    # we are optimizing above my_freq_reg_min hz on anechoic data
    auto_target = [
        target[i] - target_interp[i] + peq_build(freq, auto_peq)
        for i in range(0, my_number_curves)
    ]

    # greedy strategy: start from the largest peak or dip of the first curve
    sign, init_freq, init_freq_range = propose_range_freq(freq, auto_target[0])
    init_dbGain_range = propose_range_dbGain(freq, auto_target[0], sign, init_freq)
    init_Q_range = propose_range_Q()
    biquad_range = propose_range_biquad()

    (
        state,
        current_type,
        current_freq,
        current_Q,
        current_dbGain,
        current_loss,
    ) = find_best_biquad_differential_evolution(
        auto_target,
        init_freq_range,
        init_Q_range,
        init_dbGain_range,
        biquad_range,
        optim_iter,
    )
    if state:
        biquad = (
            1.0,
            Biquad(current_type, current_freq, 48000, current_Q, current_dbGain),
        )
        auto_peq.append(biquad)
        fixed_freq.add((sign, current_freq))
        best_loss = current_loss
        # score_loss returns minus the preference score
        pref_score = score_loss(df_speaker, auto_peq)
        logger.info(
            "Iter {:2d} Optim converged loss {:2.2f} pref score {:2.2f} biquad {:1d} F:{:5.0f}Hz Q:{:2.2f} G:{:+2.2f}dB".format(
                optim_iter,
                best_loss,
                pref_score,
                current_type,
                current_freq,
                current_Q,
                current_dbGain,
            )
        )
    else:
        # the message had a single placeholder for two values: the current loss was
        # silently dropped
        logger.error(
            "Skip failed optim for best {} current {}".format(best_loss, current_loss)
        )
        break

# what is left of the deviation with the complete automatic peq applied
auto_target = [
    target[i] - target_interp[i] + peq_build(freq, auto_peq)
    for i in range(0, my_number_curves)
]
final_loss = flat_loss(auto_target, [], 0)
logger.info("OPTIM END: best loss {} with {} PEQs".format(final_loss, len(auto_peq)))

In [None]:
# log results: show the manual EQ that serves as the reference
print("Manual QE")
peq_print(manual_peq)

# scores of the speaker as measured, then with the manual and with the automatic EQ
# applied; the spin / pir results are reused by the plotting cells below
score = scores(df_speaker)
spin_manual, pir_manual, score_manual = scores_apply_filter(df_speaker, manual_peq)
spin_auto, pir_auto, score_auto = scores_apply_filter(df_speaker, auto_peq)

# each call is given the unequalised scores first and one equalised variant second
scores_print(score, score_manual)
scores_print(score, score_auto)

In [None]:
def _eq_line_chart(frame, y_domain, title):
    """Line chart of ``frame`` (a "Freq" column plus one column per trace) over a log axis."""
    return (
        alt.Chart(graph_melt(frame))
        .mark_line()
        .encode(
            alt.X(
                "Freq:Q",
                title="Freq (Hz)",
                scale=alt.Scale(type="log", nice=False, domain=[my_freq_reg_min, 20000]),
            ),
            alt.Y(
                "dB:Q",
                title="Sound Pressure (dB)",
                scale=alt.Scale(zero=False, domain=y_domain),
            ),
            alt.Color("Measurements", type="nominal", sort=None),
        )
        .properties(width=800, height=400, title=title)
    )


# one trace per filter of the manual EQ
df_eq = pd.DataFrame({"Freq": freq})
for i, (pos, eq) in enumerate(manual_peq):
    df_eq["EQ {}".format(i)] = peq_build(freq, [(pos, eq)])
g_eq = _eq_line_chart(df_eq, [-12, 12], "{} manual EQ".format(speaker_name))

# one trace per filter of the automatic EQ
df_auto = pd.DataFrame({"Freq": freq})
for i, (pos, eq) in enumerate(auto_peq):
    df_auto["EQ {}".format(i)] = peq_build(freq, [(pos, eq)])
g_auto = _eq_line_chart(df_auto, [-12, 12], "{} auto EQ".format(speaker_name))

# complete response of both EQs
g_eq_full = _eq_line_chart(
    pd.DataFrame(
        {
            "Freq": freq,
            "Manual": peq_build(freq, manual_peq),
            "Auto": peq_build(freq, auto_peq),
        }
    ),
    [-5, 5],
    "{} manual and auto filter".format(speaker_name),
)

# deviation of the first curve from its target line, for both EQs
g_optim = _eq_line_chart(
    pd.DataFrame(
        {
            "Freq": freq,
            "Manual": lw_eq[0] - lw_eq_interp[0],
            "Auto": target[0] - target_interp[0] + peq_build(freq, auto_peq),
        }
    ),
    [-5, 5],
    "{} manual and auto corrected {}".format(speaker_name, curve_name[0]),
)

((g_eq | g_auto) & (g_eq_full | g_optim)).resolve_scale("independent")

In [None]:
# First curve and its target line: as measured, with the manual EQ (shifted by -4 dB)
# and with the automatic EQ (shifted by -8 dB).
zero = target[0][0]
_manual_zero = lw_eq[0][0]
_overview = pd.DataFrame(
    {
        "Freq": freq,
        "lw": target[0] - zero,
        "lw interp": target_interp[0] - zero,
        "manual": lw_eq[0] - _manual_zero - 4,
        "manual interp": lw_eq_interp[0] - _manual_zero - 4,
        "auto": target[0] - zero + peq_build(freq, auto_peq) - 8,
        "auto interp": target_interp[0] - zero - 8,
    }
)
_overview_x = alt.X(
    "Freq:Q",
    title="Freq (Hz)",
    scale=alt.Scale(type="log", nice=False, domain=[my_freq_reg_min, 20000]),
)
_overview_y = alt.Y(
    "dB:Q",
    title="Sound Pressure (dB)",
    scale=alt.Scale(zero=False, domain=[-20, 0]),
)
_overview_color = alt.Color("Measurements", type="nominal", sort=None)
alt.Chart(graph_melt(_overview)).mark_line(size=3).encode(
    _overview_x, _overview_y, _overview_color
).properties(width=800, height=400, title=speaker_name)

In [None]:
# plot settings handed to the project graph helpers (also used by the next cell)
g_params = {
    "xmin": 20,
    "xmax": 20000,
    "ymin": -40,
    "ymax": 10,
    "width": 800,
    "height": 400,
}

# spinorama as measured, with the manual EQ and with the automatic EQ, side by side
_spin_origin = graph_spinorama(df_speaker["CEA2034"], g_params).properties(
    title="{} from ASR".format(speaker_name)
)
_spin_manual = graph_spinorama(spin_manual, g_params).properties(
    title="{} ASR + manual EQ".format(speaker_name)
)
_spin_auto = graph_spinorama(spin_auto, g_params).properties(
    title="{} ASR + auto EQ".format(speaker_name)
)
(_spin_origin | _spin_manual | _spin_auto)

In [None]:
# curve to display; alternatives:
# which_curve='Listening Window'
# which_curve='Sound Power'
which_curve = "Estimated In-Room Response"

# the in-room response lives in its own frames, every other curve in the spinorama ones
data = df_speaker["CEA2034"]
data_manual = spin_manual
data_auto = spin_auto
if which_curve == "Estimated In-Room Response":
    data, data_auto, data_manual = (
        df_speaker["Estimated In-Room Response"],
        pir_auto,
        pir_manual,
    )


def _selected_curve(frame):
    """Rows of ``frame`` whose Measurements column equals ``which_curve``."""
    return frame.loc[(frame.Measurements == which_curve)]


# NOTE(review): the regression is computed on the auto-EQ data only and overlaid on all
# three panels -- confirm this is intended.
reg = graph_regression(_selected_curve(data_auto), my_freq_reg_min, 20000)
origin = (graph_freq(_selected_curve(data), g_params) + reg).properties(
    title="{} from ASR [{}]".format(speaker_name, which_curve)
)
manual = (graph_freq(_selected_curve(data_manual), g_params) + reg).properties(
    title="{} from ASR [{}] + manual EQ".format(speaker_name, which_curve)
)
auto = (graph_freq(_selected_curve(data_auto), g_params) + reg).properties(
    title="{} from ASR [{}] + auto EQ".format(speaker_name, which_curve)
)
(origin | manual | auto)