# Oscillator frequency error estimator simulation

Implementation of a Kalman filter for estimating timer frequency and phase error against a reference clock.

In [None]:
%matplotlib widget
import numpy as np
import matplotlib.pyplot as plt

plt.close("all")
# We use overflow intentionally
np.seterr(over="ignore");

Load CSV containing local and remote timestamps and input

In [None]:
data_raw = np.genfromtxt(
    "../../data/sync_temp2.csv",
    dtype=[("local_time", "uint64"), ("ref_time", "uint64")],
    usecols=[0, 1],
    delimiter=",",
)
data = np.empty(
    data_raw.shape, dtype=np.dtype(data_raw.dtype.descr + [("input", "int16")])
)
data["local_time"] = data_raw["local_time"] * 2**32
data["ref_time"] = data_raw["ref_time"] * 2**32
data["input"] = 0

In [None]:
data = np.genfromtxt(
    "../../data/hfclkaudio_ctlr/hfclkaudio.csv",
    dtype=[("local_time", "uint64"), ("ref_time", "uint64"), ("input", "int16")],
    usecols=[0, 1, 3],
    delimiter=",",
)

Function to transform data as if it was generated by a clock at a different frequency.

In [None]:
time_wrap = 2**64


def time_resample(time, scale):
    return (
        (np.unwrap(time.astype(np.float64), period=time_wrap, axis=0) * scale)
        % time_wrap
    ).astype(np.uint64)


def data_resample(data, scale):
    data = data.copy()
    data["local_time"] = time_resample(data["local_time"], scale)
    data["ref_time"] = time_resample(data["ref_time"], scale)
    return data


def clock_divide(data, divider):
    return (
        (np.unwrap(data.astype(np.float64), period=time_wrap, axis=0) / divider)
        % time_wrap
    ).astype(np.uint32)

In [None]:
1.7e19 / time_wrap

Simulate the frequency estimator using an implementation that is as similar as possible to the C version in the firmware. All the matrix operations have been expanded and simplified using sympy (see `freq_est_model.ipynb`)

In [None]:
PHASE_FRAC = np.uint64(1 << 32)


def qu32_32_from_int(ticks: np.uint32) -> np.uint64:
    return np.uint64(ticks) << np.uint8(32)


def phase_add_float(theta: np.uint64, inc: np.float32) -> np.uint64:
    if inc >= 0:
        theta += np.uint64(inc)
    else:
        theta -= np.uint64(-inc)
    return np.uint64(theta)


def phase_diff_signed(a: np.uint64, b: np.uint64) -> np.int64:
    return np.int64(a - b)


def freq_est(data, nominal_freq, k_f, q_theta, q_f, r, p0):
    nominal_freq_2 = np.float32(nominal_freq) * np.float32(nominal_freq)
    q_f = (q_f / nominal_freq_2).astype(np.float32)
    r = (r * nominal_freq_2).astype(np.float32)

    last_time = np.uint64(0)

    t = np.empty(data.shape[0], dtype=np.uint64)
    z = np.empty(data.shape[0], dtype=np.uint64)
    theta = np.empty(data.shape[0], dtype=np.uint64)
    f = np.empty(data.shape[0], dtype=np.float32)
    P = np.eye(2, dtype=np.float32) * p0

    for i, (local_time, ref_time, input) in enumerate(data):
        z[i] = (local_time - ref_time).astype(np.uint64)

        if i == 0:
            t[i] = 0
            theta[i] = z[i]
            f[i] = 0.0
            last_time = local_time
            continue

        dt = (local_time - last_time).astype(np.float32) / PHASE_FRAC
        last_time = local_time
        t[i] = t[i - 1] + dt

        u_scaled = input.astype(np.float32) * k_f
        theta[i] = phase_add_float(theta[i - 1], dt * (f[i - 1] + u_scaled))
        f[i] = f[i - 1] + u_scaled

        dt_p11 = dt * P[1, 1]
        P[0, 0] += dt * (dt * q_theta + P[0, 1] + P[1, 0] + dt_p11)
        P[0, 1] += dt_p11
        P[1, 0] += dt_p11
        P[1, 1] += dt * dt * q_f

        p00_r = P[0, 0] + r
        k0 = P[0, 0] / p00_r
        k1 = P[1, 0] / p00_r

        theta_error = phase_diff_signed(z[i], theta[i])
        theta[i] = phase_add_float(theta[i], k0 * theta_error)
        f[i] += (k1 * theta_error).astype(np.float32)

        P[1, 1] -= P[0, 1] * P[1, 0] / p00_r
        P[0, 1] = r * P[0, 1] / p00_r
        P[0, 0] = r * k0
        P[1, 0] = r * k1

    # Convert internal representations to meaningful units
    t = t.astype(np.float64) / nominal_freq
    z = z.astype(np.float64) / PHASE_FRAC
    theta = theta.astype(np.float64) / PHASE_FRAC
    f /= PHASE_FRAC

    return t, z, theta, f

In [None]:
nominal_freq = np.float32(16e6)
k_f = np.float32(2**32 * 32e6 / (12 * 2**16 * 11289600))
q_theta = np.float32(0)
q_f = np.float32(256)
r = np.float32(390625)
p0 = np.float32(1e6)
t, z, theta, f = freq_est(
    data, nominal_freq=nominal_freq, k_f=k_f, q_theta=q_theta, q_f=q_f, r=r, p0=p0
)

fig, ((ax_freq, ax_theta), (ax_inv, ax_u)) = plt.subplots(
    2, 2, figsize=(10, 8), sharex=True
)
ax_freq.plot(t, f)
ax_freq.set_title("Frequency")
ax_freq.set_ylabel("f")

ax_theta.plot(t, theta - theta[0], t, z - z[0])
ax_theta.set_title("Phase")
ax_theta.set_ylabel("θ")
ax_theta.set_xlabel("t")
ax_theta.legend(("Filtered", "Measured"))

ax_inv.plot(t, theta - z)
ax_inv.set_title("Phase Innovation")
ax_inv.set_ylabel("θ")
ax_inv.set_xlabel("t")

ax_u.plot(t, data["input"])
ax_u.set_title("Input")
ax_u.set_ylabel("θ")
ax_u.set_xlabel("t");

Simulate and plot the filter at the raw frequency and simulated higher frequency. This verifies that the parameter scaling is working as expected.

In [None]:
nominal_freq = np.float32(16e6)
k_f = 0
q_theta = np.float32(0)
q_f = np.float32(256)
r = np.float32(390625)
p0 = np.float32(1e6)
t, z, theta, f = freq_est(
    data, nominal_freq=nominal_freq, q_theta=q_theta, k_f=k_f, q_f=q_f, r=r, p0=p0
)

scale = 2
nominal_freq *= scale
t_d, z_d, theta_d, f_d = freq_est(
    data_resample(data, scale),
    nominal_freq=nominal_freq,
    k_f=k_f,
    q_theta=q_theta,
    q_f=q_f,
    r=r,
    p0=p0,
)

fig, (ax_freq, ax_theta) = plt.subplots(2, 1, figsize=(6, 8), sharex=True)
ax_freq.plot(t, f, t_d, f_d)
ax_freq.set_title("Frequency")
ax_freq.set_ylabel("f")
ax_freq.legend(("Filtered", "Filtered (divided)"))

ax_theta.plot(t, theta - theta[0], t_d, (theta_d - theta_d[0]) / scale, t, z - z[0])
ax_theta.set_title("Phase")
ax_theta.set_ylabel("θ")
ax_theta.set_xlabel("t")
ax_theta.legend(("Filtered", "Filtered (divided)", "Measured"));

Alternative implementation using matrix operations.

In [None]:
def freq_est_matrix(data, nominal_freq, k_f, q_theta, q_f, r, p0):
    nominal_freq_2 = (nominal_freq * nominal_freq).astype(np.float32)
    q_f = (q_f / nominal_freq_2).astype(np.float32)
    r = (r * nominal_freq_2).astype(np.float32)

    Q = np.diag([q_theta, q_f])
    R = np.array([[r]])

    I = np.eye(2)
    C = np.array([[1, 0]])

    last_time = np.uint32(0)
    offset = np.uint32(0)

    t = np.empty(data.shape[0], dtype=np.uint64)
    z = np.empty(data.shape[0], dtype=np.float32)
    x = np.empty((2, data.shape[0]), dtype=np.float64)
    P = np.eye(2, dtype=np.float32) * p0

    for i, (local_time, ref_time, input) in enumerate(data):
        if i == 0:
            t[i] = 0
            z[i] = 0
            offset = (local_time - ref_time).astype(np.uint64)
            x[0, i] = 0.0
            x[1, i] = 0.0
            last_time = local_time
            continue

        z_unsigned = np.uint64(local_time - ref_time - offset)

        if z_unsigned > (time_wrap / 2):
            z[i] = -np.float32(time_wrap - z_unsigned)
        else:
            z[i] = z_unsigned.astype(np.float32)

        dt = (local_time - last_time).astype(np.float32) / PHASE_FRAC
        last_time = local_time
        t[i] = t[i - 1] + dt

        # Predict
        A = np.array([[1, dt], [0, 1]])
        B = np.array([k_f * dt, k_f])
        x[:, i] = A @ x[:, i - 1] + B * input
        P = A @ P @ A.T + Q * dt**2

        # Update
        K = P @ C.T / (C @ P @ C.T + R)
        x[:, i] += K @ (z[i] - C @ x[:, i])
        P = (I - K @ C) @ P @ (I - K @ C).T + K @ R @ K.T

    # Convert internal representations to meaningful units
    t = t.astype(np.float64) / nominal_freq
    z = z.astype(np.float64) / PHASE_FRAC
    x /= PHASE_FRAC

    return t, z, x[0, :], x[1, :]

In [None]:
nominal_freq = np.float32(16e6)
k_f = 1
q_theta = np.float32(0)
q_f = np.float32(256)
r = np.float32(390625)
p0 = np.float32(1e6)
t, z, theta, f = freq_est_matrix(
    data, nominal_freq=nominal_freq, k_f=k_f, q_theta=q_theta, q_f=q_f, r=r, p0=p0
)

fig, (ax_freq, ax_theta) = plt.subplots(2, 1, figsize=(6, 8), sharex=True)
ax_freq.plot(t, f)
ax_freq.set_title("Frequency")
ax_freq.set_ylabel("f")

ax_theta.plot(t, theta - theta[0], t, z - z[0])
ax_theta.set_title("Phase")
ax_theta.set_ylabel("θ")
ax_theta.set_xlabel("t")
ax_theta.legend(("Filtered", "Measured"));