In [None]:
%matplotlib widget

import gzip
from dataclasses import dataclass
from functools import partial
from os import listdir, path
from typing import Callable, Optional, SupportsFloat, Tuple, Union

import msgpack
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from scipy import interpolate


In [None]:
STICKING_PROBABILITIES = [1.0, 0.8, 0.6, 0.4, 0.2, 0.1]
DATA_DIR = "/run/media/life/barry/ViennaTools/spherical"
fnames = listdir(DATA_DIR)


In [None]:
DataLoadingFunction = Callable[[str], dict]
NumpyOrFloat = Union[float, np.ndarray]


@dataclass(frozen=True, slots=True)
class ThicknessData:
    thickness: list[list[np.ndarray]]
    sticking_probabilities: list[float]
    max_times: list[int]
    num_points: int


def map_configs(fnames: list[str]) -> dict[str, list[str]]:
    configs = {}
    for fname in fnames:
        sticking, geom = fname[:5], fname[6:]
        if not geom in configs:
            configs[geom] = []
        configs[geom].append(sticking)
    return configs


def load_data(data_dir: str, filename: str) -> dict:
    with gzip.open(path.join(data_dir, filename)) as gz:
        data = msgpack.unpack(gz, use_list=False, raw=False)
    return data["nodes"]


def extract_data(
    loader: DataLoadingFunction, name: str, sticking: list[str]
) -> ThicknessData:
    data_list: list[list[np.ndarray]] = []
    sticking_probabilities: list[float] = []
    max_times: list[int] = []
    num_points: list[int] = []
    for ps in sticking:
        node_data = loader(f"{ps}_{name}")
        labels = [s for s in node_data if s.startswith("extracted_")]
        data_list.append([np.array(node_data[l]) for l in labels])
        sticking_probabilities.append(float(ps[1:]) / 1000)
        max_times.append(len(labels))
        num_points.extend([d.shape[0] for d in data_list[-1]])

    # Ensure that the number of nodes is the same for all data
    assert np.all(np.array(num_points) == num_points[0])

    # Construct the data container instance
    return ThicknessData(
        thickness=data_list,
        sticking_probabilities=sticking_probabilities,
        max_times=max_times,
        num_points=num_points[0],
    )


def to_data_tensor(data: ThicknessData) -> np.ndarray:
    mapped = np.zeros((data.num_points, len(STICKING_PROBABILITIES), 49))
    for i, s in enumerate(STICKING_PROBABILITIES):
        # First, determine the index
        data_index = 0
        for sp in data.sticking_probabilities:
            if sp == s:
                break
            data_index = data_index + 1
        else:
            print("Sticking coefficient not found!")
            continue
        mapped[:, i, : data.max_times[data_index]] = np.array(
            data.thickness[data_index]
        ).T
    return mapped


def plot_data(
    mapped: np.ndarray,
    sticking_probabilities: list[float],
    point_index: int,
    normalize: bool = False,
    log: bool = False,
) -> None:
    _, ax = plt.subplots(nrows=1, ncols=2, squeeze=False, figsize=(10, 4))
    if normalize:
        for i, r in enumerate(np.moveaxis(mapped[point_index, :, :], 0, 1)):
            ax[0, 0].plot(
                sticking_probabilities,
                np.where(r != 0, r / (i + 1), np.nan),
                marker="x",
            )

        for i, r in enumerate(np.moveaxis(mapped[point_index, :, :], 0, 0)):
            ax[0, 1].plot(
                np.where(r != 0, r / (1 + np.arange(49)), np.nan),
                marker="x",
                label=sticking_probabilities[i],
            )
    else:
        for i, r in enumerate(np.moveaxis(mapped[point_index, :, :], 0, 1)):
            ax[0, 0].plot(
                sticking_probabilities, np.where(r != 0, r, np.nan), marker="x"
            )

        for i, r in enumerate(np.moveaxis(mapped[point_index, :, :], 0, 0)):
            ax[0, 1].plot(
                np.where(r != 0, r, np.nan),
                marker="x",
                label=sticking_probabilities[i],
            )

    ax[0, 0].grid(which="both")
    ax[0, 0].set_xlabel("sticking probability")
    ax[0, 0].set_ylabel("radius")
    ax[0, 1].set_xlabel("time")
    ax[0, 1].set_ylabel("radius")
    ax[0, 1].grid(which="both")

    if log:
        ax[0, 0].set_yscale("log")
        ax[0, 1].set_yscale("log")
    ax[0, 1].legend()
    plt.show()


def to_grid_data(
    z: np.ndarray,
    sticking_probabilities: list[float],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Converts the provided data to"""
    y = sticking_probabilities
    x = np.arange(z.shape[1])
    xx, yy = np.meshgrid(x, y)
    zz = np.where(z > 0, z, np.nan)
    return xx, yy, zz


def plot_data_3d(
    mapped: np.ndarray,
    sticking_probabilities: list[float],
    point_index: int,
    interp: bool = False,
    fill: bool = False,
    fit_function: Optional[
        Callable[
            [NumpyOrFloat, NumpyOrFloat],
            NumpyOrFloat,
        ]
    ] = None,
) -> Tuple[Figure, Axes]:
    """Creates a 3D surface plot of the provided data"""
    xx, yy, zz = to_grid_data(mapped[point_index, :, :], sticking_probabilities)

    cmap = plt.get_cmap("coolwarm")
    fig = plt.figure()
    ax = fig.add_subplot(projection="3d")

    if fill:
        zz = fill_along_axis(zz, np.isnan(zz), 0)

    if interp:
        xnew = np.linspace(0, zz.shape[1], 100)
        ynew = np.linspace(
            np.min(sticking_probabilities), np.max(sticking_probabilities), 100
        )
        xxnew, yynew = np.meshgrid(xnew, ynew)

        zznew = interpolate.griddata(
            (xx.ravel(), yy.ravel()),
            zz.ravel(),
            (xxnew.ravel(), yynew.ravel()),
            method="linear",
        )
        ax.plot_surface(
            xxnew,
            yynew,
            zznew.reshape(xxnew.shape),
            linewidth=2,
            cmap=cmap,
            rstride=1,
            cstride=1,
            antialiased=True,
        )
    else:
        ax.plot_surface(xx, yy, zz, linewidth=1, cmap=cmap, antialiased=True)
        # np.save("data.npy", (xx, yy, zz))

    if fit_function:
        xnew = np.linspace(0, zz.shape[1], 100)
        ynew = np.linspace(
            np.min(sticking_probabilities), np.max(sticking_probabilities), 100
        )
        xxnew, yynew = np.meshgrid(xnew, ynew)
        zznew = fit_function(xxnew.ravel(), yynew.ravel())
        ax.plot_surface(
            xxnew,
            yynew,
            zznew.reshape(xxnew.shape),
            linewidth=1,
            # cmap=cmap,
            antialiased=True,
        )

    ax.set_xlabel("time")
    ax.set_ylabel("sticking probability")
    ax.set_zlabel("radius")

    return fig, ax


data_loader = partial(load_data, DATA_DIR)


In [None]:
configs = map_configs(fnames)

config_list = list(configs.items())

name, sticking = config_list[12]

print(name)
mapped = to_data_tensor(extract_data(data_loader, name, sticking))



In [None]:
plot_data(mapped, STICKING_PROBABILITIES, 100, normalize=False, log=False)
plot_data(mapped, STICKING_PROBABILITIES, 5, normalize=True, log=True)


In [None]:
def fun(
    X: Union[np.ndarray, Tuple[NumpyOrFloat, NumpyOrFloat]],
    a: float,
    b: float,
    c: float,
    d: float,
    e: float,
    f: float,
    g: float,
) -> NumpyOrFloat:
    x = X[0] / 50
    y = 1 - X[1]
    return a * np.tanh(b * x * (c + d * y)) * (e + f * y**2 + g * y**4)


def fit_surface(
    x: NumpyOrFloat,
    y: NumpyOrFloat,
    a: float,
    b: float,
    c: float,
    d: float,
    e: float,
    f: float,
    g: float,
) -> NumpyOrFloat:
    """x: time, y: sp"""
    # x = x / 50
    # y = 1 - y
    # return a * np.tanh(b * x * (c + d * y)) * (e + f * y**2 + g * y**4)
    t = fun((x, y), a, b, c, d, e, f, g)
    return t


fs = partial(fit_surface, a=7.7, b=5, c=0.5, d=0.5, e=1, f=0, g=1)

fig, ax = plot_data_3d(
    mapped,
    STICKING_PROBABILITIES,
    1000,
    interp=False,
    fill=True,
    # fit_function=fs,
)


In [None]:
from scipy.optimize import curve_fit


def optimize(point_index: int):
    x, y, z = to_grid_data(mapped[point_index, :, :], STICKING_PROBABILITIES)
    z = fill_along_axis(z)

    popt, pcov = curve_fit(fun, (x.ravel(), y.ravel()), z.ravel())
    print(popt)

    fs = partial(
        fit_surface,
        a=popt[0],
        b=popt[1],
        c=popt[2],
        d=popt[3],
        e=popt[4],
        f=popt[5],
        g=popt[6],
    )

    plot_data_3d(
        mapped,
        STICKING_PROBABILITIES,
        point_index,
        interp=False,
        fill=True,
        fit_function=fs,
    )


optimize(100)


In [None]:
def fill_along_axis(
    x: np.ndarray,
    mask: np.ndarray,
    axis: int = 0,
) -> np.ndarray:
    """Fills values of multidimensional array where mask is True using
    (linear) interpolation along one axis."""
    x = x.copy()
    for (sequence, ma) in zip(np.moveaxis(x, 0, axis), np.moveaxis(mask, 0, axis)):
        if ma.sum() != 0 and (~ma).sum():
            sequence[ma] = np.interp(
                np.flatnonzero(ma), np.flatnonzero(~ma), sequence[~ma]
            )
    return x


def check_monotonic(
    x: np.ndarray,
    axis: int = 0,
    eps: float = 0.1,
    increasing: bool = True,
):
    """Checks if the provided array is monotonically increasing or decreasing along the provided axis"""
    assert axis < len(x.shape)
    if increasing:
        monotonic = np.diff(x, 1, axis=axis, prepend=0) > -eps
    else:
        monotonic = np.diff(x, 1, axis=axis, append=0) > eps
    return monotonic


def to_monotonic(x, sticking_probabilities):
    xx, yy, zz = to_grid_data(x, sticking_probabilities)
    zz = fill_along_axis(zz, np.isnan(zz), 0)

    mono_mask = ~check_monotonic(zz, axis=0)

    zz = fill_along_axis(zz, mono_mask, axis=1)

    # Plotting
    _, ax = plt.subplots(1, 1)
    ax.contourf(xx, yy, zz)

    plt.show()


to_monotonic(mapped[1500, :, :], STICKING_PROBABILITIES)


## Genetic Algorithms

In [None]:
from gplearn.genetic import SymbolicRegressor


def regress_gp():
    x, y, z = np.load("data.npy")
    X = np.vstack([x.flatten(), y.flatten()]).T
    Y = z.flatten()
    print(X.shape)
    print(Y.shape)
    est = SymbolicRegressor(
        population_size=5000,
        generations=50,
        stopping_criteria=0.01,
        p_crossover=0.7,
        p_subtree_mutation=0.1,
        p_hoist_mutation=0.05,
        p_point_mutation=0.1,
        max_samples=0.9,
        verbose=1,
        parsimony_coefficient=0.01,
        random_state=0,
    )
    est.fit(X, Y)
    return est


from pysr import PySRRegressor


def regress_pysr():
    x, y, z = np.load("data.npy")
    X = np.vstack([x.flatten(), y.flatten()]).T
    Y = z.flatten()

    model = PySRRegressor(
        model_selection="best",  # Result is mix of simplicity+accuracy
        niterations=40,
        binary_operators=["+", "*"],
        unary_operators=[
            "exp",
            "sigm(x) = 1 / (1+exp(-x))",
            "inv(x) = 1/x",
            # ^ Custom operator (julia syntax)
        ],
        extra_sympy_mappings={
            "inv": lambda x: 1 / x,
            "sigm": lambda x: 1 / (1 + np.exp(-x)),
        },
        # ^ Define operator for SymPy as well
        # loss="loss(x, y) = (x - y)^2",
        # ^ Custom loss function (julia syntax)
    )
    model.fit(X, Y)
    return model


est = regress_pysr()


In [None]:
def plot_est(
    est: SymbolicRegressor,
    mapped: np.ndarray,
    sticking_probabilities: list[float],
    point_index: int,
) -> None:
    cmap = plt.get_cmap("coolwarm")
    fig = plt.figure()
    ax = fig.add_subplot(projection="3d")
    z = mapped[point_index, :, :]

    x = np.linspace(0, z.shape[1], 100)
    y = np.linspace(np.min(sticking_probabilities), np.max(sticking_probabilities), 100)
    xx, yy = np.meshgrid(x, y)

    X = np.vstack([xx.flatten(), yy.flatten()]).T

    zz = est.predict(X)

    ax.plot_surface(
        xx,
        yy,
        zz.reshape(xx.shape),
        linewidth=2,
        cmap=cmap,
        rstride=1,
        cstride=1,
        antialiased=True,
    )

plot_est(est, mapped, STICKING_PROBABILITIES, 100)

In [None]:
print(est._program)