From 3c86675c707e14c40aeaf0480a35d499fd4822d1 Mon Sep 17 00:00:00 2001 From: Vaadin Bot Date: Fri, 3 Feb 2023 20:51:24 +0100 Subject: [PATCH] fix: prevent concurrent disconnect and push operations (#15767) (#15823) * fix: prevent concurrent disconnect and push operations Disconnecting an AtmospherePushConnection while it is sending a message may result in a NullPointerException if AtmosphereResource is nullified before the message is sent. This change synchronizes operations, so that disconnect will wait until current push finishes, or push waits for disconnect to complete so that the isConnected() method reflects correctly current state. Fixes #15571 * fixed test Co-authored-by: Marco Collovati --- .../AtmospherePushConnection.java | 103 +++++++++--------- .../AtmospherePushConnectionTest.java | 81 +++++++++++++- 2 files changed, 132 insertions(+), 52 deletions(-) diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java index 5c92b1a7684..b97fc9b21aa 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java @@ -54,6 +54,8 @@ public class AtmospherePushConnection implements PushConnection { private transient FragmentedMessage incomingMessage; private transient Future outgoingMessage; + private transient Object lock = new Object(); + /** * Represents a message that can arrive as multiple fragments. */ @@ -185,19 +187,21 @@ public void push() { * false if it is a response to a client request. */ public void push(boolean async) { - if (!isConnected()) { - if (async && state != State.RESPONSE_PENDING) { - state = State.PUSH_PENDING; + synchronized (lock) { + if (!isConnected()) { + if (async && state != State.RESPONSE_PENDING) { + state = State.PUSH_PENDING; + } else { + state = State.RESPONSE_PENDING; + } } else { - state = State.RESPONSE_PENDING; - } - } else { - try { - JsonObject response = new UidlWriter().createUidl(getUI(), - async); - sendMessage("for(;;);[" + response.toJson() + "]"); - } catch (Exception e) { - throw new RuntimeException("Push failed", e); + try { + JsonObject response = new UidlWriter().createUidl(getUI(), + async); + sendMessage("for(;;);[" + response.toJson() + "]"); + } catch (Exception e) { + throw new RuntimeException("Push failed", e); + } } } } @@ -307,46 +311,46 @@ protected AtmosphereResource getResource() { @Override public void disconnect() { - assert isConnected(); - - if (resource == null) { - // Already disconnected. Should not happen but if it does, we don't - // want to cause NPEs - getLogger().debug( - "AtmospherePushConnection.disconnect() called twice, this should not happen"); - return; - } - if (resource.isResumed()) { - // This can happen for long polling because of - // http://dev.vaadin.com/ticket/16919 - // Once that is fixed, this should never happen - connectionLost(); - return; - } - - if (outgoingMessage != null) { - // Wait for the last message to be sent before closing the - // connection (assumes that futures are completed in order) + synchronized (lock) { + assert isConnected(); + if (resource == null) { + // Already disconnected. Should not happen but if it does, we + // don't + // want to cause NPEs + getLogger().debug( + "AtmospherePushConnection.disconnect() called twice, this should not happen"); + return; + } + if (resource.isResumed()) { + // This can happen for long polling because of + // http://dev.vaadin.com/ticket/16919 + // Once that is fixed, this should never happen + connectionLost(); + return; + } + if (outgoingMessage != null) { + // Wait for the last message to be sent before closing the + // connection (assumes that futures are completed in order) + try { + outgoingMessage.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + getLogger().info( + "Timeout waiting for messages to be sent to client before disconnect", + e); + } catch (Exception e) { + getLogger().info( + "Error waiting for messages to be sent to client before disconnect", + e); + } + outgoingMessage = null; + } try { - outgoingMessage.get(1000, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - getLogger().info( - "Timeout waiting for messages to be sent to client before disconnect", - e); - } catch (Exception e) { - getLogger().info( - "Error waiting for messages to be sent to client before disconnect", - e); + resource.close(); + } catch (IOException e) { + getLogger().info("Error when closing push connection", e); } - outgoingMessage = null; - } - - try { - resource.close(); - } catch (IOException e) { - getLogger().info("Error when closing push connection", e); + connectionLost(); } - connectionLost(); } /** @@ -389,6 +393,7 @@ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { stream.defaultReadObject(); state = State.DISCONNECTED; + lock = new Object(); } private static Logger getLogger() { diff --git a/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java b/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java index 6a5f5690c15..3658bbc8914 100644 --- a/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java +++ b/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java @@ -19,15 +19,21 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; - -import com.vaadin.flow.component.UI; -import com.vaadin.flow.server.communication.AtmospherePushConnection.State; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import com.vaadin.flow.component.UI; +import com.vaadin.flow.server.MockVaadinSession; +import com.vaadin.flow.server.communication.AtmospherePushConnection.State; + /** * @author Vaadin Ltd * @since 1.0 @@ -53,4 +59,73 @@ public void testSerialization() throws Exception { Assert.assertEquals(State.DISCONNECTED, connection.getState()); } + + @Test + public void pushWhileDisconnect_disconnectedWithoutSendingMessage() + throws Exception { + + UI ui = Mockito.spy(new UI()); + MockVaadinSession vaadinSession = new MockVaadinSession(); + Mockito.when(ui.getSession()).thenReturn(vaadinSession); + Broadcaster broadcaster = Mockito.mock(Broadcaster.class); + AtmosphereResource resource = Mockito.mock(AtmosphereResource.class); + Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster); + + AtmospherePushConnection connection = new AtmospherePushConnection(ui); + connection.connect(resource); + + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture.runAsync(() -> { + try { + vaadinSession.runWithLock(() -> { + connection.push(); + return null; + }); + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + latch.countDown(); + } + }); + connection.disconnect(); + Assert.assertTrue("AtmospherePushConnection not disconnected", + latch.await(2, TimeUnit.SECONDS)); + Assert.assertEquals(State.PUSH_PENDING, connection.getState()); + Mockito.verifyNoInteractions(broadcaster); + } + + @Test + public void disconnectWhilePush_messageSentAndThenDisconnected() + throws Exception { + + UI ui = Mockito.spy(new UI()); + MockVaadinSession vaadinSession = new MockVaadinSession(); + Mockito.when(ui.getSession()).thenReturn(vaadinSession); + Broadcaster broadcaster = Mockito.mock(Broadcaster.class); + AtmosphereResource resource = Mockito.mock(AtmosphereResource.class); + Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster); + + AtmospherePushConnection connection = new AtmospherePushConnection(ui); + connection.connect(resource); + + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture.runAsync(() -> { + try { + vaadinSession.runWithLock(() -> { + CompletableFuture.runAsync(connection::disconnect); + connection.push(); + return null; + }); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } finally { + latch.countDown(); + } + }); + Assert.assertTrue("Push not completed", + latch.await(2, TimeUnit.SECONDS)); + Mockito.verify(broadcaster).broadcast(ArgumentMatchers.any(), + ArgumentMatchers.eq(resource)); + } + }