Skip to content

Commit d709632

Browse files
authored
feat: make shared signals use asynchronous signal trees (#23559)
* feat: make shared signals use asynchronous signal trees Shared signals (SharedValueSignal, SharedListSignal, SharedMapSignal, SharedNodeSignal) now use LocalAsynchronousSignalTree instead of SynchronousSignalTree. This aligns with future clustered implementations where confirmation happens asynchronously via an event log. Key changes: - Add LocalAsynchronousSignalTree that dispatches confirm() on Vaadin's service executor - Adjust confirm() to skip observer notification when confirmed commands were at the head of the unconfirmed queue (no state change) As a side effect, all signals in a transaction must be from the same tree. Fixes #23545
1 parent b9d4743 commit d709632

File tree

9 files changed

+201
-39
lines changed

9 files changed

+201
-39
lines changed

flow-server/src/main/java/com/vaadin/flow/signals/shared/SharedListSignal.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
import com.vaadin.flow.signals.function.TransactionTask;
3535
import com.vaadin.flow.signals.operations.InsertOperation;
3636
import com.vaadin.flow.signals.operations.SignalOperation;
37+
import com.vaadin.flow.signals.shared.impl.LocalAsynchronousSignalTree;
3738
import com.vaadin.flow.signals.shared.impl.SignalTree;
38-
import com.vaadin.flow.signals.shared.impl.SynchronousSignalTree;
3939

4040
/**
4141
* A signal containing a list of values. Supports atomic updates to the list
@@ -158,7 +158,7 @@ private static Id idOf(@Nullable AbstractSignal<?> signal) {
158158
* the element type, not <code>null</code>
159159
*/
160160
public SharedListSignal(Class<T> elementType) {
161-
this(new SynchronousSignalTree(false), Id.ZERO, ANYTHING_GOES,
161+
this(new LocalAsynchronousSignalTree(), Id.ZERO, ANYTHING_GOES,
162162
elementType);
163163
}
164164

flow-server/src/main/java/com/vaadin/flow/signals/shared/SharedMapSignal.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
import com.vaadin.flow.signals.operations.PutIfAbsentResult;
3838
import com.vaadin.flow.signals.operations.SignalOperation;
3939
import com.vaadin.flow.signals.shared.impl.CommandResult.NodeModification;
40+
import com.vaadin.flow.signals.shared.impl.LocalAsynchronousSignalTree;
4041
import com.vaadin.flow.signals.shared.impl.SignalTree;
41-
import com.vaadin.flow.signals.shared.impl.SynchronousSignalTree;
4242

4343
/**
4444
* A signal containing a map of values with string keys. Supports atomic updates
@@ -62,7 +62,7 @@ public class SharedMapSignal<T>
6262
* the element type, not <code>null</code>
6363
*/
6464
public SharedMapSignal(Class<T> elementType) {
65-
this(new SynchronousSignalTree(false), Id.ZERO, ANYTHING_GOES,
65+
this(new LocalAsynchronousSignalTree(), Id.ZERO, ANYTHING_GOES,
6666
elementType);
6767
}
6868

flow-server/src/main/java/com/vaadin/flow/signals/shared/SharedNodeSignal.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import com.vaadin.flow.signals.operations.PutIfAbsentResult;
3636
import com.vaadin.flow.signals.operations.SignalOperation;
3737
import com.vaadin.flow.signals.shared.SharedListSignal.ListPosition;
38+
import com.vaadin.flow.signals.shared.impl.LocalAsynchronousSignalTree;
3839
import com.vaadin.flow.signals.shared.impl.SignalTree;
39-
import com.vaadin.flow.signals.shared.impl.SynchronousSignalTree;
4040

4141
/**
4242
* A signal representing a node in a tree structure. The {@link #get()} of a
@@ -142,7 +142,7 @@ public Map<String, SharedNodeSignal> mapChildren() {
142142
* node structure. The signal does not support clustering.
143143
*/
144144
public SharedNodeSignal() {
145-
this(new SynchronousSignalTree(false), Id.ZERO, ANYTHING_GOES);
145+
this(new LocalAsynchronousSignalTree(), Id.ZERO, ANYTHING_GOES);
146146
}
147147

148148
/**

flow-server/src/main/java/com/vaadin/flow/signals/shared/SharedValueSignal.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
import com.vaadin.flow.signals.impl.Transaction;
3535
import com.vaadin.flow.signals.operations.CancelableOperation;
3636
import com.vaadin.flow.signals.operations.SignalOperation;
37+
import com.vaadin.flow.signals.shared.impl.LocalAsynchronousSignalTree;
3738
import com.vaadin.flow.signals.shared.impl.SignalTree;
38-
import com.vaadin.flow.signals.shared.impl.SynchronousSignalTree;
3939

4040
/**
4141
* A signal containing a value. The value is updated as a single atomic change.
@@ -59,7 +59,7 @@ public class SharedValueSignal<T> extends AbstractSignal<T> {
5959
*/
6060
@SuppressWarnings("unchecked")
6161
public SharedValueSignal(T initialValue) {
62-
this(new SynchronousSignalTree(false), Id.ZERO, ANYTHING_GOES,
62+
this(new LocalAsynchronousSignalTree(), Id.ZERO, ANYTHING_GOES,
6363
(Class<T>) initialValue.getClass());
6464
set(initialValue);
6565
}
@@ -72,7 +72,7 @@ public SharedValueSignal(T initialValue) {
7272
* the value type, not <code>null</code>
7373
*/
7474
public SharedValueSignal(Class<T> valueType) {
75-
this(new SynchronousSignalTree(false), Id.ZERO, ANYTHING_GOES,
75+
this(new LocalAsynchronousSignalTree(), Id.ZERO, ANYTHING_GOES,
7676
Objects.requireNonNull(valueType));
7777
}
7878

flow-server/src/main/java/com/vaadin/flow/signals/shared/impl/AsynchronousSignalTree.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,24 @@ public void confirm(List<SignalCommand> commands) {
7373

7474
confirmed = new Snapshot(builder);
7575

76+
/*
77+
* Check if the confirmed commands were at the head of
78+
* unconfirmedCommands. If so, the same commands were already
79+
* applied during latency compensation with the same results, so
80+
* submitted doesn't change and we can skip re-applying and
81+
* notifying observers.
82+
*/
83+
boolean confirmedFromHead = wereAtHead(
84+
unconfirmedCommands.getCommands(), commands);
85+
7686
// Remove any pending commands that are now confirmed from the queue
7787
unconfirmedCommands.removeHandledCommands(results.keySet());
7888

7989
Snapshot oldSubmitted = submitted;
8090

81-
/*
82-
* TODO: could skip this part if the newly confirmed commands were
83-
* at the head of unconfirmedCommands since submitted doesn't change
84-
* in that case
85-
*/
86-
if (!unconfirmedCommands.isEmpty()) {
91+
if (confirmedFromHead) {
92+
// submitted doesn't change so no need to rebuild or notify
93+
} else if (!unconfirmedCommands.isEmpty()) {
8794
// Re-apply pending commands that remain in the queue
8895
builder.apply(unconfirmedCommands.getCommands());
8996

@@ -92,14 +99,50 @@ public void confirm(List<SignalCommand> commands) {
9299
submitted = confirmed;
93100
}
94101

95-
notifyObservers(oldSubmitted, submitted);
102+
if (!confirmedFromHead) {
103+
notifyObservers(oldSubmitted, submitted);
104+
}
96105

97106
unconfirmedCommands.notifyResultHandlers(results, commands);
98107

99108
notifyProcessedCommandSubscribers(commands, results);
100109
});
101110
}
102111

112+
/**
113+
* Checks whether all confirmed commands appear at the head of the
114+
* unconfirmed command list. This means the confirmed commands were the
115+
* first commands submitted to this tree and no reordering or external
116+
* commands were involved. Uses the top-level command list to determine the
117+
* count rather than the results key set, since results may also include
118+
* sub-command IDs from transaction commands.
119+
* <p>
120+
* When this returns {@code true} and there are additional unconfirmed
121+
* commands beyond the confirmed ones, submitted still doesn't need to be
122+
* rebuilt. If unconfirmed was [A, B, C] and confirmed is [A], then the
123+
* current submitted was built as {@code old_confirmed + A + B + C}. After
124+
* confirming A, the new confirmed is {@code old_confirmed + A}, so
125+
* rebuilding would give {@code (old_confirmed + A) + B + C} which is the
126+
* same as the current submitted.
127+
*/
128+
private static boolean wereAtHead(List<SignalCommand> unconfirmedCommands,
129+
List<SignalCommand> confirmedCommands) {
130+
int confirmedCount = confirmedCommands.size();
131+
if (confirmedCount == 0) {
132+
return true;
133+
}
134+
if (confirmedCount > unconfirmedCommands.size()) {
135+
return false;
136+
}
137+
for (int i = 0; i < confirmedCount; i++) {
138+
if (!unconfirmedCommands.get(i).commandId()
139+
.equals(confirmedCommands.get(i).commandId())) {
140+
return false;
141+
}
142+
}
143+
return true;
144+
}
145+
103146
@Override
104147
public PendingCommit prepareCommit(CommandsAndHandlers changes) {
105148
assert hasLock();
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2000-2026 Vaadin Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.vaadin.flow.signals.shared.impl;
17+
18+
import java.util.List;
19+
20+
import com.vaadin.flow.signals.SignalCommand;
21+
import com.vaadin.flow.signals.SignalEnvironment;
22+
23+
/**
24+
* An asynchronous signal tree for single-JVM use that dispatches
25+
* {@link #confirm(List)} using the default effect dispatcher from
26+
* {@link SignalEnvironment}. This makes the behavior consistent with clustered
27+
* implementations where confirmation happens asynchronously.
28+
*/
29+
public class LocalAsynchronousSignalTree extends AsynchronousSignalTree {
30+
31+
@Override
32+
protected void submit(List<SignalCommand> commands) {
33+
SignalEnvironment.getDefaultEffectDispatcher()
34+
.execute(() -> confirm(commands));
35+
}
36+
}

flow-server/src/test/java/com/vaadin/flow/signals/impl/ComputedSignalTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ void transaction_readInCommittedTransaction_notCoumptedAgainAfterTransaction() {
299299
}
300300

301301
@Test
302-
void transaction_readInAbortedTransaction_notCoumptedAgainAfterTransaction() {
302+
void transaction_readInAbortedTransaction_valueRestoredAfterRejection() {
303303
SharedValueSignal<String> source = new SharedValueSignal<>("value");
304304
AtomicInteger count = new AtomicInteger();
305305

@@ -308,20 +308,26 @@ void transaction_readInAbortedTransaction_notCoumptedAgainAfterTransaction() {
308308
return source.get();
309309
});
310310

311-
signal.get();
311+
assertEquals("value", signal.get());
312312
assertEquals(1, count.get());
313313

314314
Transaction.runInTransaction(() -> {
315315
source.set("update");
316316

317-
signal.get();
317+
assertEquals("update", signal.get());
318318
assertEquals(2, count.get());
319319

320320
source.verifyValue("other");
321321
});
322322

323-
signal.get();
324-
assertEquals(2, count.get());
323+
/*
324+
* Count is 3 because the computed signal's dependency was captured with
325+
* the in-transaction value ("update"). After the rejected transaction,
326+
* the submitted value is still "value", which differs from the captured
327+
* value, so the computed signal must recompute.
328+
*/
329+
assertEquals("value", signal.get());
330+
assertEquals(3, count.get());
325331
}
326332

327333
@Test

flow-server/src/test/java/com/vaadin/flow/signals/impl/EffectTest.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -194,24 +194,6 @@ void changeTracking_multipleWritesInTransaction_effectRunOnce() {
194194
assertEquals(List.of("", "second"), invocations);
195195
}
196196

197-
@Test
198-
void changeTracking_multipleSignalsInTransaction_effectRunOnce() {
199-
SharedValueSignal<String> signal1 = new SharedValueSignal<>("");
200-
SharedValueSignal<String> signal2 = new SharedValueSignal<>("");
201-
ArrayList<String> invocations = new ArrayList<>();
202-
203-
Signal.unboundEffect(() -> {
204-
invocations.add(signal1.get() + signal2.get());
205-
});
206-
207-
Signal.runInTransaction(() -> {
208-
signal1.set("one ");
209-
signal2.set("two");
210-
});
211-
212-
assertEquals(List.of("", "one two"), invocations);
213-
}
214-
215197
@Test
216198
void changeTracking_changeOtherPartOfNode_effectNotRunAgain() {
217199
SharedValueSignal<String> signal = new SharedValueSignal<>("value");
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2000-2026 Vaadin Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.vaadin.flow.signals.shared.impl;
17+
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import com.vaadin.flow.signals.SignalCommand;
23+
import com.vaadin.flow.signals.SignalTestBase;
24+
import com.vaadin.flow.signals.TestUtil;
25+
import com.vaadin.flow.signals.shared.impl.SignalTree.Type;
26+
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertNotNull;
29+
import static org.junit.jupiter.api.Assertions.assertNull;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
32+
public class LocalAsynchronousSignalTreeTest extends SignalTestBase {
33+
34+
@Test
35+
void type_isAsynchronous() {
36+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
37+
assertEquals(Type.ASYNCHRONOUS, tree.type());
38+
}
39+
40+
@Test
41+
void submit_singleCommand_immediatelyConfirmed() {
42+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
43+
AtomicReference<CommandResult> result = new AtomicReference<>();
44+
45+
SignalCommand command = TestUtil.writeRootValueCommand();
46+
tree.commitSingleCommand(command, result::set);
47+
48+
assertTrue(result.get().accepted());
49+
}
50+
51+
@Test
52+
void submitted_afterCommand_immediatelyUpdated() {
53+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
54+
55+
tree.commitSingleCommand(TestUtil.writeRootValueCommand());
56+
57+
assertNotNull(TestUtil.readSubmittedRootValue(tree));
58+
}
59+
60+
@Test
61+
void confirmed_afterCommand_immediatelyUpdated() {
62+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
63+
64+
tree.commitSingleCommand(TestUtil.writeRootValueCommand());
65+
66+
assertNotNull(TestUtil.readConfirmedRootValue(tree));
67+
}
68+
69+
@Test
70+
void multipleCommands_allImmediatelyConfirmed() {
71+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
72+
73+
for (int i = 0; i < 3; i++) {
74+
AtomicReference<CommandResult> result = new AtomicReference<>();
75+
tree.commitSingleCommand(
76+
TestUtil.writeRootValueCommand("value" + i), result::set);
77+
assertTrue(result.get().accepted());
78+
}
79+
}
80+
81+
@Test
82+
void submit_withAsyncDispatcher_confirmedOnlyAfterDispatch() {
83+
TestExecutor dispatcher = useTestEffectDispatcher();
84+
LocalAsynchronousSignalTree tree = new LocalAsynchronousSignalTree();
85+
86+
tree.commitSingleCommand(TestUtil.writeRootValueCommand());
87+
88+
assertNotNull(TestUtil.readSubmittedRootValue(tree));
89+
assertNull(TestUtil.readConfirmedRootValue(tree));
90+
91+
dispatcher.runPendingTasks();
92+
93+
assertNotNull(TestUtil.readConfirmedRootValue(tree));
94+
}
95+
}

0 commit comments

Comments
 (0)