##### Config

In [None]:
%matplotlib inline

In [None]:
%config InlineBackend.figure_format = "retina"

In [None]:
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

# Disable annoying font warnings
# (50 is the numeric value of logging.CRITICAL; `_log` is a private
# matplotlib attribute, so this may need revisiting on upgrades.)
matplotlib.font_manager._log.setLevel(50)

# Disable theano deprecation warnings
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings(
    "ignore", category=matplotlib.MatplotlibDeprecationWarning
)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning, module="theano")

# Style
plt.style.use("default")
plt.rcParams["savefig.dpi"] = 100
plt.rcParams["figure.dpi"] = 100
plt.rcParams["figure.figsize"] = (12, 4)
plt.rcParams["font.size"] = 14
plt.rcParams["text.usetex"] = False
plt.rcParams["font.family"] = "sans-serif"
plt.rcParams["font.sans-serif"] = ["Liberation Sans"]
plt.rcParams["font.cursive"] = ["Liberation Sans"]

# The math-text fallback rcParam is spelled differently across matplotlib
# releases: try the "mathtext.fallback" key first and only use the
# "mathtext.fallback_to_cm" key when the first one is rejected. The old key
# must NOT be set unconditionally afterwards, since that would raise the very
# KeyError this try/except is meant to avoid on releases that dropped it.
try:
    plt.rcParams["mathtext.fallback"] = "cm"
except KeyError:
    plt.rcParams["mathtext.fallback_to_cm"] = True

In [None]:
# Drop the names that were only needed for configuration.
del matplotlib, plt, warnings

##### Main

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
import theano
import theano.tensor as tt
from tqdm.auto import tqdm
import starry

# NOTE(review): presumably silences starry's informational output — confirm
# against the starry documentation.
starry.config.quiet = True

In [None]:
def NAdam(cost, params, lr=0.002, b1=0.9, b2=0.999, e=1e-8, sd=0.004):
    """Build the update rules for the Nesterov-accelerated Adam optimizer.

    Follows the Keras implementation at
    https://github.com/keras-team/keras/blob/master/keras/optimizers.py

    Parameters
    ----------
    cost
        Scalar symbolic expression that is differentiated w.r.t. ``params``.
    params
        List of shared variables to optimize (each must expose
        ``get_value()``).
    lr
        Step size.
    b1, b2
        Decay rates of the first- and second-moment running averages.
    e
        Small constant added to the denominator of the step.
    sd
        Decay rate of the momentum warm-up schedule.

    Returns
    -------
    list
        ``(shared_variable, new_value_expression)`` pairs, in the form
        accepted by the ``updates`` argument of ``theano.function``.
    """
    gradients = tt.grad(cost, params)

    # Iteration counter, stored as a float so it can enter the power terms.
    step = theano.shared(np.array(0.0, dtype=theano.config.floatX))
    step_next = step + 1.0

    # Momentum warm-up: running product of the per-step momentum factors.
    schedule = theano.shared(np.array(1.0, dtype=theano.config.floatX))
    mu_now = b1 * (1.0 - 0.5 * (tt.pow(0.96, step_next * sd)))
    mu_ahead = b1 * (1.0 - 0.5 * (tt.pow(0.96, (step_next + 1) * sd)))
    schedule_now = schedule * mu_now
    schedule_ahead = schedule * mu_now * mu_ahead

    rules = [(schedule, schedule_now)]

    for param, grad in zip(params, gradients):
        # Moment accumulators start at zero with the parameter's shape.
        first = theano.shared(param.get_value() * 0.0)
        second = theano.shared(param.get_value() * 0.0)

        # Bias-corrected gradient and moment estimates.
        grad_hat = grad / (1.0 - schedule_now)
        first_new = b1 * first + (1.0 - b1) * grad
        first_hat = first_new / (1.0 - schedule_ahead)
        second_new = b2 * second + (1.0 - b2) * tt.sqr(grad)
        second_hat = second_new / (1.0 - tt.pow(b2, step_next))

        # Nesterov blend of the current gradient and the look-ahead momentum.
        blended = (1.0 - mu_now) * grad_hat + (mu_ahead * first_hat)

        rules.extend(
            [
                (first, first_new),
                (second, second_new),
                (param, param - lr * blended / (tt.sqrt(second_hat) + e)),
            ]
        )

    rules.append((step, step_next))

    return rules

## One component

Generate the spectra:

In [None]:
# Build the Doppler map and load the "spot" surface.
# NOTE(review): `map` shadows the Python builtin of the same name for the rest
# of the notebook.
# NOTE(review): the positional 15 is presumably the spherical-harmonic degree
# — confirm against the starry docs.
map = starry.DopplerMap(15, lazy=False, inc=60, veq=50000, nt=60)
map.load("spot")
# True coefficient vector, scaled by the map amplitude.
ytru = map.amp * np.array(map.y)
# Design matrix for the numerator of the model below.
D = map.design_matrix(fix_spectrum=True)
# Design matrix of the underlying map at `nt` angles spanning [0, 360); each
# row is then repeated `nw` times so that B lines up row-for-row with D in the
# element-wise ratio below.
B = map._map.design_matrix(theta=np.linspace(0, 360, map.nt, endpoint=False))
B = np.repeat(B, map.nw, axis=0)
# Synthetic data: element-wise ratio of the two linear models.
f = (D @ ytru) / (B @ ytru)

Solve the approximate linear problem:

In [None]:
# Strength of the ridge term added to the normal equations below.
eps = 5e-4
# Split off the first column (its coefficient is pinned to 1.0 below) from the
# remaining columns.
D0, D1 = D[:, 0], D[:, 1:]
# NOTE(review): B0 is assigned but not used in this cell.
B0, B1 = B[:, 0], B[:, 1:]
# Model linearized in y1: f ~ A @ y1 + b.
# NOTE(review): this looks like a first-order expansion of
# (D0 + D1 @ y1) / (B0 + B1 @ y1) with B0 taken to be 1 — confirm.
b = D0
A = D1 - D0.reshape(-1, 1) * B1
# Regularized least squares via the normal equations.
y1 = np.linalg.solve(A.T @ A + eps * np.eye(A.shape[1]), A.T @ (f - b))
# Full coefficient vector with the leading coefficient fixed at 1.
y = np.append(1.0, y1)
# Evaluate the exact (non-linearized) ratio model at the solution.
model = (D @ y) / (B @ y)

Here's what we get:

In [None]:
# Layout: true / inferred maps on top, spectrum across the middle,
# coefficients and baseline on the bottom row.
fig = plt.figure(constrained_layout=True, figsize=(12, 8))
ax = fig.subplot_mosaic(
    """
    DDEE
    AAAA
    BBCC
    """
)

# Spectrum: data, then model.
for curve in (f, model):
    ax["A"].plot(curve)
ax["A"].set_xticks([])
ax["A"].set_ylabel("spectrum")

# Coefficients (leading one omitted): true, then inferred.
for coeffs in (ytru, y):
    ax["B"].plot(coeffs[1:])
ax["B"].set_xticks([])
ax["B"].set_ylabel("ylm coeffs")

# Baselines, each on its own y-scale.
ax["C"].plot(B @ ytru)
axt = ax["C"].twinx()
axt.plot(B @ y, "C1")
ax["C"].set_xticks([])
ax["C"].set_ylabel("baseline")

# Maps: the underlying map is overwritten with each coefficient vector in
# turn, so it is left holding the inferred coefficients afterwards.
for panel, coeffs, title in (("D", ytru, "true"), ("E", y, "inferred")):
    map._map[:, :] = coeffs
    map._map.show(ax=ax[panel], projection="moll")
    ax[panel].axis("on")
    ax[panel].set_title(title)
    ax[panel].set_yticks([])
    ax[panel].set_xticks([])
    for s in ["top", "right", "bottom", "left"]:
        ax[panel].spines[s].set_visible(False)

Refine the linear solution with a short run of the nonlinear optimizer:

In [None]:
# Recompute the linear solution so this cell always starts from the same
# initial guess (y1 is overwritten by the training loop below).
eps = 5e-4
D0, D1 = D[:, 0], D[:, 1:]
B0, B1 = B[:, 0], B[:, 1:]
b = D0
A = D1 - D0.reshape(-1, 1) * B1
y1 = np.linalg.solve(A.T @ A + eps * np.eye(A.shape[1]), A.T @ (f - b))
y = np.append(1.0, y1)
model = (D @ y) / (B @ y)

# Symbolic version of the exact ratio model; y1_ is the trainable variable and
# the leading coefficient stays fixed at 1.
y1_ = theano.shared(y1)
y_ = tt.concatenate([tt.as_tensor_variable([1.0]), y1_])
model_ = tt.dot(D, y_) / tt.dot(B, y_)
# Objective: sum of squared residuals plus a quadratic penalty on y1 with
# scale 5e1.
loss_ = tt.sum((f - model_) ** 2)
loss_ += tt.sum(y1_ ** 2 / (5e1) ** 2)

niter = 1000
# Track the lowest loss seen and the coefficients that produced it.
best_loss = np.inf
best_y1 = y1
loss = np.zeros(niter)
upd = NAdam(loss_, [y1_], lr=0.0002)
# Each call to train() returns the current y1 and loss and applies one NAdam
# update.
# NOTE(review): presumably the returned values are evaluated before the update
# is applied, so y1 and loss[n] belong together — confirm against the Theano
# docs.
train = theano.function([], [y1_, loss_], updates=upd)
for n in tqdm(range(niter)):
    y1, loss[n] = train()
    if loss[n] < best_loss:
        best_loss = loss[n]
        best_y1 = y1

print(best_loss)
# Loss history on a log10 scale.
plt.plot(np.log10(loss));

In [None]:
# Adopt the lowest-loss coefficients from the optimizer run and re-evaluate
# the exact ratio model with them.
y1 = best_y1
y = np.append(1.0, y1)
model = (D @ y) / (B @ y)

In [None]:
# Layout: true / inferred maps on top, spectrum across the middle,
# coefficients and baseline on the bottom row.
fig = plt.figure(constrained_layout=True, figsize=(12, 8))
ax = fig.subplot_mosaic(
    """
    DDEE
    AAAA
    BBCC
    """
)

# Spectrum: data, then model.
for curve in (f, model):
    ax["A"].plot(curve)
ax["A"].set_xticks([])
ax["A"].set_ylabel("spectrum")

# Coefficients (leading one omitted): true, then inferred.
for coeffs in (ytru, y):
    ax["B"].plot(coeffs[1:])
ax["B"].set_xticks([])
ax["B"].set_ylabel("ylm coeffs")

# Baselines, each on its own y-scale.
ax["C"].plot(B @ ytru)
axt = ax["C"].twinx()
axt.plot(B @ y, "C1")
ax["C"].set_xticks([])
ax["C"].set_ylabel("baseline")

# Maps: the underlying map is overwritten with each coefficient vector in
# turn, so it is left holding the inferred coefficients afterwards.
for panel, coeffs, title in (("D", ytru, "true"), ("E", y, "inferred")):
    map._map[:, :] = coeffs
    map._map.show(ax=ax[panel], projection="moll")
    ax[panel].axis("on")
    ax[panel].set_title(title)
    ax[panel].set_yticks([])
    ax[panel].set_xticks([])
    for s in ["top", "right", "bottom", "left"]:
        ax[panel].spines[s].set_visible(False)

## Two components, one uniform

In [None]:
# Two-component Doppler map (nc=2) with equal amplitudes.
# NOTE(review): `map` shadows the Python builtin and replaces the
# one-component map created earlier in the notebook.
map = starry.DopplerMap(
    15, lazy=False, inc=60, veq=50000, nt=20, nc=2, oversample=5
)
map.load(["s", "o"])
map.amp = 0.5, 0.5
# Fixed seed so the randomly drawn line centers are reproducible.
np.random.seed(0)
# One line center per component, drawn uniformly between the first and last
# entries of map.wav.
mu = np.random.uniform(low=map.wav[0], high=map.wav[-1], size=map.nc)
sig = 0.025
# Offset of every map.wav0 point from each line center (one row per component).
dw = map.wav0.reshape(1, -1) - mu.reshape(-1, 1)
# Spectrum per component: one minus a Gaussian in dw with standard deviation
# sig.
map.spectrum = 1.0 - np.exp(-0.5 * dw ** 2 / sig ** 2)
# Amplitude-weighted true coefficients, flattened to a single vector.
# NOTE(review): assumes map.y has one column per component, so the flattened
# vector is ordered component by component — confirm.
ytru = (map.amp.reshape(-1, 1) * np.array(map.y.T)).reshape(-1)
D = map.design_matrix(fix_spectrum=True)
# Baseline design matrix at `nt` angles spanning [0, 360): columns are tiled
# once per component and rows repeated `nw` times so that B lines up with D in
# the element-wise ratio below.
B = map._map.design_matrix(theta=np.linspace(0, 360, map.nt, endpoint=False))
B = np.tile(B, [1, map.nc])
B = np.repeat(B, map.nw, axis=0)
# Synthetic data: element-wise ratio of the two linear models.
f = (D @ ytru) / (B @ ytru)

In [None]:
# Display the map that was just loaded (the ground truth for this section).
map.show()

In [None]:
# Synthetic data vector for the two-component problem.
plt.plot(f);

In [None]:
# Strength of the ridge term used in the y1 solve.
eps1 = 1e-2

# Boolean mask selecting the first coefficient of each component, i.e. every
# map.Ny-th entry of the flattened coefficient vector.
idx = np.zeros(map.nc * map.Ny, dtype=bool)
idx[0 :: map.Ny] = 1

# Split both design matrices into the masked columns (0) and the rest (1).
D0 = D[:, idx]
D1 = D[:, ~idx]
B0 = B[:, idx]
B1 = B[:, ~idx]

# Hold the masked coefficients fixed at equal weights summing to one and solve
# the model linearized in y1: f ~ A @ y1 + b.
y0 = np.ones(map.nc) / map.nc
b = 2 * D0 @ y0 - (D0 @ y0) * (B0 @ y0)
A = 2 * D1 - (D0 @ y0).reshape(-1, 1) * B1 - (B0 @ y0).reshape(-1, 1) * D1
y1 = np.linalg.solve(A.T @ A + eps1 * np.eye(A.shape[1]), A.T @ (f - b))

# Reassemble the full coefficient vector from its two parts.
y = np.zeros(map.nc * map.Ny)
y[idx] = y0
y[~idx] = y1

# True vs. inferred coefficients. The trailing semicolon suppresses the
# Line2D repr, matching the other plotting cells.
plt.plot(ytru)
plt.plot(y);

In [None]:
# Show the first component of the linear solution. `y` holds map.nc blocks of
# map.Ny coefficients, so reshape by map.nc rather than a hard-coded 2.
map._map[:, :] = y.reshape(map.nc, -1)[0]
map._map.show(projection="moll")

In [None]:
# Show the second component of the linear solution. `y` holds map.nc blocks of
# map.Ny coefficients, so reshape by map.nc rather than a hard-coded 2.
map._map[:, :] = y.reshape(map.nc, -1)[1]
map._map.show(projection="moll")

In [None]:
# Guesses and regularization
y0 = np.ones(map.nc) / map.nc
eps1 = 1e-2
eps0 = 0
niter = 100

# Iterate
# `err` gets one entry per half-step (a y1 solve or a y0 solve); the best
# solution seen so far is tracked alongside it.
err = np.zeros(niter)
best_err = np.inf
best_y = np.zeros(map.nc * map.Ny)
best_model = np.zeros_like(f)

# Boolean mask selecting the first coefficient of each component.
idx = np.zeros(map.nc * map.Ny, dtype=bool)
idx[0 :: map.Ny] = 1

D0 = D[:, idx]
D1 = D[:, ~idx]
B0 = B[:, idx]
B1 = B[:, ~idx]

n = 0
while n < niter:

    # Solve for y1
    # fp = A @ y1 + b
    b = 2 * D0 @ y0 - (D0 @ y0) * (B0 @ y0)
    A = 2 * D1 - (D0 @ y0).reshape(-1, 1) * B1 - (B0 @ y0).reshape(-1, 1) * D1
    y1 = np.linalg.solve(A.T @ A + eps1 * np.eye(A.shape[1]), A.T @ (f - b))

    y = np.zeros(map.nc * map.Ny)
    y[idx] = y0
    y[~idx] = y1

    model = (D @ y) / (B @ y)
    err[n] = np.sum((f - model) ** 2)
    if err[n] < best_err:
        best_err = err[n]
        best_y = y
        best_model = model
    n += 1

    # Each pass records two half-steps. Stop here when `niter` is odd so the
    # second half-step does not index past the end of `err`.
    if n >= niter:
        break

    # Solve for y0
    # fp = A @ y0 + b
    A = D0 - (B1 @ y1).reshape(-1, 1) * D0 - (D1 @ y1).reshape(-1, 1) * B0
    b = 2 * D1 @ y1
    y0 = np.linalg.solve(A.T @ A + eps0 * np.eye(A.shape[1]), A.T @ (f - b))

    y = np.zeros(map.nc * map.Ny)
    y[idx] = y0
    y[~idx] = y1

    model = (D @ y) / (B @ y)
    err[n] = np.sum((f - model) ** 2)
    if err[n] < best_err:
        best_err = err[n]
        best_y = y
        best_model = model
    n += 1

In [None]:
# Sum of squared residuals after each half-step. The trailing semicolon
# suppresses the Line2D repr.
plt.plot(err);

In [None]:
# Data vs. the best model found by the iteration. The trailing semicolon
# suppresses the Line2D repr.
plt.plot(f)
plt.plot(best_model);

In [None]:
# True coefficients vs. the best coefficients found by the iteration.
plt.plot(ytru)
plt.plot(best_y);

In [None]:
# Show the first component of the best solution found by the iteration
# (`best_y`, matching the neighbouring cells that plot best_model / best_y),
# rather than whatever the last half-step happened to produce.
map._map[:, :] = best_y.reshape(map.nc, -1)[0]
map._map.show(projection="moll")

In [None]:
# Show the second component of the best solution found by the iteration
# (`best_y`, matching the neighbouring cells that plot best_model / best_y),
# rather than whatever the last half-step happened to produce.
map._map[:, :] = best_y.reshape(map.nc, -1)[1]
map._map.show(projection="moll")

## One component, iterative solve

In [None]:
# One-component Doppler map with the "spot" surface and unit amplitude.
# NOTE(review): `map` shadows the Python builtin and replaces the
# two-component map from the previous section.
map = starry.DopplerMap(15, lazy=False, inc=60, veq=50000, nt=20)
map.load("spot")
map.amp = 1
# True coefficient vector, scaled by the map amplitude.
ytru = map.amp * np.array(map.y)

# Design matrix for the numerator of the model below.
D = map.design_matrix(fix_spectrum=True)

# Baseline design matrix at `nt` angles spanning [0, 360), with each row
# repeated `nw` times so that B lines up row-for-row with D.
B = map._map.design_matrix(theta=np.linspace(0, 360, map.nt, endpoint=False))
B = np.repeat(B, map.nw, axis=0)

# Synthetic data: element-wise ratio of the two linear models.
f = (D @ ytru) / (B @ ytru)

# Linearization:
# f ~ 2 * D0 * y0 - (D0 * y0) * (B0 * y0) - (D0 * y0) * (B1 @ y1) + 2 * D1 @ y1 - (D1 @ y1) * (B0 * y0)
# NOTE(review): this matches expanding (D @ y) * (2 - B @ y) and dropping the
# (D1 @ y1) * (B1 @ y1) term — confirm that this is the intended derivation.

In [None]:
# Synthetic data vector for the one-component problem. The trailing semicolon
# suppresses the Line2D repr.
plt.plot(f);

In [None]:
# Guesses and regularization
y0 = 1.0
eps = 1e-4
niter = 100

# Iterate
# `err` gets one entry per half-step (a y1 solve or a y0 solve); the best
# solution seen so far is tracked alongside it.
err = np.zeros(niter)
best_err = np.inf
best_y = np.zeros(map.Ny)
best_model = np.zeros_like(f)
D0, D1 = D[:, 0], D[:, 1:]
B0, B1 = B[:, 0], B[:, 1:]
n = 0
while n < niter:

    # In terms of y1
    # fp = A @ y1 + b
    b = (2 * D0) * y0 - (D0 * B0) * (y0 ** 2)
    A = (
        2 * D1
        - (D0 * y0).reshape(-1, 1) * B1
        - ((B0 * y0).reshape(-1, 1) * D1)
    )
    y1 = np.linalg.solve(A.T @ A + eps * np.eye(A.shape[1]), A.T @ (f - b))
    y = np.append(y0, y1)

    model = (D @ y) / (B @ y)
    err[n] = np.sum((f - model) ** 2)
    if err[n] < best_err:
        best_err = err[n]
        best_y = y
        best_model = model
    n += 1

    # Each pass records two half-steps. Stop here when `niter` is odd so the
    # second half-step does not index past the end of `err`.
    if n >= niter:
        break

    # In terms of y0
    # fp = a * y0 ** 2 + b * y0 + c
    # The quadratic is solved independently for every data point (taking the
    # "+" root) and the per-point solutions are averaged; np.nanmean skips
    # points whose root evaluates to NaN.
    a = -(D0 * B0)
    b = 2 * D0 - B1 @ y1 * D0 - D1 @ y1 * B0
    c = 2 * D1 @ y1
    y0 = np.nanmean((-b + np.sqrt(b ** 2 - 4 * a * (c - f))) / (2 * a))
    y = np.append(y0, y1)

    model = (D @ y) / (B @ y)
    err[n] = np.sum((f - model) ** 2)
    if err[n] < best_err:
        best_err = err[n]
        best_y = y
        best_model = model
    n += 1

In [None]:
# Sum of squared residuals after each half-step. The trailing semicolon
# suppresses the Line2D repr.
plt.plot(err);

In [None]:
# Data vs. the best model found by the iteration. The trailing semicolon
# suppresses the Line2D repr.
plt.plot(f)
plt.plot(best_model);

In [None]:
# True coefficients vs. the best coefficients found by the iteration. The
# trailing semicolon suppresses the Line2D repr.
plt.plot(ytru)
plt.plot(best_y);