Skip to content

Commit 7115588

Browse files
fix: prevent deadlock on concurrent push and disconnect (CP: 23.7) (#24229) (CP: 23.6) (#24236)
This PR cherry-picks the changes from the original PR #24229 to branch 23.6.

---

#### Original PR description

> The previous `AtomicBoolean` guard in `AtmospherePushConnection` closed only the disconnect-vs-disconnect race. A push thread that reads `disconnecting` as false before a concurrent `disconnect()` flips it can still proceed into `synchronized(lock)` behind the disconnect thread, which is itself blocked inside `resource.close()` waiting for the servlet container's HTTP session lock held by the push thread — a two-lock cycle.
>
> Move `resource.close()` out of the monitor: inside `synchronized(lock)`, capture the resource into a local variable and call `connectionLost()` to transition the state, then release the monitor before invoking `close()` on the stashed reference. Add a matching re-check of `isConnected()` at the top of the `synchronized` block in `push()`, so that a push that waited for the monitor observes the late disconnect and defers via `PUSH_PENDING`/`RESPONSE_PENDING` instead of throwing a `NullPointerException` on the cleared resource. The `disconnecting` flag stays set until `close()` returns, so subsequent pushes take the fast path and no new `disconnect()` re-enters while `close()` is still in flight.
>
> Related-to #24192

Co-authored-by: Marco Collovati <marco@vaadin.com>
1 parent 986b789 commit 7115588

2 files changed

Lines changed: 172 additions & 15 deletions

File tree

flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,20 @@ public void push(boolean async) {
194194
}
195195
} else {
196196
synchronized (lock) {
197+
// A concurrent disconnect() may have cleared the
198+
// resource and transitioned state to DISCONNECTED while
199+
// this thread was waiting to enter the monitor. In that
200+
// case treat the push as if disconnecting had been
201+
// observed above: defer it and skip sendMessage, which
202+
// would otherwise NPE on the null resource.
203+
if (!isConnected()) {
204+
if (async && state != State.RESPONSE_PENDING) {
205+
state = State.PUSH_PENDING;
206+
} else {
207+
state = State.RESPONSE_PENDING;
208+
}
209+
return;
210+
}
197211
try {
198212
JsonObject response = new UidlWriter().createUidl(getUI(),
199213
async);
@@ -326,15 +340,16 @@ public void disconnect() {
326340
return;
327341
}
328342

329-
synchronized (lock) {
330-
if (!isConnected() || resource == null) {
331-
// Already disconnected. Should not happen but if it does,
332-
// we don't want to cause NPEs
333-
getLogger().debug(
334-
"Disconnection already happened, ignoring request");
335-
return;
336-
}
337-
try {
343+
AtmosphereResource resourceToClose;
344+
try {
345+
synchronized (lock) {
346+
if (!isConnected() || resource == null) {
347+
// Already disconnected. Should not happen but if it does,
348+
// we don't want to cause NPEs
349+
getLogger().debug(
350+
"Disconnection already happened, ignoring request");
351+
return;
352+
}
338353
if (resource.isResumed()) {
339354
// This can happen for long polling because of
340355
// http://dev.vaadin.com/ticket/16919
@@ -363,15 +378,27 @@ public void disconnect() {
363378
}
364379
outgoingMessage = null;
365380
}
381+
// Capture the resource and transition this connection to
382+
// the disconnected state BEFORE releasing the monitor, so
383+
// concurrent push() callers observe an already-closed
384+
// connection and skip sendMessage. The actual
385+
// resource.close() call is performed outside of
386+
// synchronized(lock) below, to avoid a lock-ordering
387+
// deadlock between this monitor and the HTTP session
388+
// lock that the servlet container may acquire while
389+
// closing the AtmosphereResource.
390+
resourceToClose = resource;
391+
connectionLost();
392+
}
393+
if (resourceToClose != null) {
366394
try {
367-
resource.close();
395+
resourceToClose.close();
368396
} catch (IOException e) {
369397
getLogger().info("Error when closing push connection", e);
370398
}
371-
connectionLost();
372-
} finally {
373-
disconnecting.set(false);
374399
}
400+
} finally {
401+
disconnecting.set(false);
375402
}
376403
}
377404

flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
import java.util.concurrent.CompletableFuture;
1616
import java.util.concurrent.CompletionException;
1717
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
1820
import java.util.concurrent.TimeUnit;
19-
20-
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.locks.ReentrantLock;
2223

2324
import org.atmosphere.cpr.AtmosphereResource;
@@ -282,4 +283,133 @@ public void pushWhileDisconnect_preventDeadlocks() throws Exception {
282283
Mockito.verify(resource, Mockito.times(1)).close();
283284
}
284285

286+
@Test
287+
public void pushInterleavedWithDisconnect_preventDeadlocks()
288+
throws Exception {
289+
// Same motivation as pushWhileDisconnect_preventDeadlocks, but
290+
// exercises the race where push() has already read
291+
// disconnecting=false BEFORE a concurrent disconnect() flips it
292+
// to true. The AtomicBoolean guard does not protect against this
293+
// interleaving: disconnect() enters synchronized(lock) and
294+
// blocks in resource.close() waiting for the HTTP session lock
295+
// held by the push thread, while the push thread then blocks
296+
// trying to enter synchronized(lock) held by disconnect(),
297+
// producing a deadlock.
298+
ReentrantLock httpSessionLock = new ReentrantLock();
299+
CountDownLatch disconnectReachedClose = new CountDownLatch(1);
300+
Mockito.doAnswer(i -> {
301+
// Signal that disconnect() has entered synchronized(lock)
302+
// and is about to contend for the HTTP session lock.
303+
disconnectReachedClose.countDown();
304+
// simulate HTTP session lock attempt because resource.close
305+
// accesses session attributes; fail fast if it is still held
306+
// by the push thread (indicates a deadlock).
307+
if (httpSessionLock.tryLock(2, TimeUnit.SECONDS)) {
308+
httpSessionLock.unlock();
309+
} else {
310+
throw new AssertionError(
311+
"Deadlock on AtmosphereResource.close");
312+
}
313+
return null;
314+
}).when(resource).close();
315+
316+
CountDownLatch pushReachedBarrier = new CountDownLatch(1);
317+
CountDownLatch disconnectProceed = new CountDownLatch(1);
318+
AtomicBoolean paused = new AtomicBoolean(false);
319+
ThreadLocal<Boolean> pushThreadMarker = ThreadLocal
320+
.withInitial(() -> Boolean.FALSE);
321+
322+
// Pause push() between the disconnecting.get() read and the
323+
// synchronized(lock) entry by overriding isConnected(), which is
324+
// called in between. Only the push thread's first call pauses,
325+
// so a post-fix re-check inside synchronized(lock) does not
326+
// re-trigger the hook.
327+
UI ui = Mockito.spy(new UI());
328+
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
329+
AtmospherePushConnection testConnection = new AtmospherePushConnection(
330+
ui) {
331+
@Override
332+
public boolean isConnected() {
333+
boolean connected = super.isConnected();
334+
if (connected && pushThreadMarker.get()
335+
&& paused.compareAndSet(false, true)) {
336+
pushReachedBarrier.countDown();
337+
try {
338+
disconnectProceed.await(2, TimeUnit.SECONDS);
339+
} catch (InterruptedException e) {
340+
Thread.currentThread().interrupt();
341+
}
342+
}
343+
return connected;
344+
}
345+
};
346+
testConnection.connect(resource);
347+
348+
// Dedicated executor to guarantee that push and disconnect can
349+
// run concurrently, independent of the common pool parallelism.
350+
ExecutorService executor = Executors.newFixedThreadPool(2);
351+
try {
352+
CompletableFuture<Throwable> pushFuture = CompletableFuture
353+
.supplyAsync(() -> {
354+
pushThreadMarker.set(Boolean.TRUE);
355+
httpSessionLock.lock();
356+
try {
357+
vaadinSession.runWithLock(() -> {
358+
testConnection.push();
359+
return null;
360+
});
361+
return (Throwable) null;
362+
} catch (Throwable t) {
363+
return t;
364+
} finally {
365+
httpSessionLock.unlock();
366+
pushThreadMarker.remove();
367+
}
368+
}, executor);
369+
370+
// Wait until push() has read disconnecting=false and is
371+
// paused just before entering synchronized(lock).
372+
Assert.assertTrue("Push thread did not reach the barrier",
373+
pushReachedBarrier.await(2, TimeUnit.SECONDS));
374+
375+
// Start a concurrent disconnect(). It will CAS disconnecting
376+
// from false to true, enter synchronized(lock), and then
377+
// attempt resource.close(), which requires the HTTP session
378+
// lock held by the push thread.
379+
CompletableFuture<Throwable> disconnectFuture = CompletableFuture
380+
.supplyAsync(() -> {
381+
try {
382+
testConnection.disconnect();
383+
return (Throwable) null;
384+
} catch (Throwable t) {
385+
return t;
386+
}
387+
}, executor);
388+
389+
// Wait deterministically until disconnect() has reached
390+
// resource.close() before releasing the push thread.
391+
Assert.assertTrue("Disconnect did not reach resource.close()",
392+
disconnectReachedClose.await(2, TimeUnit.SECONDS));
393+
394+
// Release the push thread so it proceeds toward
395+
// synchronized(lock). Without the fix, it blocks here
396+
// forever.
397+
disconnectProceed.countDown();
398+
399+
Throwable pushError = pushFuture.get(5, TimeUnit.SECONDS);
400+
Throwable disconnectError = disconnectFuture.get(5,
401+
TimeUnit.SECONDS);
402+
403+
if (disconnectError != null) {
404+
Assert.fail("Disconnect failed (likely deadlock): "
405+
+ disconnectError);
406+
}
407+
if (pushError != null) {
408+
Assert.fail("Push failed: " + pushError);
409+
}
410+
} finally {
411+
executor.shutdownNow();
412+
}
413+
}
414+
285415
}

0 commit comments

Comments
 (0)