Skip to content

Commit

Permalink
fix: prevent concurrent disconnect and push operations (#15767) (#15822)
Browse files Browse the repository at this point in the history
* fix: prevent concurrent disconnect and push operations

Disconnecting an AtmospherePushConnection while it is sending a message
may result in a NullPointerException if AtmosphereResource is nullified
before the message is sent.
This change synchronizes operations, so that disconnect will wait until
current push finishes, or push waits for disconnect to complete so that
the isConnected() method reflects correctly current state.

Fixes #15571

* fixed test

Co-authored-by: Marco Collovati <marco@vaadin.com>
  • Loading branch information
vaadin-bot and mcollovati committed Feb 3, 2023
1 parent 269b853 commit c2d46c1
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 52 deletions.
Expand Up @@ -54,6 +54,8 @@ public class AtmospherePushConnection implements PushConnection {
private transient FragmentedMessage incomingMessage;
private transient Future<Object> outgoingMessage;

private transient Object lock = new Object();

/**
* Represents a message that can arrive as multiple fragments.
*/
Expand Down Expand Up @@ -185,19 +187,21 @@ public void push() {
* false if it is a response to a client request.
*/
public void push(boolean async) {
if (!isConnected()) {
if (async && state != State.RESPONSE_PENDING) {
state = State.PUSH_PENDING;
synchronized (lock) {
if (!isConnected()) {
if (async && state != State.RESPONSE_PENDING) {
state = State.PUSH_PENDING;
} else {
state = State.RESPONSE_PENDING;
}
} else {
state = State.RESPONSE_PENDING;
}
} else {
try {
JsonObject response = new UidlWriter().createUidl(getUI(),
async);
sendMessage("for(;;);[" + response.toJson() + "]");
} catch (Exception e) {
throw new RuntimeException("Push failed", e);
try {
JsonObject response = new UidlWriter().createUidl(getUI(),
async);
sendMessage("for(;;);[" + response.toJson() + "]");
} catch (Exception e) {
throw new RuntimeException("Push failed", e);
}
}
}
}
Expand Down Expand Up @@ -307,46 +311,46 @@ protected AtmosphereResource getResource() {

@Override
public void disconnect() {
assert isConnected();

if (resource == null) {
// Already disconnected. Should not happen but if it does, we don't
// want to cause NPEs
getLogger().debug(
"AtmospherePushConnection.disconnect() called twice, this should not happen");
return;
}
if (resource.isResumed()) {
// This can happen for long polling because of
// http://dev.vaadin.com/ticket/16919
// Once that is fixed, this should never happen
connectionLost();
return;
}

if (outgoingMessage != null) {
// Wait for the last message to be sent before closing the
// connection (assumes that futures are completed in order)
synchronized (lock) {
assert isConnected();
if (resource == null) {
// Already disconnected. Should not happen but if it does, we
// don't
// want to cause NPEs
getLogger().debug(
"AtmospherePushConnection.disconnect() called twice, this should not happen");
return;
}
if (resource.isResumed()) {
// This can happen for long polling because of
// http://dev.vaadin.com/ticket/16919
// Once that is fixed, this should never happen
connectionLost();
return;
}
if (outgoingMessage != null) {
// Wait for the last message to be sent before closing the
// connection (assumes that futures are completed in order)
try {
outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
getLogger().info(
"Timeout waiting for messages to be sent to client before disconnect",
e);
} catch (Exception e) {
getLogger().info(
"Error waiting for messages to be sent to client before disconnect",
e);
}
outgoingMessage = null;
}
try {
outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
getLogger().info(
"Timeout waiting for messages to be sent to client before disconnect",
e);
} catch (Exception e) {
getLogger().info(
"Error waiting for messages to be sent to client before disconnect",
e);
resource.close();
} catch (IOException e) {
getLogger().info("Error when closing push connection", e);
}
outgoingMessage = null;
}

try {
resource.close();
} catch (IOException e) {
getLogger().info("Error when closing push connection", e);
connectionLost();
}
connectionLost();
}

/**
Expand Down Expand Up @@ -389,6 +393,7 @@ private void readObject(ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
state = State.DISCONNECTED;
lock = new Object();
}

private static Logger getLogger() {
Expand Down
Expand Up @@ -19,15 +19,21 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import com.vaadin.flow.component.UI;
import com.vaadin.flow.server.communication.AtmospherePushConnection.State;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.Broadcaster;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import com.vaadin.flow.component.UI;
import com.vaadin.flow.server.MockVaadinSession;
import com.vaadin.flow.server.communication.AtmospherePushConnection.State;

/**
* @author Vaadin Ltd
* @since 1.0
Expand All @@ -53,4 +59,73 @@ public void testSerialization() throws Exception {

Assert.assertEquals(State.DISCONNECTED, connection.getState());
}

@Test
public void pushWhileDisconnect_disconnectedWithoutSendingMessage()
throws Exception {

UI ui = Mockito.spy(new UI());
MockVaadinSession vaadinSession = new MockVaadinSession();
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
Broadcaster broadcaster = Mockito.mock(Broadcaster.class);
AtmosphereResource resource = Mockito.mock(AtmosphereResource.class);
Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster);

AtmospherePushConnection connection = new AtmospherePushConnection(ui);
connection.connect(resource);

CountDownLatch latch = new CountDownLatch(1);
CompletableFuture.runAsync(() -> {
try {
vaadinSession.runWithLock(() -> {
connection.push();
return null;
});
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
latch.countDown();
}
});
connection.disconnect();
Assert.assertTrue("AtmospherePushConnection not disconnected",
latch.await(2, TimeUnit.SECONDS));
Assert.assertEquals(State.PUSH_PENDING, connection.getState());
Mockito.verifyNoInteractions(broadcaster);
}

@Test
public void disconnectWhilePush_messageSentAndThenDisconnected()
throws Exception {

UI ui = Mockito.spy(new UI());
MockVaadinSession vaadinSession = new MockVaadinSession();
Mockito.when(ui.getSession()).thenReturn(vaadinSession);
Broadcaster broadcaster = Mockito.mock(Broadcaster.class);
AtmosphereResource resource = Mockito.mock(AtmosphereResource.class);
Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster);

AtmospherePushConnection connection = new AtmospherePushConnection(ui);
connection.connect(resource);

CountDownLatch latch = new CountDownLatch(1);
CompletableFuture.runAsync(() -> {
try {
vaadinSession.runWithLock(() -> {
CompletableFuture.runAsync(connection::disconnect);
connection.push();
return null;
});
} catch (Throwable ex) {
throw new RuntimeException(ex);
} finally {
latch.countDown();
}
});
Assert.assertTrue("Push not completed",
latch.await(2, TimeUnit.SECONDS));
Mockito.verify(broadcaster).broadcast(ArgumentMatchers.any(),
ArgumentMatchers.eq(resource));
}

}

0 comments on commit c2d46c1

Please sign in to comment.