In [1]:
import json
import struct
from socket import socket, AF_INET, IPPROTO_TCP, SOCK_STREAM, TCP_NODELAY
from typing import Any, Optional


HOST = "localhost"
PORT = 9999


class ReaperClient:
    """Small JSON-over-TCP RPC client.

    Wire format (both directions): a 4-byte little-endian unsigned length
    prefix followed by that many bytes of UTF-8 encoded JSON.

    Any public attribute that is not defined on the class is turned into a
    remote call, so ``client.ShowConsoleMsg("hi")`` sends a ``call`` request
    named ``ShowConsoleMsg`` with ``["hi"]`` as its argument list.
    """

    def __init__(self, host: str = HOST, port: int = PORT):
        """Remember the server address; no connection is opened here."""
        self.host = host
        self.port = port
        # Connected socket, or None while disconnected.
        self._conn: Optional[socket] = None
        # Incremented before every request and sent as its "id".
        self._request_id = 0
        # True between entering and leaving a hold() block.
        self._holding = False

    def connect(self) -> None:
        """Open the TCP connection; a no-op when one is already open.

        Raises:
            OSError: if the connection attempt fails. The half-built socket
                is closed and the client stays disconnected, so connect()
                can simply be called again.
        """
        if self._conn is not None:
            return
        conn = socket(AF_INET, SOCK_STREAM)
        try:
            conn.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
            conn.connect((self.host, self.port))
        except BaseException:
            # Also covers KeyboardInterrupt while a connect attempt hangs:
            # never leak the descriptor, never publish a dead socket.
            conn.close()
            raise
        # Publish the socket only once it is actually connected.
        self._conn = conn
        print(f"Connected to {self.host}:{self.port}")

    def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def _send(self, data: bytes) -> None:
        """Send one frame: 4-byte little-endian length, then ``data``.

        Raises:
            RuntimeError: if the client is not connected.
        """
        if self._conn is None:
            raise RuntimeError("Not connected")
        self._conn.sendall(struct.pack("<I", len(data)) + data)

    def _recv_exact(self, n: int) -> bytes:
        """Read exactly ``n`` bytes, looping over short reads.

        Raises:
            RuntimeError: if the client is not connected.
            ConnectionError: if the peer closes the stream first. The dead
                socket is dropped so that connect() can reconnect.
        """
        if self._conn is None:
            raise RuntimeError("Not connected")
        buf = bytearray()
        while len(buf) < n:
            chunk = self._conn.recv(n - len(buf))
            if not chunk:
                # An empty read means EOF; keeping the socket around would
                # make every later connect() return early on a dead link.
                self.close()
                raise ConnectionError("socket closed")
            buf.extend(chunk)
        return bytes(buf)

    def _recv(self) -> dict:
        """Read one length-prefixed frame and decode it as UTF-8 JSON."""
        (length,) = struct.unpack("<I", self._recv_exact(4))
        return json.loads(self._recv_exact(length).decode("utf-8"))

    def _request(self, kind: str, **fields) -> dict:
        """Send one request of type ``kind`` and return the decoded reply.

        The request is ``{"id": <next id>, "type": kind, **fields}``.

        Raises:
            RuntimeError: if the reply has ``"type": "error"``; the message
                is the reply's ``traceback`` or ``"unknown error"``.
        """
        self._request_id += 1
        payload = json.dumps(
            {"id": self._request_id, "type": kind, **fields},
            ensure_ascii=False,
        ).encode("utf-8")

        self._send(payload)
        resp = self._recv()

        if resp.get("type") == "error":
            raise RuntimeError(resp.get("traceback") or "unknown error")
        return resp

    def _call(self, name: str, *args) -> Any:
        """Invoke the remote function ``name`` and return the reply's ``value``."""
        return self._request("call", name=name, args=list(args)).get("value")

    def _control(self, cmd: str) -> None:
        """Send the control command ``cmd``; raises RuntimeError on an error reply."""
        self._request("control", cmd=cmd)

    def hold(self):
        """Return a context manager that brackets a block with HOLD / RELEASE.

        Entering sends the ``HOLD`` control command, leaving sends
        ``RELEASE`` (also when the block raises). What the server does
        while held is not visible from this client.
        """

        class HoldCtx:
            def __init__(self, client):
                self.client = client

            def __enter__(self):
                self.client._control("HOLD")
                self.client._holding = True
                return self

            def __exit__(self, *args):
                # Clear the flag first so it is reset even if RELEASE fails.
                self.client._holding = False
                self.client._control("RELEASE")

        return HoldCtx(self)

    def __getattr__(self, name: str):
        """Turn an unknown public attribute into a remote-call function.

        Python only calls this after normal lookup has failed. Names that
        start with an underscore are never proxied: they are either
        protocol probes (``__deepcopy__``, ``_repr_html_``, ...) or typos
        of internal attributes, and must fail with AttributeError instead
        of silently becoming a network call.
        """
        if name.startswith("_"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        def proxy(*args):
            return self._call(name, *args)

        proxy.__name__ = name
        return proxy


reaper = ReaperClient()

In [2]:
# Open the TCP connection to the configured host/port; returns immediately
# if this client already holds a connection. Prints a confirmation on success.
reaper.connect()

Connected to localhost:9999


## With hold: 100 calls inside `reaper.hold()`

In [3]:
%%time
# Send 100 console messages inside a hold block: the client sends HOLD on
# entry and RELEASE on exit.
with reaper.hold():
    for num in range(1, 101):  # 1-based counter, 1..100
        reaper.ShowConsoleMsg(f"Hello REAPER uses! loop counts {num}\n")

CPU times: user 1.05 ms, sys: 2.1 ms, total: 3.16 ms
Wall time: 33.8 ms


## Without hold: the same 100 calls, one at a time

In [4]:
%%time
# Same 100 console messages as the hold cell, but without a surrounding hold block.
for num in range(1, 101):  # 1-based counter, 1..100
    reaper.ShowConsoleMsg(f"Hello REAPER uses! loop counts {num}\n")

CPU times: user 6.88 ms, sys: 3.97 ms, total: 10.8 ms
Wall time: 4.22 s
