In [1]:
# Jupyter notebooks can also act like a bash/PowerShell/cmd terminal, i.e. you can simply run shell-style commands such as `pip install ...` from a cell (see the next cell).

In [2]:
pip install pyserial

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
# List the device name of every serial port currently reported by pyserial,
# so the controller board's COM port can be picked out and entered as PORT in
# the configuration cell.
import serial.tools.list_ports
print([p.device for p in serial.tools.list_ports.comports()])

['COM3', 'COM4']


In [4]:
# If any of these packages is missing, install it with the pip cell near the top of the notebook (In [2]).
import re, time
import serial
import pandas as pd
import math
import numpy as np
import matplotlib.pyplot as plt

# --- Serial link ---------------------------------------------------------
# Set PORT to the COM port of the supercapacitor controller board.
PORT = "COM8"
BAUD = 115_200

# Line terminator appended to every command ("\r", "\n", or "\r\n").
# If PuTTY works but Python doesn't, try "\r" instead of "\r\n".
EOL = "\r\n"

# --- Measurement ---------------------------------------------------------
# Seconds to wait after enabling the output before reading the status.
SETTLE_S = 0.5
# Readings with |current| at or below this many mA are discarded
# (near-zero / clipped).
MIN_ABS_mA = 100

# Direction select: INN = dir 0, INP = dir 1.
DIR_VAL = 0

# DAC set-points to sweep, from 2600 down to 1800 in steps of 100 counts.
# Values from 1800 to 2600 are safe to set; discard any value pair whose
# reading is clipped (near +-100 mA).
DAC_COUNTS = list(range(2600, 1700, -100))

In [5]:
# A line that contains only the device prompt "scv2>".
PROMPT_RE = re.compile(r"(?m)^\s*scv2>\s*$")
# A line that contains only "ok".
OK_RE     = re.compile(r"(?m)^\s*ok\s*$")
# Whole-word, case-insensitive error markers anywhere in the reply.
ERR_RE    = re.compile(r"(?i)\b(err(or)?|unknown|invalid|fail(ed)?)\b")


class ScvCli:
    """Small client for a prompt-based text CLI on a serial port.

    Commands are written followed by ``eol``; a reply is considered complete
    once a line matching ``PROMPT_RE`` has been received.

    The object can be used as a context manager so the port is always
    released::

        with ScvCli(PORT, BAUD) as cli:
            cli.cmd("status")

    Attributes:
        port: Serial port name passed to ``serial.Serial``.
        baud: Baud rate passed to ``serial.Serial``.
        eol:  Line terminator appended to every command.
        ser:  The ``serial.Serial`` instance, or ``None`` before ``open()``.
    """

    def __init__(self, port, baud, eol="\r\n"):
        self.port = port
        self.baud = baud
        self.eol = eol
        self.ser = None

    def __enter__(self):
        # Only open when needed so an already-open client can also be used
        # in a ``with`` block.
        if self.ser is None or not self.ser.is_open:
            self.open()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow the caller's exception

    def _port(self):
        """Return the open serial object or raise a clear error."""
        if self.ser is None or not self.ser.is_open:
            raise RuntimeError("Serial port is not open; call open() first.")
        return self.ser

    def open(self):
        """Open the port, clear both buffers and poke the device for a prompt.

        If anything fails after the port has been opened, the port is closed
        again before the exception propagates, so a failed handshake does not
        leave the COM port locked.
        """
        self.ser = serial.Serial(
            self.port, self.baud,
            timeout=0.05,
            write_timeout=0.5
        )
        try:
            # presumably gives the link a moment to settle — TODO confirm
            time.sleep(0.2)
            self.ser.reset_input_buffer()
            self.ser.reset_output_buffer()
            # poke to get a prompt
            self._write(self.eol)
            self.read_until_prompt(timeout=1.5)
        except BaseException:
            self.close()
            raise

    def close(self):
        """Close the port if it is open; safe to call repeatedly."""
        if self.ser and self.ser.is_open:
            self.ser.close()

    def _write(self, s):
        """Send ``s`` ASCII-encoded (unencodable characters are replaced) and flush."""
        ser = self._port()
        ser.write(s.encode("ascii", errors="replace"))
        ser.flush()

    def drain(self, quiet_s=0.10, max_total_s=1.0):
        """Read and return pending input until the line has been quiet.

        Stops once nothing has arrived for ``quiet_s`` seconds, or after
        ``max_total_s`` seconds in total. Returns the drained text decoded as
        UTF-8 with undecodable bytes replaced.
        """
        ser = self._port()
        buf = bytearray()
        # monotonic clock: elapsed-time measurement must not be affected by
        # wall-clock adjustments
        start = last_rx = time.monotonic()
        while time.monotonic() - start < max_total_s:
            n = ser.in_waiting
            if n:
                buf += ser.read(n)
                last_rx = time.monotonic()
            else:
                if time.monotonic() - last_rx >= quiet_s:
                    break
                time.sleep(0.01)
        return buf.decode("utf-8", errors="replace")

    def read_until_prompt(self, timeout=2.0):
        """Accumulate input until the prompt is seen or ``timeout`` elapses.

        Returns everything received so far in either case; callers that need
        to distinguish the two must test the result against ``PROMPT_RE``.
        """
        ser = self._port()
        buf = bytearray()
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            n = ser.in_waiting
            if n:
                buf += ser.read(n)
                txt = buf.decode("utf-8", errors="replace")
                if PROMPT_RE.search(txt):
                    return txt
            else:
                time.sleep(0.01)
        return buf.decode("utf-8", errors="replace")

    def cmd(self, command, timeout=2.0, require_ok=False):
        """Send one command and return the reply up to the next prompt.

        Pending input is drained first so stale output is not mistaken for
        this command's reply.

        Raises:
            RuntimeError: the reply contains an error marker (``ERR_RE``), the
                port is not open, or ``require_ok`` is set and no ``ok`` line
                was received.
            TimeoutError: no prompt arrived within ``timeout`` seconds.
        """
        self.drain(quiet_s=0.05, max_total_s=0.5)
        self._write(command.strip() + self.eol)
        out = self.read_until_prompt(timeout=timeout)

        if ERR_RE.search(out):
            raise RuntimeError(f"Device error after '{command}':\n{out}")
        if not PROMPT_RE.search(out):
            raise TimeoutError(f"No prompt after '{command}' (timeout={timeout}s). Output:\n{out}")
        if require_ok and not OK_RE.search(out):
            raise RuntimeError(f"Expected 'ok' after '{command}', got:\n{out}")
        return out


In [6]:
# Line-anchored patterns for the fields of the `status` command output.
RE_SWEN  = re.compile(r"(?m)^\s*Switch enable output \(SWEN\) pin:\s*(\d+)\s*$")
RE_DIR   = re.compile(r"(?m)^\s*Direction pin \(DIR\):\s*(\d+)\s*$")
RE_SAFE  = re.compile(r"(?m)^\s*Safety flag \(is_safe\):\s*(\d+)\s*$")
RE_DAC11 = re.compile(r"(?m)^\s*DAC1 channel 1 raw:\s*(\d+)\s*/\s*4095\s*$")
RE_DAC12 = re.compile(r"(?m)^\s*DAC1 channel 2 raw:\s*(\d+)\s*/\s*4095\s*$")
RE_WM_I  = re.compile(r"(?m)^\s*Wattmeter current \(average\):\s*(-?\d+)\s*mA\s*$")
RE_WM_V  = re.compile(r"(?m)^\s*Wattmeter voltage \(average\):\s*(\d+)\s*mV\s*$")
# A line of the form `NAME = number`; used when reading back calibration values.
CAL_KV_RE = re.compile(r"(?m)^\s*([A-Z0-9_]+)\s*=\s*([-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)\s*$")

def parse_status(status_txt: str) -> dict:
    """Extract the integer fields of a `status` reply into a dict.

    Required fields (a missing one raises ``KeyError``): ``swen``, ``dir``,
    ``safe``, ``dac1_ch1_raw``, ``dac1_ch2_raw``, ``wm_i_mA``.
    Optional field: ``wm_v_mV`` (``None`` when the line is absent).
    """
    # (result key, pattern, label used in the error message), in output order.
    required_fields = (
        ("swen",         RE_SWEN,  "SWEN"),
        ("dir",          RE_DIR,   "DIR"),
        ("safe",         RE_SAFE,  "is_safe"),
        ("dac1_ch1_raw", RE_DAC11, "DAC1 channel 1 raw"),
        ("dac1_ch2_raw", RE_DAC12, "DAC1 channel 2 raw"),
        ("wm_i_mA",      RE_WM_I,  "Wattmeter current (average)"),
    )

    parsed = {}
    for field, pattern, label in required_fields:
        hit = pattern.search(status_txt)
        if hit is None:
            raise KeyError(f"Missing '{label}' in status output.")
        parsed[field] = int(hit.group(1))

    # The voltage line is optional.
    voltage_hit = RE_WM_V.search(status_txt)
    parsed["wm_v_mV"] = None if voltage_hit is None else int(voltage_hit.group(1))
    return parsed


def measure_dir(cli, dir_val: int, dac_counts, dac1_channel: int, settle_s=0.5, min_abs_mA=100):
    """Sweep DAC set-points for one direction and record the wattmeter readings.

    For every value in ``dac_counts`` the output is disabled, the DAC is set,
    the output is re-enabled, and after ``settle_s`` seconds the device
    status is read back and sanity-checked.

    Args:
        cli: Object with a ``cmd(command, timeout=..., require_ok=...)``
            method returning the device reply (e.g. ``ScvCli``).
        dir_val: Value sent with the ``dir`` command and verified in status.
        dac_counts: Iterable of raw DAC values to sweep.
        dac1_channel: DAC1 channel to set; 1 or 2 (the only channels that
            ``parse_status`` reports).
        settle_s: Seconds to wait after enabling before reading status.
        min_abs_mA: Readings with ``abs(current) <= min_abs_mA`` are skipped.

    Returns:
        ``pandas.DataFrame`` with columns ``dir``, ``dac_count``, ``wm_i_mA``,
        ``wm_v_mV`` and ``t_s``, sorted by ``wm_i_mA``; an empty frame when
        every reading was skipped.

    Raises:
        ValueError: ``dac1_channel`` is not 1 or 2 (checked before any
            command is sent).
        RuntimeError: a verification or sanity check failed. If this happens
            during the sweep, a best-effort ``swen 0`` is sent before the
            error propagates so the output is not left enabled.
    """
    # Validate before touching the hardware; an unknown channel would
    # otherwise only fail after the output has been enabled.
    if dac1_channel not in (1, 2):
        raise ValueError(f"dac1_channel must be 1 or 2, got {dac1_channel!r}")
    key = f"dac1_ch{dac1_channel}_raw"   # status field for the swept channel

    # Safe reconfig
    cli.cmd("swen 0", timeout=2.0, require_ok=True)
    cli.cmd(f"dir {dir_val}", timeout=2.0, require_ok=True)

    # Verify direction + safety before sweeping
    st = parse_status(cli.cmd("status", timeout=3.0))
    if st["dir"] != dir_val:
        raise RuntimeError(f"DIR verify failed: expected {dir_val}, got {st['dir']}")
    if st["safe"] != 1:
        raise RuntimeError(f"is_safe={st['safe']} (expected 1). Refusing to proceed.")

    rows = []
    try:
        for dac in dac_counts:
            cli.cmd("swen 0", timeout=2.0, require_ok=True)
            cli.cmd(f"dac set 1 {dac1_channel} {dac}", timeout=2.0, require_ok=True)
            cli.cmd("swen 1", timeout=2.0, require_ok=True)

            time.sleep(settle_s)

            st = parse_status(cli.cmd("status", timeout=3.0))

            # sanity checks
            if st["dir"] != dir_val:
                raise RuntimeError(f"DIR changed mid-sweep: expected {dir_val}, got {st['dir']}")
            if st["safe"] != 1:
                raise RuntimeError(f"is_safe dropped to {st['safe']} mid-sweep.")
            if st["swen"] != 1:
                raise RuntimeError(f"SWEN={st['swen']} after enabling (expected 1).")
            if st[key] != dac:
                raise RuntimeError(f"DAC mismatch: asked {dac}, got {st[key]}")

            if abs(st["wm_i_mA"]) <= min_abs_mA:
                print(f"SKIP dir={dir_val} dac={dac}: wm_i_mA={st['wm_i_mA']} (near zero/clipped)")
                continue

            rows.append({
                "dir": dir_val,
                "dac_count": st[key],          # uses ch1 for DIR=0, ch2 for DIR=1
                "wm_i_mA": st["wm_i_mA"],
                "wm_v_mV": st.get("wm_v_mV", None),
                "t_s": time.time(),
            })

            print(f"OK  dir={dir_val} dac={dac}  wm_i_mA={st['wm_i_mA']}")
    except BaseException:
        # The sweep was aborted (error or kernel interrupt) and the output may
        # still be enabled. Try to disable it, but keep the original exception
        # as the one the caller sees.
        try:
            cli.cmd("swen 0", timeout=2.0)
        except Exception as disable_err:
            print(f"WARNING: failed to disable SWEN after aborted sweep: {disable_err}")
        raise

    cli.cmd("swen 0", timeout=2.0, require_ok=True)

    df = pd.DataFrame(rows)
    if not df.empty:
        df = df.sort_values("wm_i_mA")  # nice for plotting vs current
    return df

def fit_inverse_and_plot(df, name=""):
    """Fit ``count = A + B*I`` to the sweep data, plot it, and return the fit.

    Args:
        df: DataFrame with columns ``wm_i_mA`` (current in mA) and
            ``dac_count`` (raw DAC value).
        name: Label used in the plot title, printed summary and errors.

    Returns:
        ``(A, B, r2)`` where ``A`` is the intercept in counts, ``B`` the
        slope in counts per ampere, and ``r2`` the coefficient of
        determination (``nan`` when all counts are identical).

    Raises:
        RuntimeError: fewer than 2 rows, non-finite data, or fewer than 2
            distinct current values (the line would be undetermined).
    """
    if df is None or df.empty or len(df) < 2:
        raise RuntimeError(f"Need at least 2 points for regression ({name}).")

    I = df["wm_i_mA"].to_numpy(dtype=float) / 1000.0   # A
    cnt = df["dac_count"].to_numpy(dtype=float)        # counts

    if not (np.isfinite(I).all() and np.isfinite(cnt).all()):
        raise RuntimeError(f"Non-finite values in regression data ({name}).")
    # With a single distinct current the slope is undetermined; polyfit would
    # only warn about poor conditioning and return an unusable result.
    if I.max() == I.min():
        raise RuntimeError(f"Need at least 2 distinct current values for regression ({name}).")

    # inverse fit: count = A + B*I  (polyfit returns highest degree first)
    B, A = np.polyfit(I, cnt, 1)

    cnt_pred = A + B * I
    ss_res = np.sum((cnt - cnt_pred)**2)
    ss_tot = np.sum((cnt - np.mean(cnt))**2)
    r2 = 1 - ss_res/ss_tot if ss_tot > 0 else float("nan")

    # plot: extend the fitted line 0.2 A beyond the measured range on each side
    I_line = np.linspace(I.min() - 0.2, I.max() + 0.2, 200)
    cnt_line = A + B * I_line

    plt.figure(figsize=(7,4.5))
    plt.scatter(I, cnt, label="Data")
    plt.plot(I_line, cnt_line, label="Fit: count = A + B·I")
    plt.xlabel("Current I (A)")
    plt.ylabel("DAC count")
    plt.title(f"{name} inverse fit (R²={r2:.6f})")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()

    print(f"{name}: count = {A:.6f} + ({B:.6f})*I(A)   (R²={r2:.6f})")
    return A, B, r2

def cal_load_dict(cli) -> tuple:
    """Run ``cal load`` and parse the ``NAME = value`` lines of the reply.

    Returns:
        ``(values, raw)``: ``values`` maps each calibration name to a float,
        ``raw`` is the unmodified reply text.

    Raises:
        RuntimeError: the reply contains ``err:`` or no ``NAME = value`` line
            could be parsed.
    """
    out = cli.cmd("cal load", timeout=3.0)
    if "err:" in out.lower():
        # e.g. "err: no valid saved cal"
        raise RuntimeError(f"cal load failed:\n{out}")
    d = {k: float(v) for k, v in CAL_KV_RE.findall(out)}
    if not d:
        raise RuntimeError(f"Couldn't parse cal load output:\n{out}")
    return d, out

def cal_set(cli, name: str, value: float):
    """Send ``cal set <name> <value>`` (9 decimals) and return the reply.

    Raises:
        ValueError: ``value`` is NaN or infinite; such a value is rejected
            here instead of being sent to the device as ``nan`` / ``inf``.
    """
    value = float(value)
    if not math.isfinite(value):
        raise ValueError(f"Refusing to set {name} to non-finite value {value}")
    return cli.cmd(f"cal set {name} {value:.9f}", timeout=2.0, require_ok=True)

def cal_save(cli):
    """Send ``cal save``, require an ``ok`` line, and return the device reply."""
    return cli.cmd("cal save", require_ok=True, timeout=3.0)

def assert_close(name, got, exp, abs_tol=1e-3, rel_tol=1e-6):
    """Raise ``RuntimeError`` unless ``got`` matches ``exp`` within tolerance.

    The allowed difference is ``max(abs_tol, rel_tol * abs(exp))``.
    Both values must be finite: comparisons involving NaN or infinity are
    always false, so without this check such a value would pass silently.
    """
    # counts and counts/A usually print ~6 decimals; abs_tol=1e-3 is safe
    if not math.isfinite(got):
        raise RuntimeError(f"{name}: loaded value is not finite: {got}")
    if not math.isfinite(exp):
        raise RuntimeError(f"{name}: expected value is not finite: {exp}")
    if abs(got - exp) > max(abs_tol, rel_tol * abs(exp)):
        raise RuntimeError(f"{name} mismatch: expected {exp}, got {got}")

def write_save_validate_cal(cli, targets: dict, abs_tol=1e-3, rel_tol=1e-6):
    """Write calibration values, save them, then load back and verify.

    Steps: ``cal set`` for every entry of ``targets``, ``cal save``,
    ``cal load``, and a tolerance check of every target against the loaded
    value.

    Args:
        cli: Device client passed through to the ``cal_*`` helpers.
        targets: Mapping of calibration name to numeric value.
        abs_tol, rel_tol: Tolerances forwarded to ``assert_close``.

    Returns:
        The dict of values parsed from the ``cal load`` reply.

    Raises:
        ValueError: a target value is NaN or infinite. All targets are
            checked before the first ``cal set`` so that a bad value cannot
            leave a partially written or saved calibration behind.
        RuntimeError: a key is missing after load, or a value mismatches.
    """
    # Validate everything up front, before anything is sent to the device.
    wanted = {k: float(v) for k, v in targets.items()}
    bad = [k for k, v in wanted.items() if not math.isfinite(v)]
    if bad:
        raise ValueError(f"Non-finite calibration value(s) for: {', '.join(bad)}")

    # Set all requested values
    for k, v in targets.items():
        print(f"-> cal set {k} {v}")
        print(cal_set(cli, k, wanted[k]))

    # Save to EEPROM
    print("-> cal save")
    print(cal_save(cli))

    # Load back and validate
    loaded, raw = cal_load_dict(cli)
    print(raw)  # shows the full 'loaded:' list

    for k, v in wanted.items():
        if k not in loaded:
            raise RuntimeError(f"After save+load, missing key {k} in loaded cal.")
        assert_close(k, loaded[k], v, abs_tol=abs_tol, rel_tol=rel_tol)

    print("✅ Cal saved + validated for:", ", ".join(targets.keys()))
    return loaded


In [None]:
# If this cell is re-run, release the port held by the previous client first;
# otherwise opening the same COM port again fails because it is still in use.
if "cli" in globals():
    cli.close()

cli = ScvCli(PORT, BAUD, eol=EOL)
cli.open()

try:
    print(cli.cmd("telemetry off", timeout=2.0))
    cli.drain(quiet_s=0.20, max_total_s=2.0)   # flush any lingering id= lines

    # Put the device into a known state: manual control, output disabled,
    # direction as configured.
    print(cli.cmd("ctrl manual", timeout=2.0))
    print(cli.cmd("swen 0", timeout=2.0, require_ok=True))
    print(cli.cmd(f"dir {DIR_VAL}", timeout=2.0, require_ok=True))

    # Baseline status snapshot.
    st0_txt = cli.cmd("status", timeout=3.0)
    st0 = parse_status(st0_txt)
except BaseException:
    # Do not leave the port open when setup fails or is interrupted.
    cli.close()
    raise

st0


In [None]:
# --- Run both directions and store results ---
# Each direction is swept with its own DAC1 channel (DIR=0 -> channel 1,
# DIR=1 -> channel 2) and fitted as count = A + B*I.
# Note: both directions are hard-coded here; DIR_VAL from the configuration
# cell is not used in this cell.
df_inn = measure_dir(cli, dir_val=0, dac_counts=DAC_COUNTS, dac1_channel=1, settle_s=SETTLE_S, min_abs_mA=MIN_ABS_mA)
A_INN, B_INN, R2_INN = fit_inverse_and_plot(df_inn, name="INN (DIR=0)")

df_inp = measure_dir(cli, dir_val=1, dac_counts=DAC_COUNTS, dac1_channel=2, settle_s=SETTLE_S, min_abs_mA=MIN_ABS_mA)
A_INP, B_INP, R2_INP = fit_inverse_and_plot(df_inp, name="INP (DIR=1)")

# Print the fitted coefficients as C constants (A in counts, B in counts/A)
print(f"float A_INN = {A_INN:.6f}f;   // counts")
print(f"float B_INN = {B_INN:.6f}f;   // counts/A")
print(f"float A_INP = {A_INP:.6f}f;   // counts")
print(f"float B_INP = {B_INP:.6f}f;   // counts/A")

In [None]:
# Calibration values to store on the device, keyed by the names used with
# `cal set`. The values come from the fits in the previous cell.
targets = {
    "A_INP": A_INP,
    "B_INP": B_INP,
    "A_INN": A_INN,
    "B_INN": B_INN,
}

# Optional: make sure telemetry is off and flush leftover output so it does
# not get mixed into the replies parsed below.
cli.cmd("telemetry off", timeout=2.0)
cli.drain(quiet_s=0.10, max_total_s=1.0)

# Set every value, save, load back, and check each value within tolerance.
loaded = write_save_validate_cal(cli, targets, abs_tol=1e-3, rel_tol=1e-6)

In [None]:
# Close the serial port now that the session is finished.
cli.close()
# Calibration complete.

In [7]:

# Show the kernel's current working directory (IPython automagic for %pwd).
# NOTE(review): looks like a leftover helper cell; nothing else shown in this
# notebook uses its result — confirm before removing.
pwd

'C:\\Users\\ngwei'