diff --git a/blog/2025-09-18-decentralized-consistency.md b/blog/2025-09-18-decentralized-consistency.md index 3f5fd66aa..41e4f121b 100644 --- a/blog/2025-09-18-decentralized-consistency.md +++ b/blog/2025-09-18-decentralized-consistency.md @@ -22,24 +22,29 @@ We would also like to automate the disarming of the door using a camera. When th There are different ways to design and refactor the above system, for example, by removing the direct connection between the `Cockpit` and `Door` reactors. Our design choice is meant to highlight that door _disarming_ and _opening_ are two different and independent remote services triggered by two different commands issued by two different system actors. Therefore, each actor has an independent connection to the door to request its service. -The purpose of the system is to open the door in reaction to the command from the cockpit only in normal conditions, that is, when the ramp is present and the door is not armed. The door, upon receiving the command from the cockpit, should wait for clearance from the camera before opening. +The purpose of the system is to open the door in reaction to the command from the cockpit whether or not a ramp is present. If a ramp is present, it is imperative that the door be disarmed _before_ being opened. Hence, the door, upon receiving the `open` command from the cockpit, should wait for input from the camera before opening. -The order in which messages are processed is crucial in this application. When the _disarm_ and _open_ commands arrive together, the _disarm_ service needs to be invoked before opening the door, otherwise the escape slide will be deployed and that is not the desired behavior. -Lingua Franca guarantees determinism in the execution order of reactions with simultaneous inputs, and the order is given by the the order of declaration of the reactions inside the reactor. It is then sufficient to declare the `disarm` reaction _before_ the `open` one. 
The diagram confirms the execution order by labeling the `disarm` reaction with 1 and the `open` reaction with 2. +The order in which messages are processed is crucial in this application. When the _disarm_ and _open_ commands arrive with the same tag, the _disarm_ service needs to be invoked before opening the door; otherwise, the escape slide will be erroneously deployed. +Lingua Franca guarantees determinism in the execution order of reactions with logically simultaneous inputs, and the order is given by the order of declaration of the reactions inside the reactor. It is then sufficient to declare the `disarm` reaction _before_ the `open` one. The diagram confirms the execution order by labeling the `disarm` reaction with 1 and the `open` reaction with 2. -The more challenging situation is when the inputs do not arrive together. In fact, if the command from the cockpit arrives before the clearance from the camera and the door processes it immediately, the door will be opened while still _armed_, and the evacuation slide will be deployed as if it was an emergency landing. The door, then, has to wait for both inputs before invoking the _opening_ service. -This is then an example of an application that cannot safely proceed its processing without assurance on its inputs. The following section explains how to obtain the desired behavior in Lingua Franca. +The problem is that even though the messages are _logically_ simultaneous, they do not arrive at the same _physical_ time. In fact, the `open` command from the cockpit is likely to arrive before the clearance from the camera because the camera runs an expensive computer-vision algorithm. The door, consequently, has to wait for both inputs before invoking the _opening_ service. + +This is then an example of an application that cannot safely proceed without assurance on its inputs. 
The following section explains how to obtain the desired behavior in Lingua Franca using the decentralized coordinator (the centralized coordinator automatically provides the required assurance). ### Consistency with decentralized coordination -The application is implemented as a federated program with decentralized coordination, which means that the advancement of logical time in each single federate is not subject to approval from any centralized entities, but it is done locally based on the input it receives from the other federates. +The application is implemented as a federated program with decentralized coordination, which means that the advancement of logical time in each single federate is not subject to approval from any centralized entities, but it is done locally based on the input it receives from the other federates and on its local physical clock. Let us consider the case when the `Door` reactor receives the _open_ command from the `Cockpit` reactor, but not yet the _disarm_ command from the `Camera` reactor. As previously observed, the `Door` cannot proceed to invoke the _opening_ service, because it needs to wait for the `Camera` to send the _disarm_ command. But how long should it wait? -Lingua Franca allows you to customize this waiting time. Each federate is associated with a parameter called [`maxwait`](/docs/writing-reactors/distributed-execution#safe-to-advance-sta) that controls how long the federate should wait for inputs from other federates before processing an input it has just received. -More precisely, `maxwait` is how much time a federate waits before advancing its tag to that of the just received event, when it is not known if the other input ports will receive data with the same or an earlier tag. At the expiration of the `maxwait`, the federate assumes that those unresolved ports will not receive any data with earlier tags, and advances its logical time to the tag of the received event. 
+The decentralized coordinator in +Lingua Franca allows you to customize this waiting time. Each federate can be assigned an attribute called [`maxwait`](/docs/writing-reactors/distributed-execution#safe-to-advance-sta) that controls how long the federate should wait for inputs from other federates before processing an event, such as an input it has just received. +More precisely, `maxwait` is the maximum amount of time a federate waits before advancing its logical time to some value _t_. Specifically, to advance to logical time _t_, the federate waits until either all inputs are known up to and including time _t_ or its local physical clock exceeds _t_ + `maxwait`. +An input is known up to and including time _t_ if a message with timestamp _t_ or greater has been received on that input port. +At the expiration of the `maxwait`, the federate assumes that any unresolved ports will not receive any messages with timestamps _t_ or earlier. +It can then advance its logical time to _t_. -In our example, we want the door to _indefinitely wait_ for both _disarm_ and _open_ commands to arrive before processing any of them. In Lingua Franca, this is obtained by setting `maxwait = forever`, and it means that the `Door` reactor cannot safely proceed without assurance about the inputs. +In our example, we want the door to _wait indefinitely_ for both _disarm_ and _open_ commands to arrive before processing any of them. In Lingua Franca, this is obtained by setting `maxwait` to `forever`. The `Door` reactor cannot safely proceed without assurance about the inputs. 
The implementation of the `Door` reactor and its instantiation are shown below: @@ -51,23 +56,31 @@ reactor Door { state isOpen: bool = false reaction(disarm) {= - if (!self->isDisarmed) { + if (disarm->value) { self->isDisarmed = true; printf("Door disarmed\n"); + } else { + self->isDisarmed = false; + printf("Door armed\n"); } - =} maxwait {= + =} iflate {= printf("STP violation\n"); + printf("Intended tag: %lld\n", disarm->intended_tag.time); + printf("Current tag: %lld\n", lf_time_logical()); =} reaction(open) {= - if (self->isDisarmed) { - printf("Door open - normal mode\n"); + if (open->value) { + self->isOpen = true; + printf("Door open\n"); } else { - // This should never happen - printf("Door open - !emergency mode!\n"); + self->isOpen = false; + printf("Door closed\n"); } - =} maxwait {= + =} iflate {= printf("STP violation\n"); + printf("Intended tag: %lld\n", open->intended_tag.time); + printf("Current tag: %lld\n", lf_time_logical()); =} } @@ -83,18 +96,25 @@ federated reactor { v.ramp_present -> d.disarm } ``` -The reaction triggered by the `open` command prints on the standard output whether the door was _disarmed_ or not at the time of opening. We do not expect emergency openings with `maxwait` set to `forever`. -The `maxwait` parameter is specified at instantiation time within the main reactor. Right before creating the instance of the `Door` reactor for which we want to set the parameter, we use the `@maxwait` annotation that takes as input the new `maxwait` value. The reactions of the `Door` reactor that are triggered by remote inputs are associated with a [fault handler](/docs/writing-reactors/distributed-execution#safe-to-process-stp-violation-handling) that is invoked in the case of timing inconsistencies during input processing. This event will be thoroughly discussed in another blog post. +The `maxwait` attribute is specified at instantiation time within the main reactor. 
Right before creating the instance of the `Door` reactor for which we want to set the attribute, we use the `@maxwait` annotation that takes as input the `maxwait` value. + +The reactions of the `Door` reactor provide [fault handlers](/docs/writing-reactors/distributed-execution#safe-to-process-stp-violation-handling) that are invoked when the federate has assumed that inputs were known up to timestamp _t_ and then later receives a message with timestamp _t_ or less. When `maxwait` is `forever`, these fault handlers should never be invoked. + +For finite values of `maxwait`, it is always possible for messages to get sufficiently delayed that the fault handlers will be invoked. +When they are invoked, the current tag will be greater than the intended tag of the message. +This type of fault is called a **safe-to-process** (**STP**) violation because messages are being handled out of tag order. +The intended tag of the input can be accessed as shown in the code above. ## Multirate inputs: the automatic emergency braking use case ![AutomaticEmergencyBrakingSystem diagram](../static/img/blog/AutomaticEmergencyBrakingSystem.svg) -Consider the above Lingua Franca implementation of an automatic emergency braking system, one of the most critical ADAS systems which modern cars are equipped with. -The controller system modeled by the `AutomaticEmergencyBraking` reactor reads data coming from two sensors, a lidar and a radar, and uses both to detect if objects or pedestrians cross the trajectory the car, thus performing _sensor fusion_. +Consider the above Lingua Franca implementation of an automatic emergency braking system, one of the most critical ADAS systems that modern cars are equipped with. +The controller system modeled by the `AutomaticEmergencyBraking` reactor reads data coming from two sensors, a lidar and a radar, and uses both to detect objects or pedestrians that cross the trajectory of the car. 
+This is a _sensor fusion_ problem, where a diversity of sensors is used to get better reliability. When one of the two sensors signals the presence of an object at a distance shorter than a configurable threshold, the controller triggers the brake to stop the car and avoid crashing into it. -The sensors are modeled with their own timer that triggers the generation of data. The clocks of all federates are automatically synchronized by the [clock synchronization algorithm](/docs/writing-reactors/distributed-execution#clock-synchronization) of the Lingua Franca runtime. +The sensors are modeled with their own timer that triggers the generation of data. The clocks of all federates are automatically synchronized by the [clock synchronization algorithm](/docs/writing-reactors/distributed-execution#clock-synchronization) of the Lingua Franca runtime (unless this is disabled). Typically, in a real use case of this kind, the clock of sensor devices cannot be controlled by Lingua Franca, but a way to work around this limitation is to resample the data collected by sensors with the timing given by a clock that the runtime can control. The sensor reactors of our application are then modeling this resampling of sensor data that fits well with the Lingua Franca semantics for time determinism. @@ -111,18 +131,22 @@ The application is once agin implemented as a federated program with decentraliz Consistency problems may arise when a federate receives data from two or more federates, as it is the case of the `AutomaticEmergencyBraking` reactor. The controller expects to receive input from both sensors at times 0ms, 100ms, 200ms, etc. Let's consider as an example the case where the remote connection between the controller and the radar has a slightly larger delay than that between the controller and the lidar. The lidar input will then always arrive slightly earlier than the radar one. 
When the controller receives the lidar input, should it process the data immediately, or should it wait for the radar input to come? Sensor fusion requires consistency: if the controller processes the input from the lidar and then the radar data comes, the control action elaborated upon the arrival of the lidar data does not take into account both sensors, even though it should. Hence, in our use case, the `AutomaticEmergencyBraking` reactor needs to wait for both inputs before processing new data. -In our application, we aim to process all incoming data with the same logical time to realize sensor fusion. Hence, we set `maxwait = forever` to _indefinitely wait_ for the radar input before processing the radar. +In our application, if we aim to process all incoming data with the same logical time to realize sensor fusion, then we can set `maxwait = forever` to _wait indefinitely_ for the radar input before processing the lidar input. +Note that this might not be a good choice in this example because if a fault causes one of the sensors to stop sending messages, the ADAS system will stop working. +Hence, in practice, we will probably want a smaller value for `maxwait`, and we will want to add fault detection and mitigation to the application. +Fault handling will be addressed in a later blog post. Here we assume no such faults. #### Availability challenge -However, setting `maxwait` to `forever` creates problems when only the lidar input is expected (50ms, 150ms, 250ms, etc): the controller cannot process that input until an input from the radar comes, because `maxwait` will never expire. For example, if the single lidar input comes at time 50ms, it has to wait until time 100ms before being processed. If that input was signaling the presence of a close object, the detection would be delayed by 50ms, which may potentially mean crashing into the object. The automatic emergency braking system must be available, otherwise it might not brake in time to avoid collisions. 
+Even without faults, however, setting `maxwait` to `forever` creates problems when only the lidar input is expected (50ms, 150ms, 250ms, etc.): the controller cannot process that input until an input from the radar comes, because `maxwait` will never expire. For example, if the single lidar input comes at time 50ms, it has to wait until time 100ms before being processed. If that input was signaling the presence of a close object, the detection would be delayed by 50ms, which may potentially mean crashing into the object. The automatic emergency braking system must be available; otherwise, it might not brake in time to avoid collisions. The ideal `maxwait` value for maximum availability in the time instants with only the lidar input is 0, because if a single input is expected, no wait is necessary. Summing up, consistency for sensor fusion requires `maxwait = forever` when inputs from both sensors are expected, while availability calls for `maxwait = 0` when only the lidar input is coming. The two values are at odds, and any value in between would mean sacrificing both properties at the same time. ### Dynamic adjustment of `maxwait` The knowledge of the timing properties of the application under analysis enables the _a priori_ determination of the time instants when both inputs are expected and those when only the lidar has new data available. -Lingua Franca allows to dynamically change the `maxwait` in the reaction body using the `lf_set_maxwait` API, that takes as input parameter the new `maxwait` value to set. +Lingua Franca allows the `maxwait` to be changed dynamically in the reaction body using the `lf_set_fed_maxwait` API, which takes the new `maxwait` value as its input parameter. 
This capability of the language permits the automatic emergency braking federate to: + * start with `maxwait` statically set to `forever`, because at time 0 (startup) both sensors produce data; * set `maxwait` to 0 after processing both inputs with the same logical time, because the next data will be sent by the lidar only; * set `maxwait` back to `forever` after processing the radar input alone, because the next data will be sent by both sensors. @@ -131,21 +155,23 @@ This dynamic solution guarantees both consistency and availability in all input The implementation and the instantiation of the `AutomaticEmergencyBraking` reactor are shown below: ```lf-c -reactor AutomaticEmergencyBraking(dist_thld: float = 20.0) { +reactor AutomaticEmergencyBraking { input lidar_in: float input radar_in: float output brake: int state n_invocs: int = 0 reaction (lidar_in, radar_in) -> brake {= - if (lf_is_present(lidar_in) && lidar_in->value < self->dist_thld) { - printf("Lidar has detected close object -> signaling braking\n"); - lf_set(brake, 1); - lf_request_stop(); - } else if (lf_is_present(radar_in) && radar_in->value < self->dist_thld) { - printf("Radar has detected close object -> signaling braking\n"); - lf_set(brake, 1); - lf_request_stop(); + if (lidar_in->is_present && radar_in->is_present) { + if (sensor_fusion(lidar_in->value, radar_in->value)) { + printf("Sensors detect a close object -> signaling braking\n"); + lf_set(brake, 1); + } + } else if (lidar_in->is_present) { + if (lidar_analysis(lidar_in->value)) { + printf("Lidar has detected close object -> signaling braking\n"); + lf_set(brake, 1); + } } self->n_invocs++; @@ -156,7 +182,7 @@ reactor AutomaticEmergencyBraking(dist_thld: float = 20.0) { } =} deadline(100ms) {= printf("AEB deadline violated\n"); - =} maxwait {= + =} iflate {= printf("STP violation on AEB\n"); =} @@ -176,6 +202,9 @@ reactor AutomaticEmergencyBraking(dist_thld: float = 20.0) { } ``` -The `dist_thld` parameter is the distance threshold from 
detected objects below which the `AutomaticEmergencyBraking` reactor activates the brakes. -The reaction body reads the distance reported by both the lidar and the radar, and if any of these is less than the threshold, it sends a signal to the `BrakingSystem` reactor. -The `n_invocs` integer state variable counts the number of times the reaction of the `AutomaticEmergencyBraking` reactor is invoked. This variable is used to determine how many inputs the reaction will see at the next invocation and set the `maxwait` accordingly. Even invocation numbers mean that the next reaction invocation will happen with both sensor inputs present, so `maxwait` is set to `forever`; with odd invocation numbers, the next reaction invocation will see new data from the lidar only, and `maxwait` is then set to 0. \ No newline at end of file +The `sensor_fusion()` function combines the data and returns `true` if braking is needed. +The `lidar_analysis()` function uses only lidar data to make a (presumably more conservative) decision. +The `n_invocs` integer state variable counts the number of times the reaction of the `AutomaticEmergencyBraking` reactor is invoked. This variable is used to determine how many inputs the reaction will see at the next invocation and set the `maxwait` accordingly. Even invocation numbers mean that the next reaction invocation will happen with both sensor inputs present, so `maxwait` is set to `forever`; with odd invocation numbers, the next reaction invocation will see new data from the lidar only, and `maxwait` is then set to 0. + +Clearly, detecting and handling faults would be needed in a practical implementation. +This will be the topic of a subsequent blog post.