In [None]:
"""
app.py — Flask GUI for PyVISA device discovery + SCIPY Excel command library

Features:
- Discover button: rescans VISA resources AND reloads Excel from disk each time
- Looks up device model against Excel sheet names
- Renders device table + per-command GET/SET controls (frontend in your index.html/app.js)
- Command history stored server-side + endpoints for UI to show/copy/clear
- Serial comm settings:
    - Discovery uses your known-good ASRL settings (115200, 8N1, no flow, \n terminations)
    - Those settings are stored in "comm" per device and reused for send/query
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import pandas as pd
import pyvisa
from flask import Flask, jsonify, render_template, request

###############################################################################
# Configuration
###############################################################################

EXCEL_PATH = "SCIPY_DEF.xlsx"  # put next to app.py (or change path here)
HISTORY_MAX = 500

###############################################################################
# Flask
###############################################################################

app = Flask(__name__)
app.config["JSON_SORT_KEYS"] = False

###############################################################################
# Utilities
###############################################################################


def _clean_str(x: Any) -> str:
    if x is None:
        return ""
    if isinstance(x, float) and pd.isna(x):
        return ""
    s = str(x).strip()
    return "" if s.lower() == "nan" else s


def parse_idn(idn: str) -> dict[str, str]:
    """
    Your parsing rules:
    - If ',' present → split on ','
    - Else → split on whitespace
    Returns dict: vendor, model, id
    """
    if not idn:
        return {"vendor": "UNKNOWN", "model": "UNKNOWN", "id": ""}

    s = idn.strip()
    parts = [p.strip() for p in (s.split(",") if "," in s else s.split()) if p.strip()]

    return {
        "vendor": parts[0] if len(parts) > 0 else "UNKNOWN",
        "model": parts[1] if len(parts) > 1 else "UNKNOWN",
        "id": (",".join(parts[2:]) if "," in s else " ".join(parts[2:])) if len(parts) > 2 else "",
    }


###############################################################################
# 1) Your known-good discovery function (with comm profile added)
###############################################################################


def find_visa_instruments(
    timeout_ms: int = 500,
    serial_timeout_ms: int = 500,
) -> list[dict[str, Any]]:
    """
    Returns list of dicts:
    {
      "resource": ...,
      "vendor": ...,
      "model": ...,
      "id": ...,
      "comm": {...}   # serial settings for ASRL, {} otherwise
    }
    """
    rm = pyvisa.ResourceManager()
    found: list[dict[str, Any]] = []

    for r in rm.list_resources():
        try:
            comm: dict[str, Any] = {}

            with rm.open_resource(r) as inst:
                inst.timeout = timeout_ms

                if r.startswith("ASRL"):
                    inst.timeout = serial_timeout_ms
                    inst.baud_rate = 115200
                    inst.data_bits = 8
                    inst.parity = pyvisa.constants.Parity.none
                    inst.stop_bits = pyvisa.constants.StopBits.one
                    inst.flow_control = pyvisa.constants.VI_ASRL_FLOW_NONE
                    inst.read_termination = "\r\n"
                    inst.write_termination = "\r\n"

                    try:
                        inst.clear()
                    except Exception:
                        pass

                    # store comm profile (JSON-friendly)
                    comm = {
                        "timeout": serial_timeout_ms,
                        "baud_rate": 115200,
                        "data_bits": 8,
                        "parity": int(pyvisa.constants.Parity.none),
                        "stop_bits": int(pyvisa.constants.StopBits.one),
                        # flow_control is already an int VISA constant in practice, but keep safe:
                        "flow_control": int(pyvisa.constants.VI_ASRL_FLOW_NONE),
                        "read_termination": "\n",
                        "write_termination": "\n",
                    }

                idn_info = parse_idn(inst.query("*IDN?"))

            entry: dict[str, Any] = {
                "resource": r,
                **idn_info,
                "comm": comm,
            }

            print(entry)
            found.append(entry)

        except pyvisa.errors.VisaIOError:
            # Ignore resources that can't be opened/respond quickly
            continue
        except Exception as e:
            print(f"[--] {r} -> {type(e).__name__}: {e}")

    return found


###############################################################################
# 2) Excel SCIPY library
###############################################################################


@dataclass
class ScipyCommand:
    name: str
    get_set: str
    cmd_template: str
    params_raw: str = ""
    param_name: str | None = None
    options: list[str] | None = None
    fmt: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "get_set": self.get_set,
            "cmd_template": self.cmd_template,
            "params_raw": self.params_raw,
            "param_name": self.param_name,
            "options": self.options,
            "fmt": self.fmt,
        }


def _derive_name_from_cmd(cmd: str) -> str:
    base = re.sub(r"\{.*?\}", "", cmd).strip()
    base = base.replace("?", "").strip()
    if ":" in base:
        return base.split(":")[-1] or base
    return base or "CMD"


def parse_parameters(params_raw: str) -> tuple[str | None, list[str] | None, str | None]:
    """
    Supported Excel 'Parameters' patterns:
      - Dropdown:   "RANGE:AUTO;500E-3;5;50;500;1000"
      - Dropdown:   "STATE:0;1"
      - Format hint "VOLT:V.2f" (optional)
    Returns: (param_name, options, fmt)
    """
    s = _clean_str(params_raw)
    if not s:
        return None, None, None

    # allow future multi-params separated by '|', but UI currently uses first block
    blocks = [b.strip() for b in s.split("|") if b.strip()]
    b0 = blocks[0]

    if ":" not in b0:
        return None, None, None

    name, rest = b0.split(":", 1)
    name = name.strip()
    rest = rest.strip()

    if ";" in rest:
        opts = [r.strip() for r in rest.split(";") if r.strip()]
        return name, opts, None

    # treat as format hint
    return name, None, rest


class ExcelScipyLibrary:
    def __init__(self, excel_path: str):
        self.excel_path = excel_path
        self.sheets: list[str] = []
        self.by_sheet: dict[str, list[ScipyCommand]] = {}
        self._load()

    def _load(self) -> None:
        xl = pd.ExcelFile(self.excel_path)
        self.sheets = list(xl.sheet_names)
        self.by_sheet.clear()

        for sheet in self.sheets:
            df = pd.read_excel(self.excel_path, sheet_name=sheet)
            df.rename(columns={c: str(c).strip() for c in df.columns}, inplace=True)

            if "GET/SET" not in df.columns or "CMD" not in df.columns:
                continue

            cmds: list[ScipyCommand] = []
            for _, row in df.iterrows():
                get_set = _clean_str(row.get("GET/SET", "")).upper()
                cmd_template = _clean_str(row.get("CMD", ""))
                if not cmd_template or get_set not in {"GET", "SET"}:
                    continue

                name = _clean_str(row.get("Name", ""))
                if not name:
                    name = _derive_name_from_cmd(cmd_template)

                params_raw = _clean_str(row.get("Parameters", ""))
                param_name, options, fmt = parse_parameters(params_raw)

                cmds.append(
                    ScipyCommand(
                        name=name,
                        get_set=get_set,
                        cmd_template=cmd_template,
                        params_raw=params_raw,
                        param_name=param_name,
                        options=options,
                        fmt=fmt,
                    )
                )

            self.by_sheet[sheet] = cmds

    def match_sheet(self, vendor: str, model: str) -> str | None:
        """
        Matching strategy (simple & robust):
          1) exact match sheet == model (case-insensitive)
          2) model is substring of sheet name
          3) vendor+model both substring
        """
        v = (vendor or "").strip().lower()
        m = (model or "").strip().lower()
        if not m:
            return None

        for s in self.sheets:
            if s.strip().lower() == m:
                return s

        for s in self.sheets:
            if m in s.strip().lower():
                return s

        for s in self.sheets:
            sl = s.strip().lower()
            if v and v in sl and m in sl:
                return s

        return None

    def commands_for_sheet(self, sheet: str) -> list[ScipyCommand]:
        return self.by_sheet.get(sheet, [])


###############################################################################
# 3) Globals: library, last devices, history
###############################################################################

lib = ExcelScipyLibrary(EXCEL_PATH)

_LAST_DEVICES: list[dict[str, Any]] = []
_HISTORY: list[dict[str, Any]] = []


def reload_excel_library() -> None:
    global lib
    lib = ExcelScipyLibrary(EXCEL_PATH)


def history_add(entry: dict[str, Any]) -> None:
    _HISTORY.append(entry)
    if len(_HISTORY) > HISTORY_MAX:
        del _HISTORY[: len(_HISTORY) - HISTORY_MAX]


###############################################################################
# 4) Command formatting + VISA I/O
###############################################################################


def _format_set_command(cmd_template: str, param_name: str | None, value: str | None, fmt: str | None) -> str:
    """
    Replace {PARAM} with given value.
    If fmt contains something like ".2f" (optionally prefixed), format numeric.
    """
    if not param_name:
        return cmd_template

    v = (value or "").strip()

    if fmt:
        m = re.search(r"(\.\d+[a-zA-Z])", fmt)  # .2f, .3e, etc.
        if m:
            spec = m.group(1)
            try:
                v = format(float(v), spec)
            except Exception:
                pass

    return cmd_template.replace("{" + param_name + "}", v)


def _apply_comm(inst, comm: dict[str, Any]) -> None:
    """
    Apply previously stored comm settings (ASRL).
    IMPORTANT:
      - Never reduce an existing timeout (prevents 100 ms discovery timeout from breaking runtime).
    """
    if not comm:
        return

    # Timeout: never reduce the current timeout
    if "timeout" in comm:
        try:
            inst.timeout = max(int(inst.timeout), int(comm["timeout"]))
        except Exception:
            pass

    # Serial settings (only if present)
    if "baud_rate" in comm:
        inst.baud_rate = int(comm["baud_rate"])
    if "data_bits" in comm:
        inst.data_bits = int(comm["data_bits"])

    # parity / stop bits stored as ints for JSON
    if "parity" in comm:
        inst.parity = pyvisa.constants.Parity(int(comm["parity"]))
    if "stop_bits" in comm:
        inst.stop_bits = pyvisa.constants.StopBits(int(comm["stop_bits"]))

    # flow_control stored as int for JSON
    if "flow_control" in comm:
        inst.flow_control = int(comm["flow_control"])

    if "read_termination" in comm:
        inst.read_termination = comm["read_termination"]
    if "write_termination" in comm:
        inst.write_termination = comm["write_termination"]

    # Clear buffers (best effort)
    try:
        inst.clear()
    except Exception:
        pass


def _configure_serial_defaults(inst, timeout_ms: int = 3000) -> None:
    """
    Your known-good serial settings.
    """
    inst.timeout = timeout_ms
    inst.baud_rate = 115200
    inst.data_bits = 8
    inst.parity = pyvisa.constants.Parity.none
    inst.stop_bits = pyvisa.constants.StopBits.one
    inst.flow_control = pyvisa.constants.VI_ASRL_FLOW_NONE

    # Make read/write terminations consistent
    inst.read_termination = "\r\n"
    inst.write_termination = "\r\n"

    try:
        inst.clear()
    except Exception:
        pass



def visa_send(resource: str, cmd: str, is_get: bool, comm: dict[str, Any] | None = None) -> str:
    """
    Send/query VISA command.

    Policy:
    - If ASRL: always configure serial defaults (solid), then apply optional comm overrides.
    - Else: set timeout only, then apply optional comm overrides.
    """
    rm = pyvisa.ResourceManager()

    # Using context manager ensures close even on exceptions
    with rm.open_resource(resource) as inst:
        # Default for non-serial
        inst.timeout = 3000

        if resource.startswith("ASRL"):
            # Use the same "solid" recipe you trust
            _configure_serial_defaults(inst, timeout_ms=3000)

        # If you still want the discovery comm dict to override anything, apply it last
        if comm:
            # Apply only keys that are present (and keep timeout >= current)
            if "timeout" in comm:
                try:
                    inst.timeout = max(int(inst.timeout), int(comm["timeout"]))
                except Exception:
                    pass

            if "baud_rate" in comm:
                inst.baud_rate = int(comm["baud_rate"])
            if "data_bits" in comm:
                inst.data_bits = int(comm["data_bits"])
            if "parity" in comm:
                inst.parity = pyvisa.constants.Parity(int(comm["parity"]))
            if "stop_bits" in comm:
                inst.stop_bits = pyvisa.constants.StopBits(int(comm["stop_bits"]))
            if "flow_control" in comm:
                inst.flow_control = int(comm["flow_control"])

            if "read_termination" in comm:
                inst.read_termination = comm["read_termination"]
            if "write_termination" in comm:
                inst.write_termination = comm["write_termination"]

            try:
                inst.clear()
            except Exception:
                pass

        # Now execute
        if is_get:
            return str(inst.query(cmd)).strip()

        inst.write(cmd)
        return "OK"







###############################################################################
# 5) Discovery enrichment
###############################################################################


def discover_devices() -> list[dict[str, Any]]:
    devices = find_visa_instruments()
    enriched: list[dict[str, Any]] = []

    for d in devices:
        vendor = d.get("vendor", "")
        model = d.get("model", "")

        sheet = lib.match_sheet(vendor, model)

        enriched.append(
            {
                **d,
                "sheet": sheet,
                "found_in_excel": bool(sheet),
                "commands": [c.to_dict() for c in (lib.commands_for_sheet(sheet) if sheet else [])],
            }
        )

    return enriched


###############################################################################
# 6) Routes
###############################################################################


@app.get("/")
def index():
    return render_template("index.html", devices=_LAST_DEVICES)


@app.post("/api/discover")
def api_discover():
    global _LAST_DEVICES
    # reload Excel every discover (your requested tweak)
    reload_excel_library()
    _LAST_DEVICES = discover_devices()
    return jsonify({"ok": True, "devices": _LAST_DEVICES, "sheets": lib.sheets})


@app.post("/api/send")
def api_send():
    """
    POST JSON from frontend:
    {
      "resource": "...",
      "comm": {...},
      "get_set": "GET"|"SET",
      "cmd_template": "...",
      "param_name": "RANGE"|null,
      "fmt": "V.2f"|null,
      "value": "AUTO"|"..."
    }
    """
    data = request.get_json(force=True) or {}

    resource = (data.get("resource") or "").strip()
    comm = data.get("comm") or {}
    get_set = (data.get("get_set") or "").strip().upper()

    cmd_template = (data.get("cmd_template") or "").strip()
    param_name = data.get("param_name", None)
    fmt = data.get("fmt", None)
    value = data.get("value", None)

    if not resource or not cmd_template or get_set not in {"GET", "SET"}:
        return jsonify({"ok": False, "error": "Invalid request"}), 400

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    try:
        if get_set == "GET":
            sent = cmd_template
            resp = visa_send(resource, sent, is_get=True, comm=comm)

            history_add({
                "ts": ts,
                "resource": resource,
                "get_set": "GET",
                "sent": sent,
                "response": resp,
            })

            return jsonify({"ok": True, "sent": sent, "response": resp})

        # SET
        sent = _format_set_command(cmd_template, param_name, value, fmt)
        resp = visa_send(resource, sent, is_get=False, comm=comm)

        history_add({
            "ts": ts,
            "resource": resource,
            "get_set": "SET",
            "sent": sent,
            "response": resp,
        })

        return jsonify({"ok": True, "sent": sent, "response": resp})

    except Exception as e:
        # Log failures too (very useful for debugging)
        history_add({
            "ts": ts,
            "resource": resource,
            "get_set": get_set,
            "sent": cmd_template if get_set == "GET" else _format_set_command(cmd_template, param_name, value, fmt),
            "response": f"ERR: {type(e).__name__}: {e}",
        })
        return jsonify({"ok": False, "error": str(e)}), 500



@app.get("/api/history")
def api_history():
    return jsonify({"ok": True, "history": _HISTORY})


@app.post("/api/history/clear")
def api_history_clear():
    _HISTORY.clear()
    return jsonify({"ok": True})


###############################################################################
# 7) Main
###############################################################################

if __name__ == "__main__":
    # If you run this inside Jupyter, use:
    app.run(debug=True, use_reloader=False)
    #app.run(host="127.0.0.1", port=5000, debug=True)


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [06/Jan/2026 21:27:58] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:27:58] "GET /static/app.js HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:27:58] "GET /api/history HTTP/1.1" 200 -
  return self.read()
127.0.0.1 - - [06/Jan/2026 21:28:10] "POST /api/discover HTTP/1.1" 200 -


{'resource': 'ASRL13::INSTR', 'vendor': 'KORAD', 'model': 'KA3005PS', 'id': 'V1.0 SN:03820940', 'comm': {'timeout': 500, 'baud_rate': 115200, 'data_bits': 8, 'parity': 0, 'stop_bits': 10, 'flow_control': 0, 'read_termination': '\n', 'write_termination': '\n'}}


  return self.read()


{'resource': 'ASRL13::INSTR', 'vendor': 'KORAD', 'model': 'KA3005PS', 'id': 'V1.0 SN:03820940', 'comm': {'timeout': 500, 'baud_rate': 115200, 'data_bits': 8, 'parity': 0, 'stop_bits': 10, 'flow_control': 0, 'read_termination': '\n', 'write_termination': '\n'}}


127.0.0.1 - - [06/Jan/2026 21:28:36] "POST /api/discover HTTP/1.1" 200 -


{'resource': 'ASRL16::INSTR', 'vendor': 'OWON', 'model': 'XDM2041', 'id': '2153136,V3.0.0,3', 'comm': {'timeout': 500, 'baud_rate': 115200, 'data_bits': 8, 'parity': 0, 'stop_bits': 10, 'flow_control': 0, 'read_termination': '\n', 'write_termination': '\n'}}


127.0.0.1 - - [06/Jan/2026 21:28:40] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:40] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:43] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:43] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:46] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:46] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:50] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:28:50] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:27] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:27] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:49] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:49] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:51] "POST /api/send HTTP/1.1" 200 -
127.0.0.1 - - [06/Jan/2026 21:29:51] "GET /api/history HTTP/1.1" 200 -
127.0.0.1 - - [06/Ja