Skip to content

Commit e3b425c

Browse files
fix: prevent deadlock on concurrent push and disconnect (#24215) (CP: 25.1) (#24227)
This PR cherry-picks changes from the original PR #24215 to branch 25.1.

---

#### Original PR description

> The previous `AtomicBoolean` guard in `AtmospherePushConnection` closed only the disconnect-vs-disconnect race. A push thread that reads `disconnecting` as false before a concurrent `disconnect()` flips it can still proceed into `synchronized(lock)` behind the disconnect thread, which is itself blocked inside `resource.close()` waiting for the servlet container's HTTP session lock held by the push thread — a two-lock cycle.
>
> Move `resource.close()` out of the monitor: inside `synchronized(lock)` capture the resource into a local and call `connectionLost()` to transition the state, then release the monitor before invoking `close()` on the stashed reference. Add a matching re-check of `isConnected()` at the top of the `synchronized` block in `push()` so a push that waited for the monitor observes the late disconnect and defers via `PUSH_PENDING`/`RESPONSE_PENDING` instead of NPEing on the cleared resource. The `disconnecting` flag stays set until `close()` returns so subsequent pushes take the fast path and no new `disconnect()` re-enters while `close()` is still in flight.
>
> Related-to #24192

Co-authored-by: Marco Collovati <marco@vaadin.com>
1 parent 1c8e929 commit e3b425c

2 files changed

Lines changed: 168 additions & 13 deletions

File tree

flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@ public void push(boolean async) {
203203
}
204204
} else {
205205
synchronized (lock) {
206+
// A concurrent disconnect() may have cleared the
207+
// resource and transitioned state to DISCONNECTED while
208+
// this thread was waiting to enter the monitor. In that
209+
// case treat the push as if disconnecting had been
210+
// observed above: defer it and skip sendMessage, which
211+
// would otherwise NPE on the null resource.
212+
if (!isConnected()) {
213+
if (async && state != State.RESPONSE_PENDING) {
214+
state = State.PUSH_PENDING;
215+
} else {
216+
state = State.RESPONSE_PENDING;
217+
}
218+
return;
219+
}
206220
try {
207221
JsonNode response = new UidlWriter().createUidl(getUI(),
208222
async);
@@ -337,15 +351,16 @@ public void disconnect() {
337351
return;
338352
}
339353

340-
synchronized (lock) {
341-
if (!isConnected() || resource == null) {
342-
// Already disconnected. Should not happen but if it does,
343-
// we don't want to cause NPEs
344-
getLogger().debug(
345-
"Disconnection already happened, ignoring request");
346-
return;
347-
}
348-
try {
354+
AtmosphereResource resourceToClose;
355+
try {
356+
synchronized (lock) {
357+
if (!isConnected() || resource == null) {
358+
// Already disconnected. Should not happen but if it does,
359+
// we don't want to cause NPEs
360+
getLogger().debug(
361+
"Disconnection already happened, ignoring request");
362+
return;
363+
}
349364
if (resource.isResumed()) {
350365
// This can happen for long polling because of
351366
// http://dev.vaadin.com/ticket/16919
@@ -374,15 +389,27 @@ public void disconnect() {
374389
}
375390
outgoingMessage = null;
376391
}
392+
// Capture the resource and transition this connection to
393+
// the disconnected state BEFORE releasing the monitor, so
394+
// concurrent push() callers observe an already-closed
395+
// connection and skip sendMessage. The actual
396+
// resource.close() call is performed outside of
397+
// synchronized(lock) below, to avoid a lock-ordering
398+
// deadlock between this monitor and the HTTP session
399+
// lock that the servlet container may acquire while
400+
// closing the AtmosphereResource.
401+
resourceToClose = resource;
402+
connectionLost();
403+
}
404+
if (resourceToClose != null) {
377405
try {
378-
resource.close();
406+
resourceToClose.close();
379407
} catch (IOException e) {
380408
getLogger().info("Error when closing push connection", e);
381409
}
382-
connectionLost();
383-
} finally {
384-
disconnecting.set(false);
385410
}
411+
} finally {
412+
disconnecting.set(false);
386413
}
387414
}
388415

flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.concurrent.locks.ReentrantLock;
3132

3233
import org.atmosphere.cpr.AtmosphereResource;
@@ -310,4 +311,131 @@ void pushWhileDisconnect_preventDeadlocks() throws Exception {
310311
Mockito.verify(resource, Mockito.times(1)).close();
311312
}
312313

314+
@Test
315+
void pushInterleavedWithDisconnect_preventDeadlocks() throws Exception {
316+
// Same motivation as pushWhileDisconnect_preventDeadlocks, but
317+
// exercises the race where push() has already read
318+
// disconnecting=false BEFORE a concurrent disconnect() flips it
319+
// to true. The AtomicBoolean guard does not protect against this
320+
// interleaving: disconnect() enters synchronized(lock) and
321+
// blocks in resource.close() waiting for the HTTP session lock
322+
// held by the push thread, while the push thread then blocks
323+
// trying to enter synchronized(lock) held by disconnect(),
324+
// producing a deadlock.
325+
ReentrantLock httpSessionLock = new ReentrantLock();
326+
CountDownLatch disconnectReachedClose = new CountDownLatch(1);
327+
Mockito.doAnswer(i -> {
328+
// Signal that disconnect() has entered synchronized(lock)
329+
// and is about to contend for the HTTP session lock.
330+
disconnectReachedClose.countDown();
331+
// simulate HTTP session lock attempt because resource.close
332+
// accesses session attributes; fail fast if it is still held
333+
// by the push thread (indicates a deadlock).
334+
if (httpSessionLock.tryLock(2, TimeUnit.SECONDS)) {
335+
httpSessionLock.unlock();
336+
} else {
337+
throw new AssertionError(
338+
"Deadlock on AtmosphereResource.close");
339+
}
340+
return null;
341+
}).when(resource).close();
342+
343+
CountDownLatch pushReachedBarrier = new CountDownLatch(1);
344+
CountDownLatch disconnectProceed = new CountDownLatch(1);
345+
AtomicBoolean paused = new AtomicBoolean(false);
346+
ThreadLocal<Boolean> pushThreadMarker = ThreadLocal
347+
.withInitial(() -> Boolean.FALSE);
348+
349+
// Pause push() between the disconnecting.get() read and the
350+
// synchronized(lock) entry by overriding isConnected(), which is
351+
// called in between. Only the push thread's first call pauses,
352+
// so a post-fix re-check inside synchronized(lock) does not
353+
// re-trigger the hook.
354+
UI ui = Mockito.spy(new UI());
355+
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
356+
AtmospherePushConnection testConnection = new AtmospherePushConnection(
357+
ui) {
358+
@Override
359+
public boolean isConnected() {
360+
boolean connected = super.isConnected();
361+
if (connected && pushThreadMarker.get()
362+
&& paused.compareAndSet(false, true)) {
363+
pushReachedBarrier.countDown();
364+
try {
365+
disconnectProceed.await(2, TimeUnit.SECONDS);
366+
} catch (InterruptedException e) {
367+
Thread.currentThread().interrupt();
368+
}
369+
}
370+
return connected;
371+
}
372+
};
373+
testConnection.connect(resource);
374+
375+
// Dedicated executor to guarantee that push and disconnect can
376+
// run concurrently, independent of the common pool parallelism.
377+
ExecutorService executor = Executors.newFixedThreadPool(2);
378+
try {
379+
CompletableFuture<Throwable> pushFuture = CompletableFuture
380+
.supplyAsync(() -> {
381+
pushThreadMarker.set(Boolean.TRUE);
382+
httpSessionLock.lock();
383+
try {
384+
vaadinSession.runWithLock(() -> {
385+
testConnection.push();
386+
return null;
387+
});
388+
return (Throwable) null;
389+
} catch (Throwable t) {
390+
return t;
391+
} finally {
392+
httpSessionLock.unlock();
393+
pushThreadMarker.remove();
394+
}
395+
}, executor);
396+
397+
// Wait until push() has read disconnecting=false and is
398+
// paused just before entering synchronized(lock).
399+
assertTrue(pushReachedBarrier.await(2, TimeUnit.SECONDS),
400+
"Push thread did not reach the barrier");
401+
402+
// Start a concurrent disconnect(). It will CAS disconnecting
403+
// from false to true, enter synchronized(lock), and then
404+
// attempt resource.close(), which requires the HTTP session
405+
// lock held by the push thread.
406+
CompletableFuture<Throwable> disconnectFuture = CompletableFuture
407+
.supplyAsync(() -> {
408+
try {
409+
testConnection.disconnect();
410+
return (Throwable) null;
411+
} catch (Throwable t) {
412+
return t;
413+
}
414+
}, executor);
415+
416+
// Wait deterministically until disconnect() has reached
417+
// resource.close() before releasing the push thread.
418+
assertTrue(disconnectReachedClose.await(2, TimeUnit.SECONDS),
419+
"Disconnect did not reach resource.close()");
420+
421+
// Release the push thread so it proceeds toward
422+
// synchronized(lock). Without the fix, it blocks here
423+
// forever.
424+
disconnectProceed.countDown();
425+
426+
Throwable pushError = pushFuture.get(5, TimeUnit.SECONDS);
427+
Throwable disconnectError = disconnectFuture.get(5,
428+
TimeUnit.SECONDS);
429+
430+
if (disconnectError != null) {
431+
fail("Disconnect failed (likely deadlock): " + disconnectError);
432+
}
433+
if (pushError != null) {
434+
fail("Push failed: " + pushError);
435+
}
436+
} finally {
437+
executor.shutdownNow();
438+
}
439+
}
440+
313441
}

0 commit comments

Comments
 (0)