In [15]:
from enum import IntEnum, Enum, auto

class XState(IntEnum): # Session Termination
    """Session-termination sub-state (control variable X).

    Records how far the closing of a data session between two transceivers
    has progressed. Members are ``int`` subclasses, so they compare equal to
    their raw sub-state number.

    ``LMND_LOCAL`` is a historical misspelling of "LNMD" (local no more
    data). It stays the canonical member so existing code and
    ``XState(1).name`` are unaffected; ``LNMD_LOCAL`` is a correctly spelled
    alias for the same member.
    """
    BI_DIRECTIONAL = 0             # data flowing both ways; no termination in progress
    LMND_LOCAL = 1                 # local side has no more data (legacy spelling, canonical)
    LNMD_LOCAL = 1                 # alias of LMND_LOCAL with the acronym spelled correctly
    SENDING_RNMD = 2               # local side is sending RNMD across the link
    GOT_RNMD_HAVE_DATA = 3         # RNMD received while local data is still pending
    GOT_RNMD_WAIT_LOCAL_EMPTY = 4  # RNMD received; waiting for the local side to empty
    BOTH_EMPTY = 5                 # both ends are out of data; session can terminate

class YState(IntEnum):
    """Comm-change sub-state (control variable Y).

    Records how far a commanded physical-layer communications change has
    progressed. Members are ``int`` subclasses and compare equal to their
    raw sub-state number.
    """
    IDLE = 0                 # no comm change in progress
    LOCAL_COMM_CHANGE = 1    # a comm change has been requested locally
    SENDING_COMM_CHANGE = 2  # the comm change is being sent
    WAIT_ACK = 3             # sent; waiting for the acknowledgement
    RCCD_RECEIVED = 4        # full duplex only
    ACT_ON_RCCD = 5          # full duplex only

class ZState(IntEnum):
    """Symbol in-lock status (control variable Z).

    Note the numbering: 0 means the receiver is in lock, 1 means it is not.
    """
    INLOCK_TRUE = 0   # receiver has symbol lock
    INLOCK_FALSE = 1  # receiver has lost symbol lock

class ModulationState(Enum):
    """On/off state of the physical-interface modulation."""
    OFF = 0  # modulation disabled
    ON = 1   # modulation enabled

class BoolVar(Enum):
    """Two-valued flag used for the boolean-style state variables.

    This is a plain ``Enum``, so both members are truthy: test a flag with
    ``flag == BoolVar.TRUE`` (or ``is``), never with a bare ``if flag:``.
    """
    FALSE = 0  # flag cleared
    TRUE = 1   # flag set

class Prox1ControlState: # prototype state struct
    """Prototype state struct for a Prox1 link endpoint.

    Groups the control variables used by the rest of the notebook:

    * ``X`` / ``Y`` / ``Z`` -- session-termination, comm-change and symbol
      in-lock sub-states (``XState``, ``YState``, ``ZState``).
    * ``MODULATION`` -- physical-interface modulation flag.
    * ``NEED_PLCW`` / ``NEED_STATUS_REPORT`` -- pending-output request flags.
    * ``REMOTE_SCID_BUFFER``, ``COMMUNICATION_VALUE_BUFFER``,
      ``RECEIVING_SCID_BUFFER``, ``RECEIVING_PCID_BUFFER`` -- value buffers.

    Transition methods silently do nothing when called from a state they do
    not apply to; they never raise for an out-of-order call.
    """

    def __init__(self):
        """Initialise every control variable to its start-up value."""
        # Session Termination / Comm_Change
        self.X = XState.BI_DIRECTIONAL
        self.Y = YState.IDLE
        self.Z = ZState.INLOCK_TRUE

        # Physical interface
        self.MODULATION = ModulationState.OFF

        # PLCW / status report: both are requested at start-up
        self.NEED_PLCW = BoolVar.TRUE
        self.NEED_STATUS_REPORT = BoolVar.TRUE

        # Buffers
        self.REMOTE_SCID_BUFFER = 0
        self.COMMUNICATION_VALUE_BUFFER = None
        self.RECEIVING_SCID_BUFFER = None
        # This is the buffer load_receiving_pcid() / validate_frame_ids() use.
        # It must exist from construction, otherwise validating a frame before
        # any PCID was loaded raises AttributeError.
        self.RECEIVING_PCID_BUFFER = None
        # Kept only so code that reads this older attribute keeps working;
        # nothing in this class reads or writes it.
        self.REMOTE_PCID_BUFFER = None

    # Session termination around X
    # Tracks the sub-states of the full- and half-duplex session termination
    # process, i.e. where you are in closing a data session between two
    # transceivers.

    def local_no_more_data(self, duplex="full"):
        """Record that the local side has run out of data to send.

        Args:
            duplex: ``"half"`` selects the half-duplex transition; any other
                value is treated as full duplex.
        """
        if duplex == "half":
            # in half duplex, X = 1 means local LNMD
            if self.X == XState.BI_DIRECTIONAL:
                self.X = XState.LMND_LOCAL
        else:
            # in full duplex, we may directly proceed to sending RNMD
            if self.X in (XState.BI_DIRECTIONAL, XState.GOT_RNMD_WAIT_LOCAL_EMPTY):
                self.X = XState.SENDING_RNMD

    def send_rnmd(self):
        """Local side sends RNMD across the link.

        Only takes effect from ``LMND_LOCAL`` or ``BI_DIRECTIONAL``.
        """
        if self.X in (XState.LMND_LOCAL, XState.BI_DIRECTIONAL):
            self.X = XState.SENDING_RNMD

    def receive_rnmd(self, local_has_data: bool):
        """Handle an RNMD received from the remote side.

        The transition is unconditional: X is overwritten whatever its
        current value.

        Args:
            local_has_data: True when the local side still has data pending.
        """
        if local_has_data:
            # half duplex: remote has no data, local still has some
            self.X = XState.GOT_RNMD_HAVE_DATA
        else:
            # both are empty now
            self.X = XState.BOTH_EMPTY

    def terminate_session_if_done(self):
        """Reset X to ``BI_DIRECTIONAL`` once both ends are empty."""
        if self.X == XState.BOTH_EMPTY:
            # inform "vehicle controller" here in a real system
            self.X = XState.BI_DIRECTIONAL

    # Comm change around Y
    # Tracks the sub-states during the commanding of a physical-layer
    # communications change, i.e. where you are in changing the
    # physical-layer communication settings.

    def request_comm_change(self, new_comm_value):
        """Step 1: a higher layer asks for a comm change.

        Unconditionally moves Y to ``LOCAL_COMM_CHANGE`` and stores the
        requested value in ``COMMUNICATION_VALUE_BUFFER``.
        """
        self.Y = YState.LOCAL_COMM_CHANGE
        self.COMMUNICATION_VALUE_BUFFER = new_comm_value

    def start_sending_comm_change(self):
        """Unconditionally move Y to ``SENDING_COMM_CHANGE``."""
        self.Y = YState.SENDING_COMM_CHANGE

    def comm_change_sent_wait_ack(self):
        """After sending, start waiting for the acknowledgement."""
        if self.Y == YState.SENDING_COMM_CHANGE:
            self.Y = YState.WAIT_ACK

    def receive_comm_change_ack(self):
        """Acknowledgement received: return Y to ``IDLE``.

        Only takes effect while Y is ``WAIT_ACK``.
        """
        if self.Y == YState.WAIT_ACK:
            # Assignment, not comparison: `==` here left Y stuck in WAIT_ACK.
            self.Y = YState.IDLE

    # Z is updated by the lower layer's lock status.
    # Losing lock can happen at unpredictable moments, so Z remembers whether
    # it has already happened (e.g. during a comm_change).

    def symbol_inlock_changed(self, in_lock: bool):
        """Store the latest symbol-lock status reported by the lower layer."""
        self.Z = ZState.INLOCK_TRUE if in_lock else ZState.INLOCK_FALSE

    # for modulation and PLCW / status report

    def set_modulation(self, on: bool):
        """Switch the ``MODULATION`` state variable on or off."""
        # Must target the upper-case attribute created in __init__; writing to
        # a lower-case `modulation` would leave MODULATION stuck at OFF.
        self.MODULATION = ModulationState.ON if on else ModulationState.OFF

    def mark_need_plcw(self):
        """Request that a PLCW be selected for output."""
        self.NEED_PLCW = BoolVar.TRUE

    def mark_need_status_report(self):
        """Request that a status report be selected for output."""
        self.NEED_STATUS_REPORT = BoolVar.TRUE

    def select_plcw_for_output(self):
        """Consume a pending PLCW request.

        Called by "data selection for output".

        Returns:
            True if a PLCW was pending (the flag is cleared), else False.
        """
        if self.NEED_PLCW == BoolVar.TRUE:
            self.NEED_PLCW = BoolVar.FALSE
            return True
        return False

    def select_status_report_for_output(self):
        """Consume a pending status-report request.

        Returns:
            True if a report was pending (the flag is cleared), else False.
        """
        if self.NEED_STATUS_REPORT == BoolVar.TRUE:
            self.NEED_STATUS_REPORT = BoolVar.FALSE
            return True
        return False

    # for SCID/PCID buffers, keep simple setters and compare helpers

    def load_remote_scid(self, scid: int):
        """Store the remote SCID; must lie in 0..1023 (checked with assert)."""
        assert 0 <= scid <= 1023
        self.REMOTE_SCID_BUFFER = scid

    def load_receiving_scid(self, scid: int):
        """Store the receiving SCID; must lie in 0..1023 (checked with assert)."""
        assert 0 <= scid <= 1023
        self.RECEIVING_SCID_BUFFER = scid

    def load_receiving_pcid(self, pcid: int):
        """Store the receiving PCID (no range check is applied)."""
        self.RECEIVING_PCID_BUFFER = pcid

    def validate_frame_ids(self, frame_scid: int, frame_pcid: int) -> bool:
        """Check a frame's SCID and PCID against the receiving buffers.

        A buffer that is still ``None`` (nothing loaded yet) accepts any
        value for that field.

        Returns:
            True when both IDs are acceptable, otherwise False.
        """
        scid_ok = (
            self.RECEIVING_SCID_BUFFER is None
            or frame_scid == self.RECEIVING_SCID_BUFFER
        )
        pcid_ok = (
            self.RECEIVING_PCID_BUFFER is None
            or frame_pcid == self.RECEIVING_PCID_BUFFER
        )
        return scid_ok and pcid_ok

In [16]:
def demo_full_duplex_termination():
    """Walk a fresh state object through a full-duplex session termination.

    Prints X before the first step and again after each step.
    """
    state = Prox1ControlState()

    # both sides start with bi-directional transfer
    print("Initial X:", state.X)

    # (label printed afterwards, transition to apply), in execution order
    walkthrough = (
        # local runs out of data
        ("After LMND local:", lambda: state.local_no_more_data(duplex="full")),
        # remote replies with RNMD; assume no local data
        ("After receive RNMD:", lambda: state.receive_rnmd(local_has_data=False)),
        # finalize termination
        ("After termination:", lambda: state.terminate_session_if_done()),
    )
    for label, apply_step in walkthrough:
        apply_step()
        print(label, state.X)

# Run the demo only when this code executes as the main program (as it does
# when the cell is run), not when it is imported from another module.
if __name__ == "__main__":
    demo_full_duplex_termination()

Initial X: 0
After LMND local: 2
After receive RNMD: 5
After termination: 0
