In [None]:
import functools
import itertools
import random

import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import tensorflow as tf

import scdc

pio.renderers.default = "notebook+svg"

In [None]:
# Create a dataset
tau = np.linspace(0.1, 0.5, 9)
t0 = np.linspace(0.0, 0.5, 11)
y0 = np.linspace(0.0, 0.5, 11)
N = 10
seed = 0

create_data = functools.partial(scdc.create_decay_curve, size=480)
random_state = np.random.RandomState(seed)
add_noise = functools.partial(scdc.add_noise, random_state=random_state)
dataset = list(
    itertools.starmap(
        create_data,
        itertools.product(tau, t0, y0),
    ),
)
dataset = list(map(add_noise, dataset * N))
print(f"Size of dataset: {len(dataset)}")

In [None]:
# Build a model


def get_input(dc: scdc.DecayCurve) -> npt.NDArray[np.float64]:
    return dc.y


def get_output(dc: scdc.DecayCurve) -> npt.NDArray[np.float64]:
    return np.array([dc.t0, dc.y0])


input_size = get_input(dataset[0]).size
output_size = get_output(dataset[0]).size
model = tf.keras.Sequential(
    [
        tf.keras.layers.Input((input_size,)),
        tf.keras.layers.Dense(200, activation="tanh"),
        tf.keras.layers.Dense(120, activation="tanh"),
        tf.keras.layers.Dense(30, activation="tanh"),
        tf.keras.layers.Dense(output_size, activation="sigmoid"),
    ]
)
model.compile(
    loss="mse",
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    metrics=["mse", "mae"],
)
model.summary()

In [None]:
def schedule_learning_rate(epoch: int) -> float:
    return 1e-4 * 10 ** (-epoch // 200)


history = model.fit(
    np.array(list(map(get_input, dataset))),
    np.array(list(map(get_output, dataset))),
    epochs=1000,
    validation_split=0.2,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=20),
        tf.keras.callbacks.LearningRateScheduler(schedule_learning_rate),
    ],
    verbose=0,
)

hist = pd.DataFrame(history.history).assign(epoch=history.epoch)
go.Figure(
    layout=go.Layout(
        width=640,
    ),
).add_scatter(
    x=hist["epoch"],
    y=hist["mse"],
    name="mse",
).add_scatter(
    x=hist["epoch"],
    y=hist["val_mse"],
    name="val_mse",
).update_yaxes(
    title="Mean Squared Error",
    type="log",
).update_xaxes(
    title="Epoch",
).show()

In [None]:
test_dataset_size = 100
test_dataset = list(
    itertools.starmap(
        create_data,
        zip(
            np.random.uniform(0.1, 0.5, test_dataset_size),
            np.random.uniform(0.0, 0.5, test_dataset_size),
            np.random.uniform(0.0, 0.5, test_dataset_size),
        ),
    )
)
test_data = np.array(list(map(get_input, test_dataset)))
test_label = np.array(list(map(get_output, test_dataset)))
loss, mse, mae = model.evaluate(test_data, test_label)


test_label_t = np.transpose(test_label)
prediction = model.predict(test_data, verbose=0)
prediction_t = np.transpose(prediction)
for i, (x, y) in enumerate(zip(test_label_t, prediction_t)):
    residual = y - x
    fig = (
        go.Figure(
            layout=go.Layout(
                width=480,
                height=480,
                showlegend=False,
            ),
        )
        .add_scatter(
            x=x,
            y=residual,
            mode="markers",
        )
        .update_xaxes(
            title=f"variable_{i}",
        )
        .update_yaxes(
            title="Residual",
        )
    )
    fig.show()

In [None]:
for data in random.sample(list(test_data), 5):
    prediction, *_ = model.predict(np.expand_dims(data, 0), verbose=0)
    d = (1.0 - prediction[1]) * 0.1
    go.Figure(
        layout=go.Layout(
            width=640,
            showlegend=False,
        ),
    ).add_scatter(
        y=data,
    ).update_yaxes(
        range=(prediction[1] - d, 1 + d),
    ).add_vline(
        prediction[0] * len(data),
    ).add_hline(
        prediction[1],
    ).show()