diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java index 0cab6359aed..f405205698f 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java @@ -54,6 +54,8 @@ public class AtmospherePushConnection implements PushConnection { private transient FragmentedMessage incomingMessage; private transient Future outgoingMessage; + private transient Object lock = new Object(); + /** * Represents a message that can arrive as multiple fragments. */ @@ -185,19 +187,21 @@ public void push() { * false if it is a response to a client request. */ public void push(boolean async) { - if (!isConnected()) { - if (async && state != State.RESPONSE_PENDING) { - state = State.PUSH_PENDING; + synchronized (lock) { + if (!isConnected()) { + if (async && state != State.RESPONSE_PENDING) { + state = State.PUSH_PENDING; + } else { + state = State.RESPONSE_PENDING; + } } else { - state = State.RESPONSE_PENDING; - } - } else { - try { - JsonObject response = new UidlWriter().createUidl(getUI(), - async); - sendMessage("for(;;);[" + response.toJson() + "]"); - } catch (Exception e) { - throw new RuntimeException("Push failed", e); + try { + JsonObject response = new UidlWriter().createUidl(getUI(), + async); + sendMessage("for(;;);[" + response.toJson() + "]"); + } catch (Exception e) { + throw new RuntimeException("Push failed", e); + } } } } @@ -307,46 +311,46 @@ protected AtmosphereResource getResource() { @Override public void disconnect() { - assert isConnected(); - - if (resource == null) { - // Already disconnected. Should not happen but if it does, we don't - // want to cause NPEs - getLogger().debug( - "AtmospherePushConnection.disconnect() called twice, this should not happen"); - return; - } - if (resource.isResumed()) { - // This can happen for long polling because of - // http://dev.vaadin.com/ticket/16919 - // Once that is fixed, this should never happen - connectionLost(); - return; - } - - if (outgoingMessage != null) { - // Wait for the last message to be sent before closing the - // connection (assumes that futures are completed in order) + synchronized (lock) { + assert isConnected(); + if (resource == null) { + // Already disconnected. Should not happen but if it does, we + // don't + // want to cause NPEs + getLogger().debug( + "AtmospherePushConnection.disconnect() called twice, this should not happen"); + return; + } + if (resource.isResumed()) { + // This can happen for long polling because of + // http://dev.vaadin.com/ticket/16919 + // Once that is fixed, this should never happen + connectionLost(); + return; + } + if (outgoingMessage != null) { + // Wait for the last message to be sent before closing the + // connection (assumes that futures are completed in order) + try { + outgoingMessage.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + getLogger().info( + "Timeout waiting for messages to be sent to client before disconnect", + e); + } catch (Exception e) { + getLogger().info( + "Error waiting for messages to be sent to client before disconnect", + e); + } + outgoingMessage = null; + } try { - outgoingMessage.get(1000, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - getLogger().info( - "Timeout waiting for messages to be sent to client before disconnect", - e); - } catch (Exception e) { - getLogger().info( - "Error waiting for messages to be sent to client before disconnect", - e); + resource.close(); + } catch (IOException e) { + getLogger().info("Error when closing push connection", e); } - outgoingMessage = null; - } - - try { - resource.close(); - } catch (IOException e) { - getLogger().info("Error when closing push connection", e); + connectionLost(); } - connectionLost(); } /** @@ -389,6 +393,7 @@ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { stream.defaultReadObject(); state = State.DISCONNECTED; + lock = new Object(); } private static Logger getLogger() { diff --git a/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java b/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java index 50a68018636..3c1b9babc6b 100644 --- a/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java +++ b/flow-server/src/test/java/com/vaadin/flow/server/communication/AtmospherePushConnectionTest.java @@ -19,15 +19,21 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; - -import com.vaadin.flow.component.UI; -import com.vaadin.flow.server.communication.AtmospherePushConnection.State; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import com.vaadin.flow.component.UI; +import com.vaadin.flow.server.MockVaadinSession; +import com.vaadin.flow.server.communication.AtmospherePushConnection.State; + /** * @author Vaadin Ltd * @since 1.0 @@ -53,4 +59,73 @@ public void testSerialization() throws Exception { Assert.assertEquals(State.DISCONNECTED, connection.getState()); } + + @Test + public void pushWhileDisconnect_disconnectedWithoutSendingMessage() + throws Exception { + + UI ui = Mockito.spy(new UI()); + MockVaadinSession vaadinSession = new MockVaadinSession(); + Mockito.when(ui.getSession()).thenReturn(vaadinSession); + Broadcaster broadcaster = Mockito.mock(Broadcaster.class); + AtmosphereResource resource = Mockito.mock(AtmosphereResource.class); + Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster); + + AtmospherePushConnection connection = new AtmospherePushConnection(ui); + connection.connect(resource); + + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture.runAsync(() -> { + try { + vaadinSession.runWithLock(() -> { + connection.push(); + return null; + }); + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + latch.countDown(); + } + }); + connection.disconnect(); + Assert.assertTrue("AtmospherePushConnection not disconnected", + latch.await(2, TimeUnit.SECONDS)); + Assert.assertEquals(State.PUSH_PENDING, connection.getState()); + Mockito.verifyNoInteractions(broadcaster); + } + + @Test + public void disconnectWhilePush_messageSentAndThenDisconnected() + throws Exception { + + UI ui = Mockito.spy(new UI()); + MockVaadinSession vaadinSession = new MockVaadinSession(); + Mockito.when(ui.getSession()).thenReturn(vaadinSession); + Broadcaster broadcaster = Mockito.mock(Broadcaster.class); + AtmosphereResource resource = Mockito.mock(AtmosphereResource.class); + Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster); + + AtmospherePushConnection connection = new AtmospherePushConnection(ui); + connection.connect(resource); + + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture.runAsync(() -> { + try { + vaadinSession.runWithLock(() -> { + CompletableFuture.runAsync(connection::disconnect); + connection.push(); + return null; + }); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } finally { + latch.countDown(); + } + }); + Assert.assertTrue("Push not completed", + latch.await(2, TimeUnit.SECONDS)); + Mockito.verify(broadcaster).broadcast(ArgumentMatchers.any(), + ArgumentMatchers.eq(resource)); + } + }