Skip to content

Commit 649c734

Browse files
fix: prevent deadlock on concurrent push and disconnect (CP: 25.0) (#24216) (CP: 24.9) (#24226)
This PR cherry-picks changes from the original PR #24216 to branch 24.9.

---

#### Original PR description

> The previous `AtomicBoolean` guard in `AtmospherePushConnection` closed only the disconnect-vs-disconnect race. A push thread that reads `disconnecting` as false before a concurrent `disconnect()` flips it can still proceed into `synchronized(lock)` behind the disconnect thread, which is itself blocked inside `resource.close()` waiting for the servlet container's HTTP session lock held by the push thread — a two-lock cycle.
>
> Move `resource.close()` out of the monitor: inside `synchronized(lock)` capture the resource into a local and call `connectionLost()` to transition the state, then release the monitor before invoking `close()` on the stashed reference. Add a matching re-check of `isConnected()` at the top of the `synchronized` block in `push()` so a push that waited for the monitor observes the late disconnect and defers via `PUSH_PENDING`/`RESPONSE_PENDING` instead of throwing a `NullPointerException` on the cleared resource. The `disconnecting` flag stays set until `close()` returns so subsequent pushes take the fast path and no new `disconnect()` re-enters while `close()` is still in flight.
>
> Related-to #24192

---------

Co-authored-by: Marco Collovati <marco@vaadin.com>
1 parent 59bff9a commit 649c734

2 files changed

Lines changed: 172 additions & 13 deletions

File tree

flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java

Lines changed: 40 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -204,6 +204,20 @@ public void push(boolean async) {
204204
}
205205
} else {
206206
synchronized (lock) {
207+
// A concurrent disconnect() may have cleared the
208+
// resource and transitioned state to DISCONNECTED while
209+
// this thread was waiting to enter the monitor. In that
210+
// case treat the push as if disconnecting had been
211+
// observed above: defer it and skip sendMessage, which
212+
// would otherwise NPE on the null resource.
213+
if (!isConnected()) {
214+
if (async && state != State.RESPONSE_PENDING) {
215+
state = State.PUSH_PENDING;
216+
} else {
217+
state = State.RESPONSE_PENDING;
218+
}
219+
return;
220+
}
207221
try {
208222
JsonNode response = new UidlWriter().createUidl(getUI(),
209223
async);
@@ -336,15 +350,16 @@ public void disconnect() {
336350
return;
337351
}
338352

339-
synchronized (lock) {
340-
if (!isConnected() || resource == null) {
341-
// Already disconnected. Should not happen but if it does,
342-
// we don't want to cause NPEs
343-
getLogger().debug(
344-
"Disconnection already happened, ignoring request");
345-
return;
346-
}
347-
try {
353+
AtmosphereResource resourceToClose;
354+
try {
355+
synchronized (lock) {
356+
if (!isConnected() || resource == null) {
357+
// Already disconnected. Should not happen but if it does,
358+
// we don't want to cause NPEs
359+
getLogger().debug(
360+
"Disconnection already happened, ignoring request");
361+
return;
362+
}
348363
if (resource.isResumed()) {
349364
// This can happen for long polling because of
350365
// http://dev.vaadin.com/ticket/16919
@@ -373,15 +388,27 @@ public void disconnect() {
373388
}
374389
outgoingMessage = null;
375390
}
391+
// Capture the resource and transition this connection to
392+
// the disconnected state BEFORE releasing the monitor, so
393+
// concurrent push() callers observe an already-closed
394+
// connection and skip sendMessage. The actual
395+
// resource.close() call is performed outside of
396+
// synchronized(lock) below, to avoid a lock-ordering
397+
// deadlock between this monitor and the HTTP session
398+
// lock that the servlet container may acquire while
399+
// closing the AtmosphereResource.
400+
resourceToClose = resource;
401+
connectionLost();
402+
}
403+
if (resourceToClose != null) {
376404
try {
377-
resource.close();
405+
resourceToClose.close();
378406
} catch (IOException e) {
379407
getLogger().info("Error when closing push connection", e);
380408
}
381-
connectionLost();
382-
} finally {
383-
disconnecting.set(false);
384409
}
410+
} finally {
411+
disconnecting.set(false);
385412
}
386413
}
387414

flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java

Lines changed: 132 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,10 @@
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionException;
2626
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
2729
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
2831
import java.util.concurrent.locks.ReentrantLock;
2932

3033
import org.atmosphere.cpr.AtmosphereResource;
@@ -292,4 +295,133 @@ public void pushWhileDisconnect_preventDeadlocks() throws Exception {
292295
Mockito.verify(resource, Mockito.times(1)).close();
293296
}
294297

298+
@Test
299+
public void pushInterleavedWithDisconnect_preventDeadlocks()
300+
throws Exception {
301+
// Same motivation as pushWhileDisconnect_preventDeadlocks, but
302+
// exercises the race where push() has already read
303+
// disconnecting=false BEFORE a concurrent disconnect() flips it
304+
// to true. The AtomicBoolean guard does not protect against this
305+
// interleaving: disconnect() enters synchronized(lock) and
306+
// blocks in resource.close() waiting for the HTTP session lock
307+
// held by the push thread, while the push thread then blocks
308+
// trying to enter synchronized(lock) held by disconnect(),
309+
// producing a deadlock.
310+
ReentrantLock httpSessionLock = new ReentrantLock();
311+
CountDownLatch disconnectReachedClose = new CountDownLatch(1);
312+
Mockito.doAnswer(i -> {
313+
// Signal that disconnect() has entered synchronized(lock)
314+
// and is about to contend for the HTTP session lock.
315+
disconnectReachedClose.countDown();
316+
// simulate HTTP session lock attempt because resource.close
317+
// accesses session attributes; fail fast if it is still held
318+
// by the push thread (indicates a deadlock).
319+
if (httpSessionLock.tryLock(2, TimeUnit.SECONDS)) {
320+
httpSessionLock.unlock();
321+
} else {
322+
throw new AssertionError(
323+
"Deadlock on AtmosphereResource.close");
324+
}
325+
return null;
326+
}).when(resource).close();
327+
328+
CountDownLatch pushReachedBarrier = new CountDownLatch(1);
329+
CountDownLatch disconnectProceed = new CountDownLatch(1);
330+
AtomicBoolean paused = new AtomicBoolean(false);
331+
ThreadLocal<Boolean> pushThreadMarker = ThreadLocal
332+
.withInitial(() -> Boolean.FALSE);
333+
334+
// Pause push() between the disconnecting.get() read and the
335+
// synchronized(lock) entry by overriding isConnected(), which is
336+
// called in between. Only the push thread's first call pauses,
337+
// so a post-fix re-check inside synchronized(lock) does not
338+
// re-trigger the hook.
339+
UI ui = Mockito.spy(new UI());
340+
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
341+
AtmospherePushConnection testConnection = new AtmospherePushConnection(
342+
ui) {
343+
@Override
344+
public boolean isConnected() {
345+
boolean connected = super.isConnected();
346+
if (connected && pushThreadMarker.get()
347+
&& paused.compareAndSet(false, true)) {
348+
pushReachedBarrier.countDown();
349+
try {
350+
disconnectProceed.await(2, TimeUnit.SECONDS);
351+
} catch (InterruptedException e) {
352+
Thread.currentThread().interrupt();
353+
}
354+
}
355+
return connected;
356+
}
357+
};
358+
testConnection.connect(resource);
359+
360+
// Dedicated executor to guarantee that push and disconnect can
361+
// run concurrently, independent of the common pool parallelism.
362+
ExecutorService executor = Executors.newFixedThreadPool(2);
363+
try {
364+
CompletableFuture<Throwable> pushFuture = CompletableFuture
365+
.supplyAsync(() -> {
366+
pushThreadMarker.set(Boolean.TRUE);
367+
httpSessionLock.lock();
368+
try {
369+
vaadinSession.runWithLock(() -> {
370+
testConnection.push();
371+
return null;
372+
});
373+
return (Throwable) null;
374+
} catch (Throwable t) {
375+
return t;
376+
} finally {
377+
httpSessionLock.unlock();
378+
pushThreadMarker.remove();
379+
}
380+
}, executor);
381+
382+
// Wait until push() has read disconnecting=false and is
383+
// paused just before entering synchronized(lock).
384+
Assert.assertTrue("Push thread did not reach the barrier",
385+
pushReachedBarrier.await(2, TimeUnit.SECONDS));
386+
387+
// Start a concurrent disconnect(). It will CAS disconnecting
388+
// from false to true, enter synchronized(lock), and then
389+
// attempt resource.close(), which requires the HTTP session
390+
// lock held by the push thread.
391+
CompletableFuture<Throwable> disconnectFuture = CompletableFuture
392+
.supplyAsync(() -> {
393+
try {
394+
testConnection.disconnect();
395+
return (Throwable) null;
396+
} catch (Throwable t) {
397+
return t;
398+
}
399+
}, executor);
400+
401+
// Wait deterministically until disconnect() has reached
402+
// resource.close() before releasing the push thread.
403+
Assert.assertTrue("Disconnect did not reach resource.close()",
404+
disconnectReachedClose.await(2, TimeUnit.SECONDS));
405+
406+
// Release the push thread so it proceeds toward
407+
// synchronized(lock). Without the fix, it blocks here
408+
// forever.
409+
disconnectProceed.countDown();
410+
411+
Throwable pushError = pushFuture.get(5, TimeUnit.SECONDS);
412+
Throwable disconnectError = disconnectFuture.get(5,
413+
TimeUnit.SECONDS);
414+
415+
if (disconnectError != null) {
416+
Assert.fail("Disconnect failed (likely deadlock): "
417+
+ disconnectError);
418+
}
419+
if (pushError != null) {
420+
Assert.fail("Push failed: " + pushError);
421+
}
422+
} finally {
423+
executor.shutdownNow();
424+
}
425+
}
426+
295427
}

0 commit comments

Comments
 (0)