Skip to content

Commit 137826f

Browse files
authored
fix: prevent deadlock on concurrent push and disconnect (CP: 25.0) (#24216)
The previous `AtomicBoolean` guard in `AtmospherePushConnection` closed only the disconnect-vs-disconnect race. A push thread that reads `disconnecting` as false before a concurrent `disconnect()` flips it can still proceed into `synchronized(lock)` behind the disconnect thread, which is itself blocked inside `resource.close()` waiting for the servlet container's HTTP session lock held by the push thread — a two-lock cycle. Move `resource.close()` out of the monitor: inside `synchronized(lock)` capture the resource into a local and call `connectionLost()` to transition the state, then release the monitor before invoking `close()` on the stashed reference. Add a matching re-check of `isConnected()` at the top of the `synchronized` block in `push()` so a push that waited for the monitor observes the late disconnect and defers via `PUSH_PENDING`/`RESPONSE_PENDING` instead of NPEing on the cleared resource. The `disconnecting` flag stays set until `close()` returns so subsequent pushes take the fast path and no new `disconnect()` re-enters while `close()` is still in flight. Related-to #24192
1 parent 062eb66 commit 137826f

2 files changed

Lines changed: 170 additions & 13 deletions

File tree

flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@ public void push(boolean async) {
203203
}
204204
} else {
205205
synchronized (lock) {
206+
// A concurrent disconnect() may have cleared the
207+
// resource and transitioned state to DISCONNECTED while
208+
// this thread was waiting to enter the monitor. In that
209+
// case treat the push as if disconnecting had been
210+
// observed above: defer it and skip sendMessage, which
211+
// would otherwise NPE on the null resource.
212+
if (!isConnected()) {
213+
if (async && state != State.RESPONSE_PENDING) {
214+
state = State.PUSH_PENDING;
215+
} else {
216+
state = State.RESPONSE_PENDING;
217+
}
218+
return;
219+
}
206220
try {
207221
JsonNode response = new UidlWriter().createUidl(getUI(),
208222
async);
@@ -337,15 +351,16 @@ public void disconnect() {
337351
return;
338352
}
339353

340-
synchronized (lock) {
341-
if (!isConnected() || resource == null) {
342-
// Already disconnected. Should not happen but if it does,
343-
// we don't want to cause NPEs
344-
getLogger().debug(
345-
"Disconnection already happened, ignoring request");
346-
return;
347-
}
348-
try {
354+
AtmosphereResource resourceToClose;
355+
try {
356+
synchronized (lock) {
357+
if (!isConnected() || resource == null) {
358+
// Already disconnected. Should not happen but if it does,
359+
// we don't want to cause NPEs
360+
getLogger().debug(
361+
"Disconnection already happened, ignoring request");
362+
return;
363+
}
349364
if (resource.isResumed()) {
350365
// This can happen for long polling because of
351366
// http://dev.vaadin.com/ticket/16919
@@ -374,15 +389,27 @@ public void disconnect() {
374389
}
375390
outgoingMessage = null;
376391
}
392+
// Capture the resource and transition this connection to
393+
// the disconnected state BEFORE releasing the monitor, so
394+
// concurrent push() callers observe an already-closed
395+
// connection and skip sendMessage. The actual
396+
// resource.close() call is performed outside of
397+
// synchronized(lock) below, to avoid a lock-ordering
398+
// deadlock between this monitor and the HTTP session
399+
// lock that the servlet container may acquire while
400+
// closing the AtmosphereResource.
401+
resourceToClose = resource;
402+
connectionLost();
403+
}
404+
if (resourceToClose != null) {
377405
try {
378-
resource.close();
406+
resourceToClose.close();
379407
} catch (IOException e) {
380408
getLogger().info("Error when closing push connection", e);
381409
}
382-
connectionLost();
383-
} finally {
384-
disconnecting.set(false);
385410
}
411+
} finally {
412+
disconnecting.set(false);
386413
}
387414
}
388415

flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.concurrent.locks.ReentrantLock;
3132

3233
import org.atmosphere.cpr.AtmosphereResource;
@@ -308,4 +309,133 @@ public void pushWhileDisconnect_preventDeadlocks() throws Exception {
308309
Mockito.verify(resource, Mockito.times(1)).close();
309310
}
310311

312+
@Test
313+
public void pushInterleavedWithDisconnect_preventDeadlocks()
314+
throws Exception {
315+
// Same motivation as pushWhileDisconnect_preventDeadlocks, but
316+
// exercises the race where push() has already read
317+
// disconnecting=false BEFORE a concurrent disconnect() flips it
318+
// to true. The AtomicBoolean guard does not protect against this
319+
// interleaving: disconnect() enters synchronized(lock) and
320+
// blocks in resource.close() waiting for the HTTP session lock
321+
// held by the push thread, while the push thread then blocks
322+
// trying to enter synchronized(lock) held by disconnect(),
323+
// producing a deadlock.
324+
ReentrantLock httpSessionLock = new ReentrantLock();
325+
CountDownLatch disconnectReachedClose = new CountDownLatch(1);
326+
Mockito.doAnswer(i -> {
327+
// Signal that disconnect() has entered synchronized(lock)
328+
// and is about to contend for the HTTP session lock.
329+
disconnectReachedClose.countDown();
330+
// simulate HTTP session lock attempt because resource.close
331+
// accesses session attributes; fail fast if it is still held
332+
// by the push thread (indicates a deadlock).
333+
if (httpSessionLock.tryLock(2, TimeUnit.SECONDS)) {
334+
httpSessionLock.unlock();
335+
} else {
336+
throw new AssertionError(
337+
"Deadlock on AtmosphereResource.close");
338+
}
339+
return null;
340+
}).when(resource).close();
341+
342+
CountDownLatch pushReachedBarrier = new CountDownLatch(1);
343+
CountDownLatch disconnectProceed = new CountDownLatch(1);
344+
AtomicBoolean paused = new AtomicBoolean(false);
345+
ThreadLocal<Boolean> pushThreadMarker = ThreadLocal
346+
.withInitial(() -> Boolean.FALSE);
347+
348+
// Pause push() between the disconnecting.get() read and the
349+
// synchronized(lock) entry by overriding isConnected(), which is
350+
// called in between. Only the push thread's first call pauses,
351+
// so a post-fix re-check inside synchronized(lock) does not
352+
// re-trigger the hook.
353+
UI ui = Mockito.spy(new UI());
354+
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
355+
AtmospherePushConnection testConnection = new AtmospherePushConnection(
356+
ui) {
357+
@Override
358+
public boolean isConnected() {
359+
boolean connected = super.isConnected();
360+
if (connected && pushThreadMarker.get()
361+
&& paused.compareAndSet(false, true)) {
362+
pushReachedBarrier.countDown();
363+
try {
364+
disconnectProceed.await(2, TimeUnit.SECONDS);
365+
} catch (InterruptedException e) {
366+
Thread.currentThread().interrupt();
367+
}
368+
}
369+
return connected;
370+
}
371+
};
372+
testConnection.connect(resource);
373+
374+
// Dedicated executor to guarantee that push and disconnect can
375+
// run concurrently, independent of the common pool parallelism.
376+
ExecutorService executor = Executors.newFixedThreadPool(2);
377+
try {
378+
CompletableFuture<Throwable> pushFuture = CompletableFuture
379+
.supplyAsync(() -> {
380+
pushThreadMarker.set(Boolean.TRUE);
381+
httpSessionLock.lock();
382+
try {
383+
vaadinSession.runWithLock(() -> {
384+
testConnection.push();
385+
return null;
386+
});
387+
return (Throwable) null;
388+
} catch (Throwable t) {
389+
return t;
390+
} finally {
391+
httpSessionLock.unlock();
392+
pushThreadMarker.remove();
393+
}
394+
}, executor);
395+
396+
// Wait until push() has read disconnecting=false and is
397+
// paused just before entering synchronized(lock).
398+
Assert.assertTrue("Push thread did not reach the barrier",
399+
pushReachedBarrier.await(2, TimeUnit.SECONDS));
400+
401+
// Start a concurrent disconnect(). It will CAS disconnecting
402+
// from false to true, enter synchronized(lock), and then
403+
// attempt resource.close(), which requires the HTTP session
404+
// lock held by the push thread.
405+
CompletableFuture<Throwable> disconnectFuture = CompletableFuture
406+
.supplyAsync(() -> {
407+
try {
408+
testConnection.disconnect();
409+
return (Throwable) null;
410+
} catch (Throwable t) {
411+
return t;
412+
}
413+
}, executor);
414+
415+
// Wait deterministically until disconnect() has reached
416+
// resource.close() before releasing the push thread.
417+
Assert.assertTrue("Disconnect did not reach resource.close()",
418+
disconnectReachedClose.await(2, TimeUnit.SECONDS));
419+
420+
// Release the push thread so it proceeds toward
421+
// synchronized(lock). Without the fix, it blocks here
422+
// forever.
423+
disconnectProceed.countDown();
424+
425+
Throwable pushError = pushFuture.get(5, TimeUnit.SECONDS);
426+
Throwable disconnectError = disconnectFuture.get(5,
427+
TimeUnit.SECONDS);
428+
429+
if (disconnectError != null) {
430+
Assert.fail("Disconnect failed (likely deadlock): "
431+
+ disconnectError);
432+
}
433+
if (pushError != null) {
434+
Assert.fail("Push failed: " + pushError);
435+
}
436+
} finally {
437+
executor.shutdownNow();
438+
}
439+
}
440+
311441
}

0 commit comments

Comments
 (0)