::::
:::{thebe-button}
:::
::::

# Get thermal data


In [None]:
from functools import cache

from boilercore.fits import Fit, fit_from_params
from boilercore.models.geometry import GEOMETRY
from boilercore.paths import ISOLIKE, dt_fromisolike
from devtools import pprint
from matplotlib.pyplot import subplots
from pandas import DataFrame, Series, concat, read_csv
from seaborn import lineplot, scatterplot

from boilercv_docs.nbs import display_markdown, init
from boilercv_pipeline.stages.common.e230920 import Col, const
from boilercv_pipeline.stages.common.e230920.types import DfNbOuts
from boilercv_pipeline.stages.find_objects import FindObjects
from boilercv_pipeline.stages.get_thermal_data import GetThermalData as Params

# Serialized stage parameters. The setup cell below parses this as JSON when it
# is a string and otherwise builds default parameters from a fresh context.
# NOTE(review): presumably overwritten by a notebook parameter-injection tool — confirm.
PARAMS = None
# Column definitions for the thermal data. Calling a column, e.g. `C.time()`,
# gives the label used for that column in the dataframes in this notebook.
C = const.thermal_cols

# Label for video-related series; also the prefix of the `Video_*` columns.
VIDEO = "Video"
# Fit settings, used below to look up the model functions and to run the fits.
FIT = Fit()
# NOTE(review): the name suggests meters -> centimeters, which would be a factor
# of 100 rather than 1e-2. Not referenced in the visible cells — confirm intent.
M_TO_CM = 1e-2
# Area counterpart of `M_TO_CM`; also not referenced in the visible cells.
M2_TO_CM2 = M_TO_CM**2
# NOTE(review): not referenced in the visible cells; units are unclear — confirm.
WINDOW = 60

In [None]:
# Build the stage parameters: defaults from a fresh context unless serialized
# parameters were supplied as a JSON string in `PARAMS`.
if not isinstance(PARAMS, str):
    _ctx = init()
    params = Params(context=_ctx)
else:
    params = Params.model_validate_json(PARAMS)
context = params.context
# Container for this notebook's outputs (`outs.df`, `outs.plots`).
outs = DfNbOuts()
params.format.set_display_options()
# Only the first element returned by `get_models` is used by the fits below.
models = FIT.get_models(params.deps.modelfunctions)[0]


def preview(
    df: DataFrame, cols: list[Col] = C.dests, index: Col = C.time_elapsed, head: int = 5
) -> DataFrame:
    """Display the first rows of a dataframe as Markdown and pass it through.

    The time column is dropped, `index` becomes the row index, and only the
    requested `cols` are shown (never the time column or `index` itself).

    Args:
        df: Dataframe to preview. It is not modified.
        cols: Columns to show.
        index: Column to use as the index of the displayed table.
        head: Number of leading rows to show.

    Returns:
        The same `df` that was passed in, so calls can wrap an expression.
    """
    without_time = df.drop(columns=C.time())
    indexed = without_time.set_index(index())
    hidden = [C.time, index]
    shown = [c() for c in cols if c not in hidden]
    display_markdown(indexed[shown].head(head), floatfmt=params.format.floatfmt)
    return df


@cache
def fit(ser: tuple[float, ...]) -> tuple[dict[str, float], dict[str, float]]:
    """Run the model fit for one tuple of temperatures on rod "R".

    Results are memoized on `ser`, so repeated temperature tuples are only
    fitted once. The argument must be hashable, hence a tuple.
    """
    rod = GEOMETRY.rods["R"]
    return fit_from_params(models, FIT, rod, ser)


def apply_fit(df: DataFrame) -> "Series[float]":
    """Fit each row's sample temperatures and return the scaled `q_s` values.

    Args:
        df: Dataframe containing the sample temperature columns.

    Returns:
        One value per row: the fitted `q_s` multiplied by the flux column scale.
    """
    temperature_cols = [c() for c in C.sample_temps]

    def scaled_flux(row: "Series[float]") -> float:
        # Rows become tuples so that the cached `fit` can hash them.
        return fit(tuple(row))[0]["q_s"] * C.flux.scale

    return df.loc[:, temperature_cols].apply(scaled_flux, axis="columns")


# Show the resolved parameters used for this notebook run.
pprint(params)

In [None]:
# Source-file names of the columns to read.
sources = [c.src for c in C.sources]
# Read only the time and source columns of every thermal data file, indexed by
# the parsed time column.
thermal_frames = [
    read_csv(
        path,
        usecols=[C.time.src, *sources],
        parse_dates=[C.time.src],
        index_col=C.time.src,
    )
    for path in params.deps.thermal_paths
]
# Stack the files, rename source columns to their notebook names, and add the
# seconds elapsed since the first row.
combined = (
    concat(thermal_frames)
    .reset_index()
    .rename(columns={c.src: c() for c in C.sources})
    .assign(**{
        C.time_elapsed(): lambda d: (d[C.time()] - d[C.time()][0]).dt.total_seconds()
    })
)
# The preview leaves out every sample temperature after the first two.
hidden_temps = C.sample_temps[2:]
src = preview(combined, cols=[c for c in C.sources if c not in hidden_temps])

In [None]:
# Destination columns that are neither source columns nor the boiling column.
not_derived = [*C.sources, C.boiling]
only_dests = [c for c in C.dests if c not in not_derived]
# Average onto a one-second grid and forward-fill the gaps.
per_second = src.set_index(C.time()).resample("s").mean().reset_index().ffill()
# Derived columns. The order matters: `assign` evaluates these in sequence, and
# later entries read columns produced by earlier ones.
derived_cols = {
    C.time_elapsed(): lambda d: (
        (d[C.time()] - d[C.time()][0]).dt.total_seconds() * C.time_elapsed.scale
    ),
    C.water_temp(): lambda d: d[[c() for c in C.water_temps]].mean(axis="columns"),
    # A single value for the whole run: the maximum mean water temperature.
    C.boiling(): lambda d: d[C.water_temp()].max(),
    C.superheat(): lambda d: d[C.surface_temp()] - d[C.boiling()],
    C.subcool(): lambda d: d[C.boiling()] - d[C.water_temp()],
    C.flux(): apply_fit,
}
outs.df = preview(
    per_second.assign(**derived_cols).loc[:, [c() for c in C.dests]],
    cols=only_dests,
)

In [None]:
def _video_times() -> list:
    """Return the timestamps parsed from the stems of the contour files.

    Files whose stem has no ISO-like match are skipped.
    """
    return [
        dt_fromisolike(match)
        for p in FindObjects(context=context).contours
        if (match := ISOLIKE.search(p.stem))
    ]


# Parsed once and shared by all three `Video_*` columns, instead of rebuilding
# `FindObjects` and re-parsing every stem for each column.
video_times = _video_times()


def _at_videos(col: Col):
    """Build an `assign` callable that keeps `col` only at rows nearest a video.

    Args:
        col: Column whose values should be picked out.

    Returns:
        A function taking the time-indexed dataframe and returning the values
        of `col` at the rows nearest to each video timestamp. Once assigned,
        every other row of the new column is missing.
    """

    def select(df: DataFrame) -> "Series[float]":
        nearest = df.index[df.index.get_indexer(video_times, method="nearest")]
        # Two videos can be nearest to the same row. Duplicate labels would
        # make `assign` raise while aligning the result, so keep each row once.
        return df.loc[nearest.unique(), col()]

    return select


# Median over 20-second bins, plus columns holding subcooling, superheat and
# flux only at the rows nearest to each video.
df = preview(
    outs.df.assign(**{
        C.time_elapsed_min(): lambda df: df[C.time_elapsed()] * C.time_elapsed_min.scale
    })
    .set_index(C.time())
    .resample("20s")
    .median()
    .assign(**{
        f"{VIDEO}_subcool": _at_videos(C.subcool),
        f"{VIDEO}_superheat": _at_videos(C.superheat),
        f"{VIDEO}_flux": _at_videos(C.flux),
    })
    .reset_index(),
    index=C.time_elapsed_min,
    cols=[c for c in only_dests if c != C.time_elapsed],
)

In [None]:
# Subcooling and superheat against elapsed minutes, with red stars at the rows
# nearest to each video.
fig, ax = subplots()
outs.plots.append(fig)
by_minutes = df.set_index(C.time_elapsed_min())
lineplot(
    ax=ax,
    zorder=0,
    data=by_minutes[[C.subcool(), C.superheat()]],
    dashes=False,
    errorbar=None,
)
# Both video series are renamed to the same label so they share one style.
video_subcool = by_minutes.rename(columns={f"{VIDEO}_subcool": VIDEO})[[VIDEO]]
video_superheat = by_minutes.rename(columns={f"{VIDEO}_superheat": VIDEO})[[VIDEO]]
marker_size = 10 * params.format.marker_scale
scatterplot(
    data=video_subcool,
    ax=ax,
    s=marker_size,
    markers={VIDEO: "*"},
    palette={VIDEO: "red"},
)
# The second series is drawn without a legend.
scatterplot(
    data=video_superheat,
    ax=ax,
    s=marker_size,
    markers={VIDEO: "*"},
    palette={VIDEO: "red"},
    legend=False,
)
ax.set_ylabel(C.subcool.ylabel)
params.format.move_legend(ax)

In [None]:
# Flux against subcooling, colored by elapsed minutes, with red stars at the
# rows nearest to each video.
fig, ax = subplots()
outs.plots.append(fig)
scatterplot(ax=ax, data=df, x=C.subcool(), y=C.flux(), hue=C.time_elapsed_min())
# Index by the video subcooling so it becomes the x-axis of the star markers.
flux_at_video_subcool = df.set_index(f"{VIDEO}_subcool").rename(
    columns={f"{VIDEO}_flux": VIDEO}
)[[VIDEO]]
marker_size = 10 * params.format.marker_scale
scatterplot(
    data=flux_at_video_subcool,
    ax=ax,
    s=marker_size,
    markers={VIDEO: "*"},
    palette={VIDEO: "red"},
)
legend = ax.get_legend()
legend.set_title(C.time_elapsed_min())  # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax, ncol=4)

In [None]:
# Flux against superheat, colored by elapsed minutes, with red stars at the
# rows nearest to each video.
fig, ax = subplots()
outs.plots.append(fig)
scatterplot(ax=ax, data=df, x=C.superheat(), y=C.flux(), hue=C.time_elapsed_min())
# Index by the video superheat so it becomes the x-axis of the star markers.
flux_at_video_superheat = df.set_index(f"{VIDEO}_superheat").rename(
    columns={f"{VIDEO}_flux": VIDEO}
)[[VIDEO]]
marker_size = 10 * params.format.marker_scale
scatterplot(
    data=flux_at_video_superheat,
    ax=ax,
    s=marker_size,
    markers={VIDEO: "*"},
    palette={VIDEO: "red"},
)
legend = ax.get_legend()
legend.set_title(C.time_elapsed_min())  # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax)