Skip to content

Commit 87526fc

Browse files
authored
fix: prevent deadlock on concurrent push and disconnect (2.13) (#24214)
The previous `AtomicBoolean` guard in `AtmospherePushConnection` closed only the disconnect-vs-disconnect race. A push thread that reads `disconnecting` as false before a concurrent `disconnect()` flips it can still go on to wait for `synchronized(lock)` behind the disconnect thread, which is itself blocked inside `resource.close()` waiting for the servlet container's HTTP session lock held by the push thread — a two-lock cycle.

Move `resource.close()` out of the monitor: inside `synchronized(lock)` capture the resource into a local and call `connectionLost()` to transition the state, then release the monitor before invoking `close()` on the stashed reference.

Add a matching re-check of `isConnected()` at the top of the `synchronized` block in `push()` so a push that waited for the monitor observes the late disconnect and defers via `PUSH_PENDING`/`RESPONSE_PENDING` instead of throwing a `NullPointerException` on the cleared resource. The `disconnecting` flag stays set until `close()` returns so subsequent pushes take the fast path and no new `disconnect()` re-enters while `close()` is still in flight.

Related-to #24192
1 parent 48def21 commit 87526fc

2 files changed

Lines changed: 172 additions & 13 deletions

File tree

flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,20 @@ public void push(boolean async) {
195195
}
196196
} else {
197197
synchronized (lock) {
198+
// A concurrent disconnect() may have cleared the
199+
// resource and transitioned state to DISCONNECTED while
200+
// this thread was waiting to enter the monitor. In that
201+
// case treat the push as if disconnecting had been
202+
// observed above: defer it and skip sendMessage, which
203+
// would otherwise NPE on the null resource.
204+
if (!isConnected()) {
205+
if (async && state != State.RESPONSE_PENDING) {
206+
state = State.PUSH_PENDING;
207+
} else {
208+
state = State.RESPONSE_PENDING;
209+
}
210+
return;
211+
}
198212
try {
199213
JsonObject response = new UidlWriter().createUidl(getUI(),
200214
async);
@@ -327,15 +341,16 @@ public void disconnect() {
327341
return;
328342
}
329343

330-
synchronized (lock) {
331-
if (!isConnected() || resource == null) {
332-
// Already disconnected. Should not happen but if it does,
333-
// we don't want to cause NPEs
334-
getLogger().debug(
335-
"Disconnection already happened, ignoring request");
336-
return;
337-
}
338-
try {
344+
AtmosphereResource resourceToClose;
345+
try {
346+
synchronized (lock) {
347+
if (!isConnected() || resource == null) {
348+
// Already disconnected. Should not happen but if it does,
349+
// we don't want to cause NPEs
350+
getLogger().debug(
351+
"Disconnection already happened, ignoring request");
352+
return;
353+
}
339354
if (resource.isResumed()) {
340355
// This can happen for long polling because of
341356
// http://dev.vaadin.com/ticket/16919
@@ -359,15 +374,27 @@ public void disconnect() {
359374
}
360375
outgoingMessage = null;
361376
}
377+
// Capture the resource and transition this connection to
378+
// the disconnected state BEFORE releasing the monitor, so
379+
// concurrent push() callers observe an already-disconnected
380+
// connection and skip sendMessage. The actual
381+
// resource.close() call is performed outside of
382+
// synchronized(lock) below, to avoid a lock-ordering
383+
// deadlock between this monitor and the HTTP session
384+
// lock that the servlet container may acquire while
385+
// closing the AtmosphereResource.
386+
resourceToClose = resource;
387+
connectionLost();
388+
}
389+
if (resourceToClose != null) {
362390
try {
363-
resource.close();
391+
resourceToClose.close();
364392
} catch (IOException e) {
365393
getLogger().info("Error when closing push connection", e);
366394
}
367-
connectionLost();
368-
} finally {
369-
disconnecting.set(false);
370395
}
396+
} finally {
397+
disconnecting.set(false);
371398
}
372399
}
373400

flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
import java.util.concurrent.CompletableFuture;
1616
import java.util.concurrent.CompletionException;
1717
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
1820
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.concurrent.locks.ReentrantLock;
2023

2124
import org.atmosphere.cpr.AtmosphereResource;
@@ -286,4 +289,133 @@ public void pushWhileDisconnect_preventDeadlocks() throws Exception {
286289
Mockito.verify(resource, Mockito.times(1)).close();
287290
}
288291

292+
@Test
293+
public void pushInterleavedWithDisconnect_preventDeadlocks()
294+
throws Exception {
295+
// Same motivation as pushWhileDisconnect_preventDeadlocks, but
296+
// exercises the race where push() has already read
297+
// disconnecting=false BEFORE a concurrent disconnect() flips it
298+
// to true. The AtomicBoolean guard does not protect against this
299+
// interleaving: disconnect() enters synchronized(lock) and
300+
// blocks in resource.close() waiting for the HTTP session lock
301+
// held by the push thread, while the push thread then blocks
302+
// trying to enter synchronized(lock) held by disconnect(),
303+
// producing a deadlock.
304+
ReentrantLock httpSessionLock = new ReentrantLock();
305+
CountDownLatch disconnectReachedClose = new CountDownLatch(1);
306+
Mockito.doAnswer(i -> {
307+
// Signal that disconnect() has reached resource.close()
308+
// and is about to contend for the HTTP session lock.
309+
disconnectReachedClose.countDown();
310+
// simulate HTTP session lock attempt because resource.close
311+
// accesses session attributes; fail fast if it is still held
312+
// by the push thread (indicates a deadlock).
313+
if (httpSessionLock.tryLock(2, TimeUnit.SECONDS)) {
314+
httpSessionLock.unlock();
315+
} else {
316+
throw new AssertionError(
317+
"Deadlock on AtmosphereResource.close");
318+
}
319+
return null;
320+
}).when(resource).close();
321+
322+
CountDownLatch pushReachedBarrier = new CountDownLatch(1);
323+
CountDownLatch disconnectProceed = new CountDownLatch(1);
324+
AtomicBoolean paused = new AtomicBoolean(false);
325+
ThreadLocal<Boolean> pushThreadMarker = ThreadLocal
326+
.withInitial(() -> Boolean.FALSE);
327+
328+
// Pause push() between the disconnecting.get() read and the
329+
// synchronized(lock) entry by overriding isConnected(), which is
330+
// called in between. Only the push thread's first call pauses,
331+
// so a post-fix re-check inside synchronized(lock) does not
332+
// re-trigger the hook.
333+
UI ui = Mockito.spy(new UI());
334+
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
335+
AtmospherePushConnection testConnection = new AtmospherePushConnection(
336+
ui) {
337+
@Override
338+
public boolean isConnected() {
339+
boolean connected = super.isConnected();
340+
if (connected && pushThreadMarker.get()
341+
&& paused.compareAndSet(false, true)) {
342+
pushReachedBarrier.countDown();
343+
try {
344+
disconnectProceed.await(2, TimeUnit.SECONDS);
345+
} catch (InterruptedException e) {
346+
Thread.currentThread().interrupt();
347+
}
348+
}
349+
return connected;
350+
}
351+
};
352+
testConnection.connect(resource);
353+
354+
// Dedicated executor to guarantee that push and disconnect can
355+
// run concurrently, independent of the common pool parallelism.
356+
ExecutorService executor = Executors.newFixedThreadPool(2);
357+
try {
358+
CompletableFuture<Throwable> pushFuture = CompletableFuture
359+
.supplyAsync(() -> {
360+
pushThreadMarker.set(Boolean.TRUE);
361+
httpSessionLock.lock();
362+
try {
363+
vaadinSession.runWithLock(() -> {
364+
testConnection.push();
365+
return null;
366+
});
367+
return (Throwable) null;
368+
} catch (Throwable t) {
369+
return t;
370+
} finally {
371+
httpSessionLock.unlock();
372+
pushThreadMarker.remove();
373+
}
374+
}, executor);
375+
376+
// Wait until push() has read disconnecting=false and is
377+
// paused just before entering synchronized(lock).
378+
Assert.assertTrue("Push thread did not reach the barrier",
379+
pushReachedBarrier.await(2, TimeUnit.SECONDS));
380+
381+
// Start a concurrent disconnect(). It will CAS disconnecting
382+
// from false to true, enter synchronized(lock), and then
383+
// attempt resource.close(), which requires the HTTP session
384+
// lock held by the push thread.
385+
CompletableFuture<Throwable> disconnectFuture = CompletableFuture
386+
.supplyAsync(() -> {
387+
try {
388+
testConnection.disconnect();
389+
return (Throwable) null;
390+
} catch (Throwable t) {
391+
return t;
392+
}
393+
}, executor);
394+
395+
// Wait deterministically until disconnect() has reached
396+
// resource.close() before releasing the push thread.
397+
Assert.assertTrue("Disconnect did not reach resource.close()",
398+
disconnectReachedClose.await(2, TimeUnit.SECONDS));
399+
400+
// Release the push thread so it proceeds toward
401+
// synchronized(lock). Without the fix, it blocks here
402+
// forever.
403+
disconnectProceed.countDown();
404+
405+
Throwable pushError = pushFuture.get(5, TimeUnit.SECONDS);
406+
Throwable disconnectError = disconnectFuture.get(5,
407+
TimeUnit.SECONDS);
408+
409+
if (disconnectError != null) {
410+
Assert.fail("Disconnect failed (likely deadlock): "
411+
+ disconnectError);
412+
}
413+
if (pushError != null) {
414+
Assert.fail("Push failed: " + pushError);
415+
}
416+
} finally {
417+
executor.shutdownNow();
418+
}
419+
}
420+
289421
}

0 commit comments

Comments
 (0)