diff --git a/blog/2025-10-14-decentralized-consistency.md b/blog/2025-10-14-decentralized-consistency.md
new file mode 100644
index 000000000..d05a78ef7
--- /dev/null
+++ b/blog/2025-10-14-decentralized-consistency.md
@@ -0,0 +1,210 @@
+---
+slug: decentralized-consistency
+title: "Decentralized Consistency"
+authors: [fra-p, eal, rcakella]
+tags: [lingua franca, federation, decentralized, consistency, maxwait]
+---
+
+The design of [distributed applications](/docs/writing-reactors/distributed-execution) in Lingua Franca requires care, particularly if the coordination of the federation is [decentralized](/docs/writing-reactors/distributed-execution#decentralized-coordination). The intent of this post is to illustrate and handle the challenges arising from designing distributed applications in Lingua Franca, with the help of two realistic use cases.
+
+## Indefinite wait for inputs: aircraft door use case
+Aircraft doors on passenger flights are currently managed manually by flight attendants.
+Before takeoff, the flight attendants _arm_ the door; if the door is opened in this state, an evacuation slide is automatically inflated and deployed for emergency evacuation.
+When the aircraft is at a gate, before opening the door, the flight attendants _disarm_ it to avoid the deployment of the evacuation slide.
+Flight attendants are allowed to disarm the door _only_ when they see through the porthole the ramp that will allow the passengers to disembark the aircraft.
+
+![AircraftDoor diagram](../static/img/blog/AircraftDoor.svg)
+
+Consider the above Lingua Franca program that implements a simplified system to remotely open an aircraft door that is in the _armed_ state.
+The door implements two independent remote services, door _disarming_ and door _opening_, encoded by two different reactions in the `Door` reactor.
+Suppose the pilot in the cockpit issues a command to open the door.
+We would also like to automate the disarming of the door using a camera to verify the presence of a ramp. When the camera determines that the ramp is present, it triggers the disarming service. The camera detection is triggered by the door open command issued from the cockpit.
+
+There are different ways to design and refactor the above system, for example, by removing the direct connection between the `Cockpit` and `Door` reactors. Our design choice is meant to highlight that door _disarming_ and _opening_ are two different and independent remote services triggered by two different commands issued by two different system actors. Therefore, each actor has an independent connection to the door to request its service.
+
+The purpose of the system is to open the door in reaction to the command from the cockpit whether or not a ramp is present. If a ramp is present, it is imperative that the door be disarmed _before_ being opened. Hence, the door, upon receiving the `open` command from the cockpit, should wait for input from the camera before opening.
+
+The order in which messages are processed is crucial in this application. When the _disarm_ and _open_ commands arrive with the same tag, the _disarm_ service needs to be invoked before opening the door, otherwise the evacuation slide will be erroneously deployed.
+Lingua Franca guarantees determinism in the execution order of reactions with logically simultaneous inputs, and the order is given by the order of declaration of the reactions inside the reactor. It is then sufficient to declare the `disarm` reaction _before_ the `open` one. The diagram confirms the execution order by labeling the `disarm` reaction with 1 and the `open` reaction with 2.
+
+The problem is that even though the messages are _logically_ simultaneous, they do not arrive at the same _physical_ time.
In fact, the `open` command from the cockpit is likely to arrive before the clearance from the camera because the camera runs an expensive computer vision algorithm. The door, consequently, has to wait for both inputs before invoking the _opening_ service.
+
+This is an example of an application that cannot safely proceed without assurance about its inputs. The following section explains how to obtain the desired behavior in Lingua Franca using the decentralized coordinator (the centralized coordinator automatically provides the required assurance).
+
+### Consistency with decentralized coordination
+The application is implemented as a federated program with decentralized coordination, which means that the advancement of logical time in each federate is not subject to approval from any centralized entity. Instead, each federate advances locally, based on the inputs it receives from the other federates and on its local physical clock.
+
+Let us consider the case when the `Door` reactor receives the _open_ command from the `Cockpit` reactor, but not yet the _disarm_ command from the `Camera` reactor. As previously observed, the `Door` cannot proceed to invoke the _opening_ service, because it needs to wait for the `Camera` to send the _disarm_ command.
+But how long should it wait?
+
+The decentralized coordinator in
+Lingua Franca allows you to customize this waiting time. Each federate can be assigned an attribute called [`maxwait`](/docs/writing-reactors/distributed-execution#safe-to-advance-sta) that controls how long the federate should wait for inputs from other federates before processing an event, such as an input it has just received.
+More precisely, `maxwait` is the maximum amount of time a federate waits before advancing its logical time to some value _t_. Specifically, to advance to logical time _t_, the federate waits until either all inputs are known up to and including time _t_ or its local physical clock exceeds _t_ + `maxwait`.
+An input is known up to and including time _t_ if a message with timestamp _t_ or greater has been received on that input port.
+At the expiration of the `maxwait`, the federate assumes that any unresolved ports will not receive any messages with timestamps _t_ or earlier.
+It can then advance its logical time to _t_.
+
+In our example, we want the door to _wait indefinitely_ for both _disarm_ and _open_ commands to arrive before processing any of them. In Lingua Franca, this is obtained by setting `maxwait` to `forever`. The `Door` reactor cannot safely proceed without assurance about the inputs.
+
+The implementation of the `Door` reactor and its instantiation are shown below:
+
+```lf-c
+reactor Door {
+    input open: bool
+    input disarm: bool
+    state isDisarmed: bool = false
+    state isOpen: bool = false
+
+    reaction(disarm) {=
+        if (disarm->value) {
+            self->isDisarmed = true;
+            printf("Door disarmed\n");
+        } else {
+            self->isDisarmed = false;
+            printf("Door armed\n");
+        }
+    =} tardy {=
+        printf("STP violation\n");
+        printf("Intended tag: %lld\n", disarm->intended_tag.time);
+        printf("Current tag: %lld\n", lf_time_logical());
+    =}
+
+    reaction(open) {=
+        if (open->value) {
+            self->isOpen = true;
+            printf("Door open\n");
+        } else {
+            self->isOpen = false;
+            printf("Door closed\n");
+        }
+    =} tardy {=
+        printf("STP violation\n");
+        printf("Intended tag: %lld\n", open->intended_tag.time);
+        printf("Current tag: %lld\n", lf_time_logical());
+    =}
+}
+
+federated reactor {
+    c = new Cockpit()
+    v = new Camera()
+
+    @maxwait(forever)
+    d = new Door()
+
+    c.open -> d.open
+    c.open -> v.check_ramp
+    v.ramp_present -> d.disarm
+}
+```
+
+The `maxwait` attribute is specified at instantiation time within the main reactor. Right before creating the instance of the `Door` reactor for which we want to set the attribute, we use the `@maxwait` annotation, which takes the `maxwait` value as its argument.
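
The waiting rule described earlier in this section can be sketched in plain C. This is only an illustration under stated assumptions, not the Lingua Franca runtime: the names `time_ns`, `WAIT_FOREVER`, `inputs_known`, and `may_advance` are hypothetical, and each input port is summarized by the largest timestamp received on it so far.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical types and names for illustration; not the LF runtime API. */
typedef int64_t time_ns;
#define WAIT_FOREVER INT64_MAX

/* known_up_to[i] holds the largest timestamp received so far on input i.
 * An input is "known up to and including t" if that value is >= t. */
bool inputs_known(const time_ns *known_up_to, int n_inputs, time_ns t) {
    assert(n_inputs >= 0);
    for (int i = 0; i < n_inputs; i++) {
        if (known_up_to[i] < t) {
            return false;
        }
    }
    return true;
}

/* Returns true if the federate may advance its logical time to t:
 * either every input is known up to t, or the physical clock has
 * passed t + maxwait. With WAIT_FOREVER the timeout never fires. */
bool may_advance(time_ns t, time_ns maxwait, time_ns physical_now,
                 const time_ns *known_up_to, int n_inputs) {
    if (inputs_known(known_up_to, n_inputs, t)) {
        return true;
    }
    if (maxwait == WAIT_FOREVER) {
        return false;
    }
    return physical_now > t + maxwait;
}
```

For the `Door` at logical time 0, with `open` already received and `disarm` still missing, `may_advance` with a finite `maxwait` becomes true once the physical clock passes that bound, whereas with `WAIT_FOREVER` it stays false until the `disarm` message arrives. That is the behavior `@maxwait(forever)` is meant to provide.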
+
+The reactions of the `Door` reactor provide [fault handlers](/docs/writing-reactors/distributed-execution#safe-to-process-stp-violation-handling) that are invoked if the federate assumed that its inputs were known up to timestamp _t_ and later received a message with timestamp _t_ or less. When `maxwait` is `forever`, these fault handlers should never be invoked.
+
+For finite values of `maxwait`, it is always possible for messages to get sufficiently delayed that the fault handlers will be invoked.
+When they are invoked, the current tag will be greater than the intended tag of the message.
+This type of fault is called a **safe-to-process** (**STP**) violation because messages are being handled out of tag order.
+The intended tag of the input can be accessed as shown in the code above.
+
+## Multirate inputs: automatic emergency braking
+![AutomaticEmergencyBrakingSystem diagram](../static/img/blog/AutomaticEmergencyBrakingSystem.svg)
+
+Consider the above Lingua Franca implementation of an automatic emergency braking system, one of the most critical ADAS features in modern cars.
+The controller system modeled by the `AutomaticEmergencyBraking` reactor reads data coming from two sensors, a lidar and a radar, and uses both to detect objects or pedestrians that cross the trajectory of the car.
+This is a _sensor fusion_ problem, where a diversity of sensors is used to get better reliability.
+When one of the two sensors signals the presence of an object at a distance shorter than a configurable threshold, the controller triggers the brake to stop the car and avoid crashing into the object.
+
+Each sensor is modeled with its own timer that triggers the generation of data. The clocks of all federates are automatically synchronized by the [clock synchronization algorithm](/docs/writing-reactors/distributed-execution#clock-synchronization) of the Lingua Franca runtime (unless this is disabled).
+Typically, in a real use case of this kind, the clock of sensor devices cannot be controlled by Lingua Franca, but a way to work around this limitation is to resample the data collected by sensors with the timing given by a clock that the runtime can control.
+The sensor reactors of our application model this resampling of sensor data so that the alignment of data from the two sensors is well defined and sensor fusion becomes possible.
+
+The lidar sensor has a sampling frequency that is twice that of the radar, as indicated by the timers in the corresponding reactors; the lidar timer has a period of 50ms, while the radar timer has a period of 100ms.
+Their deadline is equal to their period and is enforced using the dedicated `DeadlineCheck` reactors, following the guidelines of how to [work with deadlines](/blog/deadlines).
+
+The sensor behavior in the application can be simulated for testing purposes in a way that each sensor constantly produces distance values above the threshold (i.e., no objects in the way), and then at a random time it sends a distance value below the threshold, indicating the presence of a close object. When the `AutomaticEmergencyBraking` reactor receives that message, it signals the `BrakingSystem` reactor to brake the car, and the whole system shuts down.
+
+### Desired system properties
+Availability is a crucial property of this application, because we want the automatic emergency braking system to brake as fast as possible when a close object is detected. Consistency is also necessary, as sensor fusion happens with sensor data produced at the same logical time. Even if this is not implemented in our simplified example, sensor fusion in a more general scenario helps rule out false positives, i.e., cases in which one of the sensors erroneously detects a close object that would induce an unnecessary and dangerous braking. False positives are caused by the weaknesses of the specific sensor.
For example, rainy or foggy weather reduces the accuracy of lidar sensors. The key concept is to gather data produced at the same logical time by all sensors and combine them to have a more accurate estimate of possible collisions. Consistency and in-order data processing are then required.
+
+#### Consistency challenge
+The application is once again implemented as a federated program with decentralized coordination.
+Consistency problems may arise when a federate receives data from two or more federates, as is the case for the `AutomaticEmergencyBraking` reactor.
+The controller expects to receive input from both sensors at times 0ms, 100ms, 200ms, etc. Let's consider as an example the case where the remote connection between the controller and the radar has a slightly larger delay than that between the controller and the lidar. The lidar input will then always arrive slightly earlier than the radar one. When the controller receives the lidar input, should it process the data immediately, or should it wait for the radar input to come? Sensor fusion requires consistency: if the controller processes the input from the lidar and then the radar data comes, the control action computed upon the arrival of the lidar data does not take into account both sensors, even though it should. Hence, in our use case, the `AutomaticEmergencyBraking` reactor needs to wait for both inputs before processing new data.
+
+In our application, if we aim to process all incoming data with the same logical time to realize sensor fusion, then we can set `maxwait = forever` to _wait indefinitely_ for the radar input before processing the lidar input.
+Note that this might not be a good choice in this example because if a fault causes one of the sensors to stop sending messages, the ADAS system will stop working.
+Hence, in practice, we will probably want a smaller value for `maxwait`, and we will want to add fault detection and mitigation to the application.
+Fault handling will be addressed in a later blog post. Here we assume no such faults.
+
+#### Availability challenge
+Even without faults, however, setting `maxwait` to `forever` creates problems when only the lidar input is expected (50ms, 150ms, 250ms, etc.): the controller cannot process that input until an input from the radar comes, because `maxwait` will never expire. For example, if the single lidar input comes at time 50ms, it has to wait until time 100ms before being processed. If that input was signaling the presence of a close object, the detection would be delayed by 50ms, which may potentially mean crashing into the object. The automatic emergency braking system must be available, otherwise it might not brake in time to avoid collisions.
+The ideal `maxwait` value for maximum availability in the time instants with only the lidar input is 0, because if a single input is expected, no wait is necessary.
+
+Summing up, consistency for sensor fusion requires `maxwait = forever` when inputs from both sensors are expected (or some finite value for fault tolerance), while availability calls for `maxwait = 0` when only the lidar input is coming. The two values are at odds, and any value in between would mean sacrificing both properties at the same time.
+
+### Dynamic adjustment of `maxwait`
+Knowing the timing properties of the application makes it possible to determine _a priori_ the time instants when both inputs are expected and those when only the lidar has new data available.
+Lingua Franca allows the `maxwait` to be changed dynamically in a reaction body using the `lf_set_fed_maxwait` API, which takes the new `maxwait` value as its parameter.
+This capability of the language permits the automatic emergency braking federate to:
+
+* start with `maxwait` statically set to `forever` (or some finite value for fault tolerance), because at time 0 (startup) both sensors produce data;
+* set `maxwait` to 0 after processing both inputs with the same logical time, because the next data will be sent by the lidar only;
+* set `maxwait` back to `forever` after processing the lidar input alone, because the next data will be sent by both sensors.
+
+This dynamic solution guarantees both consistency and availability as long as lidar data arrives within 50 ms.
+The implementation and the instantiation of the `AutomaticEmergencyBraking` reactor are shown below:
+
+```lf-c
+reactor AutomaticEmergencyBraking {
+    input lidar_in: float
+    input radar_in: float
+    output brake: int
+    state n_invocs: int = 0
+
+    reaction (lidar_in, radar_in) -> brake {=
+        if (lidar_in->is_present && radar_in->is_present) {
+            if (sensor_fusion(lidar_in->value, radar_in->value)) {
+                printf("Sensors detect a close object -> signaling braking\n");
+                lf_set(brake, 1);
+            }
+        } else if (lidar_in->is_present) {
+            if (lidar_analysis(lidar_in->value)) {
+                printf("Lidar has detected close object -> signaling braking\n");
+                lf_set(brake, 1);
+            }
+        }
+
+        self->n_invocs++;
+        if (self->n_invocs % 2) {
+            lf_set_fed_maxwait(0);
+        } else {
+            lf_set_fed_maxwait(FOREVER);
+        }
+    =} tardy {=
+        printf("STP violation on AEB\n");
+    =} deadline(100 ms) {=
+        printf("AEB deadline violated\n");
+    =}
+}
+
+federated reactor {
+    lidar = new Lidar()
+    radar = new Radar()
+
+    @maxwait(forever)
+    aeb = new AutomaticEmergencyBraking()
+
+    brake = new BrakingSystem()
+
+    lidar.lidar_data -> aeb.lidar_in
+    radar.radar_data -> aeb.radar_in
+    aeb.brake -> brake.signal
+}
+```
+
+The `sensor_fusion()` function combines the data and returns `true` if braking is needed.
+The `lidar_analysis()` function uses only lidar data to make a (presumably more conservative) decision.
+The `n_invocs` integer state variable counts the number of times the reaction of the `AutomaticEmergencyBraking` reactor is invoked. This variable is used to determine how many inputs the reaction expects to see at the next invocation and set the `maxwait` accordingly. Even invocation numbers mean that the next reaction invocation will happen with both sensor inputs present, so `maxwait` is set to `forever`; with odd invocation numbers, the next reaction invocation will see new data from the lidar only, and `maxwait` is then set to 0. + +Clearly, detecting and handling faults would be needed in practical implementation. +This will be the topic of a subsequent blog. diff --git a/docs/assets/code/c/src/DecentralizedTimerAfter.lf b/docs/assets/code/c/src/DecentralizedTimerAfter.lf index 41d629cba..20180a31f 100644 --- a/docs/assets/code/c/src/DecentralizedTimerAfter.lf +++ b/docs/assets/code/c/src/DecentralizedTimerAfter.lf @@ -6,7 +6,7 @@ target C { import Count, Print from "Federated.lf" reactor PrintTimer extends Print { - timer t(0, 1 sec) + timer t(10 ms, 1 sec) reaction(t) {= lf_print("Timer ticked at (%lld, %d).", @@ -18,5 +18,5 @@ reactor PrintTimer extends Print { federated reactor { c = new Count() p = new PrintTimer() - c.out -> p.in after 10 msec + c.out -> p.in after 10 ms } diff --git a/docs/assets/code/c/src/DecentralizedTimerAfterHandler.lf b/docs/assets/code/c/src/DecentralizedTimerHandler.lf similarity index 93% rename from docs/assets/code/c/src/DecentralizedTimerAfterHandler.lf rename to docs/assets/code/c/src/DecentralizedTimerHandler.lf index 56016919b..0d3d0e2ba 100644 --- a/docs/assets/code/c/src/DecentralizedTimerAfterHandler.lf +++ b/docs/assets/code/c/src/DecentralizedTimerHandler.lf @@ -13,7 +13,7 @@ reactor PrintTimer { lf_print("Received: %d at (%lld, %d)", in->value, lf_time_logical_elapsed(), lf_tag().microstep ); - =} STAA(0) {= + =} tardy {= lf_print("****** Violation handler invoked at (%lld, %d). 
" "Intended tag was (%lld, %d).", lf_time_logical_elapsed(), lf_tag().microstep, @@ -30,6 +30,7 @@ reactor PrintTimer { federated reactor { c = new Count() + @maxwait(10 ms) p = new PrintTimer() - c.out -> p.in after 10 msec + c.out -> p.in } diff --git a/docs/assets/code/c/src/DecentralizedTimerSTA.lf b/docs/assets/code/c/src/DecentralizedTimerSTA.lf index cc64a1f5a..4f5b961f6 100644 --- a/docs/assets/code/c/src/DecentralizedTimerSTA.lf +++ b/docs/assets/code/c/src/DecentralizedTimerSTA.lf @@ -5,7 +5,7 @@ target C { import Count, Print from "Federated.lf" -reactor PrintTimer(STA: time = 10 ms) extends Print { +reactor PrintTimer extends Print { timer t(0, 1 sec) reaction(t) {= @@ -17,6 +17,7 @@ reactor PrintTimer(STA: time = 10 ms) extends Print { federated reactor { c = new Count() + @maxwait(10 ms) p = new PrintTimer() c.out -> p.in } diff --git a/docs/assets/code/c/src/DecentralizedFeedbackSTAA.lf b/docs/assets/code/c/src/DecentralizedZeroDelayLoop.lf similarity index 81% rename from docs/assets/code/c/src/DecentralizedFeedbackSTAA.lf rename to docs/assets/code/c/src/DecentralizedZeroDelayLoop.lf index 1dd9aab37..0d08e10d2 100644 --- a/docs/assets/code/c/src/DecentralizedFeedbackSTAA.lf +++ b/docs/assets/code/c/src/DecentralizedZeroDelayLoop.lf @@ -1,4 +1,5 @@ target C { + timeout: 3 s, coordination: decentralized } @@ -16,14 +17,14 @@ reactor CountPrint { lf_print("***** CountPrint Received: %d at tag (%lld, %u)", in->value, lf_time_logical_elapsed(), lf_tag().microstep ); - =} STAA(forever) {= - // This should never happen, but it is here to demonstrate the STAA violation handler. - lf_print_warning("CountPrint: Safe to process violation!"); + =} tardy {= + // This should never happen, but it is here to demonstrate the tardy violation handler. 
+ lf_print_warning("CountPrint: Message is tardy!"); lf_print("Intended time: %lld", in->intended_tag.time - lf_time_start()); =} } -reactor Double(STA: time = forever) { +reactor Double { input in: int output out: int @@ -36,5 +37,6 @@ federated reactor { c = new CountPrint() p = new Double() c.out -> p.in + @absent_after(forever) p.out -> c.in } diff --git a/docs/assets/code/c/src/DecentralizedZeroDelayLoopWithChecker.lf b/docs/assets/code/c/src/DecentralizedZeroDelayLoopWithChecker.lf new file mode 100644 index 000000000..f4273acbe --- /dev/null +++ b/docs/assets/code/c/src/DecentralizedZeroDelayLoopWithChecker.lf @@ -0,0 +1,28 @@ +target C { + timeout: 3 s, + coordination: decentralized +} + +import CountPrint, Double from "DecentralizedZeroDelayLoop.lf" +reactor CountPrintWithChecker extends CountPrint { + + reaction(t, in) {= + if (!in->is_present) { + lf_print("***** CountPrint Failed to Receive response at tag (%lld, %u)", + lf_time_logical_elapsed(), lf_tag().microstep + ); + } + =} tardy {= + lf_print("***** CountPrint Received tardy input: %d at tag (%lld, %u)", + in->value, lf_time_logical_elapsed(), lf_tag().microstep + ); + =} +} + +federated reactor { + c = new CountPrintWithChecker() + p = new Double() + c.out -> p.in + @absent_after(20 ms) + p.out -> c.in +} diff --git a/docs/assets/code/py/src/DecentralizedTimerAfter.lf b/docs/assets/code/py/src/DecentralizedTimerAfter.lf index d83b54f42..0d5b15ca0 100644 --- a/docs/assets/code/py/src/DecentralizedTimerAfter.lf +++ b/docs/assets/code/py/src/DecentralizedTimerAfter.lf @@ -6,7 +6,7 @@ target Python { import Count, Print from "Federated.lf" reactor PrintTimer extends Print { - timer t(0, 1 sec) + timer t(10 ms, 1 sec) reaction(t) {= print( @@ -19,5 +19,5 @@ reactor PrintTimer extends Print { federated reactor { c = new Count() p = new PrintTimer() - c.out -> p.inp after 10 msec + c.out -> p.inp after 10 ms } diff --git a/docs/assets/code/py/src/DecentralizedTimerAfterHandler.lf 
b/docs/assets/code/py/src/DecentralizedTimerHandler.lf similarity index 93% rename from docs/assets/code/py/src/DecentralizedTimerAfterHandler.lf rename to docs/assets/code/py/src/DecentralizedTimerHandler.lf index 767d53319..01981fac2 100644 --- a/docs/assets/code/py/src/DecentralizedTimerAfterHandler.lf +++ b/docs/assets/code/py/src/DecentralizedTimerHandler.lf @@ -14,7 +14,7 @@ reactor PrintTimer { f"Received: {inp.value} " f"at ({lf.time.logical_elapsed()}, {lf.tag().microstep})" ) - =} STAA(0) {= + =} tardy {= print( "****** Violation handler invoked at " f"({lf.time.logical_elapsed()}, {lf.tag().microstep}). " @@ -33,6 +33,7 @@ reactor PrintTimer { federated reactor { c = new Count() + @maxwait(10 ms) p = new PrintTimer() - c.out -> p.inp after 10 msec + c.out -> p.inp } diff --git a/docs/assets/code/py/src/DecentralizedTimerSTA.lf b/docs/assets/code/py/src/DecentralizedTimerSTA.lf index a5b90062b..e923108e3 100644 --- a/docs/assets/code/py/src/DecentralizedTimerSTA.lf +++ b/docs/assets/code/py/src/DecentralizedTimerSTA.lf @@ -5,7 +5,7 @@ target Python { import Count, Print from "Federated.lf" -reactor PrintTimer(STA = 10 ms) extends Print { +reactor PrintTimer extends Print { timer t(0, 1 sec) reaction(t) {= @@ -18,6 +18,7 @@ reactor PrintTimer(STA = 10 ms) extends Print { federated reactor { c = new Count() + @maxwait(10 ms) p = new PrintTimer() c.out -> p.inp } diff --git a/docs/assets/code/py/src/DecentralizedFeedbackSTAA.lf b/docs/assets/code/py/src/DecentralizedZeroDelayLoop.lf similarity index 91% rename from docs/assets/code/py/src/DecentralizedFeedbackSTAA.lf rename to docs/assets/code/py/src/DecentralizedZeroDelayLoop.lf index d98f7c562..23b4180b8 100644 --- a/docs/assets/code/py/src/DecentralizedFeedbackSTAA.lf +++ b/docs/assets/code/py/src/DecentralizedZeroDelayLoop.lf @@ -1,4 +1,5 @@ target Python { + timeout: 3 s, coordination: decentralized } @@ -15,14 +16,14 @@ reactor CountPrint { reaction(inp) {= print(f"***** CountPrint Received: 
{inp.value} at tag ({lf.time.logical_elapsed()}, {lf.tag().microstep})") - =} STAA(forever) {= + =} tardy {= # This should never happen, but it is here to demonstrate the STAA violation handler. print("CountPrint: Safe to process violation!") print(f"Intended time: {inp.intended_tag.time - lf.time.start()}") =} } -reactor Double(STA = forever) { +reactor Double { input inp output out @@ -35,5 +36,6 @@ federated reactor { c = new CountPrint() p = new Double() c.out -> p.inp + @absent_after(forever) p.out -> c.inp } \ No newline at end of file diff --git a/docs/assets/code/py/src/DecentralizedZeroDelayLoopWithChecker.lf b/docs/assets/code/py/src/DecentralizedZeroDelayLoopWithChecker.lf new file mode 100644 index 000000000..1a9ad2dd1 --- /dev/null +++ b/docs/assets/code/py/src/DecentralizedZeroDelayLoopWithChecker.lf @@ -0,0 +1,23 @@ +target Python { + timeout: 3 s, + coordination: decentralized +} + +import CountPrint, Double from "DecentralizedZeroDelayLoop.lf" +reactor CountPrintWithChecker extends CountPrint { + + reaction(t, inp) {= + if (not inp.is_present) : + print(f"***** CountPrint Failed to Receive response at tag ({lf.time.logical_elapsed()}, {lf.tag().microstep})") + =} tardy {= + print(f"***** CountPrint Received tardy input: {inp.value} at tag ({lf.time.logical_elapsed()}, {lf.tag().microstep})") + =} +} + +federated reactor { + c = new CountPrintWithChecker() + p = new Double() + c.out -> p.inp + @absent_after(20 ms) + p.out -> c.inp +} diff --git a/docs/assets/images/diagrams/DecentralizedZeroDelayLoop.svg b/docs/assets/images/diagrams/DecentralizedZeroDelayLoop.svg new file mode 100644 index 000000000..44d651e0d --- /dev/null +++ b/docs/assets/images/diagrams/DecentralizedZeroDelayLoop.svg @@ -0,0 +1 @@ +DecentralizedZeroDelayLoopCountPrint(0, 100 ms)12inoutDoubleinout \ No newline at end of file diff --git a/docs/assets/images/diagrams/DecentralizedZeroDelayLoopWithChecker.svg b/docs/assets/images/diagrams/DecentralizedZeroDelayLoopWithChecker.svg 
new file mode 100644 index 000000000..63ad1278a --- /dev/null +++ b/docs/assets/images/diagrams/DecentralizedZeroDelayLoopWithChecker.svg @@ -0,0 +1 @@ +DecentralizedZeroDelayLoopWithCheckerCountPrintWithChecker(0, 100 ms)123$inoutDoubleinout \ No newline at end of file diff --git a/docs/writing-reactors/distributed-execution.mdx b/docs/writing-reactors/distributed-execution.mdx index 600f4ba9f..b5f37f92c 100644 --- a/docs/writing-reactors/distributed-execution.mdx +++ b/docs/writing-reactors/distributed-execution.mdx @@ -198,11 +198,15 @@ An alternative is **decentralized** coordination, which extends a technique real With decentralized coordination, the RTI coordinates startup, shutdown, and clock synchronization, but is otherwise not involved in the execution of the distributed program. -### Safe-to-Advance (STA) +### Maxwait -In decentralized coordination, when one federate communicates with another, it does so directly through a dedicated socket without going through the RTI. Moreover, it does not consult the RTI to advance logical time. Instead, each federate has a **safe-to-advance** (**STA**) offset, and it can advance its logical time to _t_ when its physical clock matches or exceeds _t_ + STA. +In decentralized coordination, when one federate communicates with another, it does so directly through a dedicated socket without going through the RTI. Moreover, it does not consult the RTI to advance logical time. Instead, each federate has a **maxwait** attribute (formerly called **safe-to-advance**, or **STA**). +The federate can advance to tag _g_ when either of the following conditions is satisfied: -By default, the STA is zero. An STA of zero is OK for any federate where either _every_ logical connection into the federate has a sufficiently large `after` clause, or the federate has only one upstream federate sending it messages, and it has no local timers or actions. 
The value of the `after` delay on each connection must exceed the sum of the [clock synchronization](#clock-synchronization) error _E_, a bound _L_ on the network latency, and the time lag on the sender _D_ (the physical time at which it sends the message minus the timestamp of the message). The sender's time lag _D_ can be enforced by using a `deadline`. For example: +1. All inputs are known up to and including tag _g_. An input is known up to and including tag _g_ if a message with tag _g_ or greater has been received on that input port. +2. The federate's physical clock matches or exceeds _t_ + maxwait, where _t_ is the timestamp in _g_ = (_t_, _m_). + +By default, the maxwait is zero. A maxwait of zero is OK for any federate where either _every_ logical connection into the federate has a sufficiently large `after` clause, or the federate has only one upstream federate sending it messages, and it has no local timers or actions. The value of the `after` delay on each connection must exceed the sum of the [clock synchronization](#clock-synchronization) error _E_, a bound _L_ on the network latency, and the time lag on the sender _D_ (the physical time at which it sends the message minus the timestamp of the message). The sender's time lag _D_ can be enforced by using a `deadline`. For example: import C_DecentralizedTimerAfter from '../assets/code/c/src/DecentralizedTimerAfter.lf'; import Py_DecentralizedTimerAfter from '../assets/code/py/src/DecentralizedTimerAfter.lf'; @@ -210,80 +214,113 @@ import Py_DecentralizedTimerAfter from '../assets/code/py/src/DecentralizedTimer This example inherits from the Federated example above. -In this example, as long as the messages from federate `c` arrive at federate `p` within 10 ms, all messages will be processed in tag order, as with an unfederated program. +The messages from federate `c` are sent at logical times 0, 1s, 2s, etc., but arrive with logical times 0.01s, 1.01s, 2.01s, etc. 
+The timer in the destination `PrintTimer` reactor has an offset of 10ms, so it will be wanting to advance its logical time to those same values. +As long as the messages from federate `c` arrive before the destination reactor commits to those values, they will be processed simultaneously with the timer events. -If a message takes longer than 10 ms to arrive, then it is possible for a **safe-to-process (STP) violation** to occur. Specifically, in this case, the receiving federate `p` may advance its logical time to one of the timer tick times and then later receive an input message with a timestamp that is less than this advanced time. With the above program, this will cause a warning to be printed; the message will be processed at the earliest possible logical time, typically one microstep after the latest timer tick. +Because of the `after` delay, the runtime system has approximately 10 ms in physical time to accomplish the delivery of the message. If you reduce the `after` delay and the timer offset to, say, 10us, then you will see the warnings. +If a message takes longer than 10 ms to arrive, then it is possible for a **safe-to-process (STP) violation** to occur. The message is said to be **tardy**. In this case, the receiving federate `p` may advance its logical time to one of the timer tick times and then later receive an input message with a timestamp that is less than or equal to this advanced time. With the above program, this will cause a warning to be printed; the message will be processed at the earliest possible logical time, typically one microstep after the latest timer tick. We will show below how to catch this violation and handle it. 
-An alternative to the `after` delays is to add an **STA offset** to downstream federates, as in the following example: +An alternative to the `after` delays is to add a **maxwait** to downstream federates, as in the following example: import C_DecentralizedTimerSTA from '../assets/code/c/src/DecentralizedTimerSTA.lf'; import Py_DecentralizedTimerSTA from '../assets/code/py/src/DecentralizedTimerSTA.lf'; -Here, a parameter named `STA` gives a time value, and the federate waits this specified amount of time (physical time) beyond a logical time _t_ before advancing its logical time to _t_. In the above example, reactions to the timer events will be delayed by the amount specified by the `STA` parameter at most Just as with the use of `after`, if the `STA` exceeds the sum of network latency, clock synchronization error, and execution times, then all events will be processed in tag order, and no STP violation will occur. +Note that now there is no `after` delay and the timer has offset zero. +The attribute `@maxwait(10 ms)` gives a time value, and the federate waits up to this specified amount of time (in physical time) beyond a logical time _t_ before advancing its tag to _g_ = (_t_, _m_). +In the above example, when the destination `PrintTimer` reactor wants to advance its logical time to 0, 1s, 2s, etc., it will wait at most until its physical clock exceeds these values plus the `maxwait` value. +If the input arrives before the wait expires, then it proceeds to commit to the specified logical time and invoke its two reactions (the one inherited from `Print` first, followed by the one defined locally). +If the wait expires before the input arrives, it will assume the input is absent and invoke only the local reaction. +When the input later arrives (it is tardy), it will experience an STP violation. +If you change the `maxwait` value to, say, 1us, you will likely see such violations. 
+Just as with the use of `after`, if the `maxwait` exceeds the sum of network latency, clock synchronization error, and execution times, then all events will be processed in tag order, and no messages will be tardy. + +Notice that the choice of whether to use an `after` delay or a `maxwait` is important. +If the alignment of the remote data with whatever the timer reaction does is important, then you should use `maxwait`. +This choice emphasizes consistency between the sending and the receiving federate. +If, on the other hand, it is important for the receiving federate to quickly handle the timer events, then you should use the `after` delay. +This choice emphasizes availability. +The consistency-availability tradeoff is fundamental to distributed systems, as explained by [Lee, et al.](https://doi.org/10.1145/3609119). -### Safe-to-Process (STP) Violation Handling +### Tardy Message Handling -Of course, the assumptions about network latency, etc., can be violated in practice. Analogous to a deadline violation, Lingua Franca provides a mechanism for handling such a violation by providing a violation handler. The pattern is: +Any assumptions about bounded communication latency can be violated in practice. Analogous to a deadline violation, Lingua Franca provides a mechanism for handling such a violation by providing a violation handler. The pattern is: ```lf reaction(in) {= // User code -=} STAA (0) {= +=} tardy {= // STP violation handling code =} ``` -If the tag at which this reaction is to be invoked (the value returned by `lf_tag()`) exceeds the tag of an incoming message on input `in` (the current tag has already advanced beyond the intended tag of `in`), then the STP violation handler will be invoked instead of the normal reaction. 
Within the body of the violation handler, the code can access the intended tag of `in` using `in->intended_tag` (or `inp.intended_tag` in Python), which has two fields, a timestamp `in->intended_tag.time` and a microstep `in->intended_tag.microstep`. The code can then ascertain the severity of the error and act accordingly. For example: +If the tag at which this reaction is to be invoked (the value returned by `lf_tag()`) exceeds the tag of an incoming message on input `in` (the current tag has already advanced beyond the intended tag of `in`), then the tardy handler will be invoked instead of the normal reaction. Within the body of the violation handler, the code can access the intended tag of `in` using `in->intended_tag` (or `inp.intended_tag` in Python), which has two fields, a timestamp `in->intended_tag.time` and a microstep `in->intended_tag.microstep`. The code can then ascertain the severity of the error and act accordingly. For example: + +import C_DecentralizedTimerHandler from '../assets/code/c/src/DecentralizedTimerHandler.lf'; +import Py_DecentralizedTimerHandler from '../assets/code/py/src/DecentralizedTimerHandler.lf'; + + + +If you change the `maxwait` to, say, 10 us, you will likely see the handler getting invoked. + +### Zero Delay Cycles + +It is not uncommon in programs to implement a pattern where one federate produces an output to request a service from another federate and expects a response at the same tag. +This creates a zero-delay loop, as shown in the following diagram: + +import DecentralizedZeroDelayLoopSVG from "./../assets/images/diagrams/DecentralizedZeroDelayLoop.svg" -import C_DecentralizedTimerAfterHandler from '../assets/code/c/src/DecentralizedTimerAfterHandler.lf'; -import Py_DecentralizedTimerAfterHandler from '../assets/code/py/src/DecentralizedTimerAfterHandler.lf'; + - +Here, the `CountPrint` reactor, on each timer tick, requests a service from the `Double` reactor. 
+It then handles the response from `Double` in its second reaction. +It would not work here to set a non-zero `maxwait` for `CountPrint` because this would delay reacting to the timer tick, which would delay the output. +But with a `maxwait` of zero, the `CountPrint` federate would immediately assume its input is absent. +When the response later arrives from `Double`, it will be tardy. -### Safe-to-Assume-Absent (STAA) +This problem is addressed as shown in the following source code for the diagram above: -Notice that the keyword `STAA` is used to indicate that the violation handler is to be invoked. -`STAA` stands for **safe-to-assume-absent** and has a more subtle meaning than the `STA`, **safe-to-advance**. -In the above example, the `STAA` is set to 0, which means that once the federate has advanced its logical time to a tag, it can immediately assume that any inputs to the federate that trigger or are used by the reaction are absent, unless they are already known to be present. -You can specify an `STAA` larger than 0, in which case the federate will wait until physical time exceeds _t_ + `STA` + `STAA` before assuming that the tiggering inputs are absent. +import C_DecentralizedZeroDelayLoop from '../assets/code/c/src/DecentralizedZeroDelayLoop.lf'; +import Py_DecentralizedZeroDelayLoop from '../assets/code/py/src/DecentralizedZeroDelayLoop.lf'; -In summary, when a federate wishes to advance its logical time to a tag _t_, it can do so if its physical clock matches or exceeds _t_ + `STA` or if all inputs become known for all tags less than _t_. -It does not need to know the status of the inputs **at* tag _t_, only before. -Once it commits to tag _t_, then it can assume that an input to the federate is absent when its physical clock matches or exceeds _t_ + `STA` + `STAA` (or the input becomes known at _t_ or greater). The value of `STAA` used in this calculation is the smallest `STAA` given by reactions that are either triggered by or use the input. 
+ -Consider the following example: +The `CountPrint` federate uses the default `maxwait` of 0, meaning that it can advance its logical time to a tag _g_ = (_t_, _m_) when its physical clock matches or exceeds _t_. Hence, it will immediately advance to each timer tick at a rate of 10 Hz. Reaction 1 will then execute and produce an output, which will be sent to the `Double` federate. That federate also uses the default `maxwait` of zero, which works because the federate has no local events; it will not attempt to advance its tag until it receives an input. -import C_DecentralizedFeedbackSTAA from '../assets/code/c/src/DecentralizedFeedbackSTAA.lf'; -import Py_DecentralizedFeedbackSTAA from '../assets/code/py/src/DecentralizedFeedbackSTAA.lf'; +When the `Double` reaction executes, it will send a response to `CountPrint` that has the same tag as the output that was just produced by `CountPrint`. +The connection from `Double` to `CountPrint` has an attribute **absent_after** with value `forever`, as shown above. +This means that it will never assume that the input is absent. It will block, possibly forever, until an input with timestamp _t_ or greater arrives. Thus, this program is assured of behaving identically to its execution under centralized coordination, but with lower overhead (messages are sent directly rather than through the RTI). - +Note that the program will freeze if, for example, the `Double` federate fails. To guard against this, you could specify a smaller `absent_after` for reaction 2 in `CountPrint`. This could be used to detect the failure of the `Double` federate, for example by setting a state variable and then checking it in a third reaction triggered by the same timer. That reaction will not execute until either reaction 2 executes (an input has arrived) or the input was assumed to be absent (the `Double` federate likely has failed). 
+A realization of this strategy is shown below: -import DecentralizedFeedbackSTAA from "../assets/images/diagrams/DecentralizedFeedbackSTAA.svg" +import DecentralizedZeroDelayLoopWithCheckerSVG from "./../assets/images/diagrams/DecentralizedZeroDelayLoopWithChecker.svg" - + -The `CountPrint` federate uses the default `STA` of 0, meaning that it can advance its logical time to a tag _t_ when its physical clock matches or exceeds _t_. Hence, it will immediately advance to each timer tick at a rate of 10 Hz. Reaction 1 will then execute and produce an output, which will be sent to the `Double` federate. That federate has an `STA` of `forever`, meaning that it will not avance to any logical time _t_ until it has received an input with tag _t_ or greater. In this case, the federate has no local events, so an `STA` of 0 would also work fine. In fact, any `STA` will work. -When the `Double` reaction executes, it will send a response to `CountPrint` that has the same tag as the output that was just produced by `CountPrint`. Reaction 2 in `CountPrint` has an `STAA` of `forever`, which means that it will never assume that the input is absent. It will block, possibly forever, until an input with tag _t_ or greater arrives. Thus, this program is assured of behaving identically to its execution under centralized coordination, but with lower overhead (messages are sent directly rather than through the RTI). +import C_DecentralizedZeroDelayLoopWithChecker from '../assets/code/c/src/DecentralizedZeroDelayLoopWithChecker.lf'; +import Py_DecentralizedZeroDelayLoopWithChecker from '../assets/code/py/src/DecentralizedZeroDelayLoopWithChecker.lf'; -Note that the program will freeze if, for example, the `Double` federate fails. To guard against this, you could specify a smaller `STAA` for reaction 2 in `CountPrint`. 
This could be used to detect the failure of the `Double` federate, for example by setting a state variable and then checking it in a third reaction triggered by the same timer. That reaction will not execute until either reaction 2 executes (an input has arrived) or the input was assumed to be absent (the `Double` federate likely has failed). + -`STAA` and `STA` are rather sophisticated variables. See [the research papers](https://www.lf-lang.org/research) for guidance. -For example, the paper [Consistency vs. Availability in Distributed Cyber-Physical Systems](https://dl.acm.org/doi/10.1145/3609119) provides a foundational theory and fundamental tradeoffs that can be tuned using `STA` and `STAA`. -The paper [Logical Time in Actor Systems](https://eecs.berkeley.edu/~eal/publications/LeeTimeAghaFestschriftPreprint2025.pdf) provides a more detailed explanation of the theory and implementation of `STA` and `STAA`. +Decentralized coordination with `maxwait` and `absent_after` is a rather sophisticated tool. See [the research papers](https://www.lf-lang.org/research) for guidance. +For example, the paper [Consistency vs. Availability in Distributed Cyber-Physical Systems](https://dl.acm.org/doi/10.1145/3609119) provides a foundational theory and fundamental tradeoffs (it uses terminology `STA` for `maxwait` and `STAA` for `absent_after`). +The paper [Logical Time in Actor Systems](https://eecs.berkeley.edu/~eal/publications/LeeTimeAghaFestschriftPreprint2025.pdf) provides a more detailed explanation of the theory and implementation (again using terminology `STA` and `STAA`). 
-## Dynamically Adjusting the STA +## Dynamically Adjusting the `maxwait` -For more advanced users, the LF API provides two functions that can be used to dynamically adjust the STA: +For more advanced users, the LF API provides two functions that can be used to dynamically adjust the `maxwait`: ```c -interval_t lf_get_sta(); -void lf_set_sta(interval_t offset); +interval_t lf_get_fed_maxwait(); +void lf_set_fed_maxwait(interval_t offset); ``` Using these functions, however, is a pretty advanced operation. +See the blog post [Decentralized Consistency](/blog/decentralized-consistency) for guidance. ## Physical Connections @@ -307,7 +344,7 @@ then what does this mean? At the receiving end, the timestamp assigned to the in In the above example, all of the generated programs expect to run on localhost. This is the default. With these defaults, every federate has to run on the same machine as the RTI because localhost is not a host that is visible from other machines on the network. In order to run federates or the RTI on remote machines, you can specify a domain name or IP address for the RTI and/or federates. -In order for a federated execution to work, there is some setup required on the machines to be used. First, each machine must be running on `ssh` server. On a Linux machine, this is typically done with a command like this: +In order for a federated execution to work, there is some setup required on the machines to be used. First, each machine must be running an `ssh` server. 
On a Linux machine, this is typically done with a command like this: ```sh sudo systemctl ssh.service @@ -352,12 +389,12 @@ Address 0.0.0.0: The default host, `localhost` is used if no address is specifie A federate may be mapped to a particular remote machine using a syntax like this: ```lf - count = new Count() at user@host:port/path; + count = new Count() at user@host:port; ``` The `port` is ignored in **centralized** mode because all communication is routed through the RTI, but in **decentralized** mode it will specify the port on which a socket server listens for incoming connections from other federates. -If any federate has such a remote designator, then a `Federation_distribute.sh` shell script will be generated. This script will distribute the generated code for the RTI to the remote machine at the specified directory. +If any federate has such a remote designator, then a `Federation_distribute.sh` shell script will be generated. This script will distribute the generated code for the RTI and any federates to the remote machine at the specified directory. 
You can also specify a user name on the remote machine for cases where the username will not match whoever launches the federation: diff --git a/static/img/blog/AircraftDoor.svg b/static/img/blog/AircraftDoor.svg new file mode 100644 index 000000000..93eabc10e --- /dev/null +++ b/static/img/blog/AircraftDoor.svg @@ -0,0 +1 @@ +AircraftDoorCockpit12PopenCameracheck_rampramp_presentDoor12opendisarm$ \ No newline at end of file diff --git a/static/img/blog/AutomaticEmergencyBrakingSystem.svg b/static/img/blog/AutomaticEmergencyBrakingSystem.svg new file mode 100644 index 000000000..4a3d66443 --- /dev/null +++ b/static/img/blog/AutomaticEmergencyBrakingSystem.svg @@ -0,0 +1 @@ +AutomaticEmergencyBrakingSystemLidarLidarSensor(0 ms, 50 ms)1250 msdataDeadlineCheck50 msin_signalout_signallidar_dataRadarRadarSensor(0 ms, 100 ms)12100 msdataDeadlineCheck100 msin_signalout_signalradar_dataAutomaticEmergencyBraking100 mslidar_inradar_inbrakeBrakingSystem100 mssignal \ No newline at end of file