Skip to content

Commit

Permalink
expose detach method on links, add a matching detach handler for comp…
Browse files Browse the repository at this point in the history
…letion/server usage

This fixes #39
This fixes #45
  • Loading branch information
gemmellr committed Apr 13, 2017
1 parent d2d9231 commit 8057708
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 5 deletions.
29 changes: 25 additions & 4 deletions src/main/java/io/vertx/proton/ProtonLink.java
Expand Up @@ -39,15 +39,27 @@ public interface ProtonLink<T extends ProtonLink<T>> {
T open();

/**
* Closes the AMQP link, i.e. allows the Detach frame to be emitted.
* Closes the AMQP link, i.e. allows the Detach frame to be emitted with closed=true set.
*
* If the closure is being locally initiated, the {@link #closeHandler(Handler)} may be used to handle the peer
* sending their Detach frame.
* If the closure is being locally initiated, the {@link #closeHandler(Handler)} should be used to handle the peer
* sending their Detach frame with closed=true (and {@link #detachHandler(Handler)} can be used to handle the peer
* sending their Detach frame with closed=false).
*
* @return the link
*/
T close();

/**
* Detaches the AMQP link, i.e. allows the Detach frame to be emitted with closed=false.
*
* If the detach is being locally initiated, the {@link #detachHandler(Handler)} should be used to handle the peer
* sending their Detach frame with closed=false (and {@link #closeHandler(Handler)} can be used to handle the peer
* sending their Detach frame with closed=true).
*
* @return the link
*/
T detach();

/**
* Sets a handler for when an AMQP Attach frame is received from the remote peer.
*
Expand All @@ -61,14 +73,23 @@ public interface ProtonLink<T extends ProtonLink<T>> {
T openHandler(Handler<AsyncResult<T>> remoteOpenHandler);

/**
* Sets a handler for when an AMQP Detach frame is received from the remote peer.
* Sets a handler for when an AMQP Detach frame with closed=true is received from the remote peer.
*
* @param remoteCloseHandler
* the handler
* @return the link
*/
T closeHandler(Handler<AsyncResult<T>> remoteCloseHandler);

/**
* Sets a handler for when an AMQP Detach frame with closed=false is received from the remote peer.
*
* @param remoteDetachHandler
* the handler
* @return the link
*/
T detachHandler(Handler<AsyncResult<Void>> remoteDetachHandler);

This comment has been minimized.

Copy link
@ppatierno

ppatierno Apr 14, 2017

Member

Why the detachHandler is different from the closeHandler in terms of provided result ? I mean it's Void instead of T (i.e. ProtonReceiver). Having the ProtonReceiver instance passed in this handler is useful for calling detach() on it for example in the same way I use to call close() in the closeHandler on the passed ProtonReceiver instance. If you agree I opened a PR for this : #46

This comment has been minimized.

Copy link
@ppatierno

ppatierno Apr 14, 2017

Member

I have just figured out that in my use case, the router sends a detach with close=false but with an error condition so it means thet the async result is null in any case and I couldn't get the ProtonReceiver. In any case I think that the PR #46 makes sense to be aligned with the other handlers. Wdyt ?

This comment has been minimized.

Copy link
@gemmellr

gemmellr Apr 14, 2017

Author Contributor

I was kind of waiting for you to ask about doing #46, I almost pointed it out yesterday but decided to see what you thought instead ;)

I actually used void specifically because of your past concerns that you can only actually get the link in the 'succeeded' case, since you cant pass result values the rest of the time. Looking at some other Vert.x APIs they just used void on such callbacks presumably because if you need the reference you actually already have it, since you are setting the handler on it and can thus already refer to it.

I'm happy enough to make this match the others for consistency, I almost did it myself yesterday, though I almost decided to change all the others to void yesterday too.

This comment has been minimized.

Copy link
@ppatierno

ppatierno Apr 14, 2017

Member

It could happen that the detach (close=false) is sent without error condition so the ar.result will be available. In any case, yes I remember that discussion and I continue to not like this way Vert.x uses; it doesn't fit well in the Vert.x Proton library (fmpov) because in a lot of case (and errors) we should have access to the ar.result. I'm happy to know that you merged at least for consistency ;)


/**
* Gets the local QOS config.
*
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/vertx/proton/impl/ProtonLinkImpl.java
Expand Up @@ -38,6 +38,7 @@ abstract class ProtonLinkImpl<T extends ProtonLink<T>> implements ProtonLink<T>
protected final Link link;
private Handler<AsyncResult<T>> openHandler;
private Handler<AsyncResult<T>> closeHandler;
private Handler<AsyncResult<Void>> detachHandler;

ProtonLinkImpl(Link link) {
this.link = link;
Expand Down Expand Up @@ -182,6 +183,7 @@ public T close() {
return self();
}

@Override
public T detach() {
link.detach();
getSession().getConnectionImpl().flush();
Expand All @@ -200,6 +202,12 @@ public T closeHandler(Handler<AsyncResult<T>> closeHandler) {
return self();
}

@Override
public T detachHandler(Handler<AsyncResult<Void>> detachHandler) {
this.detachHandler = detachHandler;
return self();
}

@Override
public boolean isOpen() {
return getLocalState() == EndpointState.ACTIVE;
Expand Down Expand Up @@ -252,6 +260,12 @@ void fireRemoteOpen() {
}
}

void fireRemoteDetach() {
if (detachHandler != null) {
detachHandler.handle(ProtonHelper.future(null, getRemoteCondition()));
}
}

void fireRemoteClose() {
if (closeHandler != null) {
closeHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/proton/impl/ProtonTransport.java
Expand Up @@ -133,6 +133,11 @@ private void handleSocketBuffer(Buffer buff) {
}
break;
}
case LINK_REMOTE_DETACH: {
ProtonLinkImpl<?> link = (ProtonLinkImpl<?>) protonEvent.getLink().getContext();
link.fireRemoteDetach();
break;
}
case LINK_REMOTE_CLOSE: {
ProtonLinkImpl<?> link = (ProtonLinkImpl<?>) protonEvent.getLink().getContext();
link.fireRemoteClose();
Expand Down Expand Up @@ -173,7 +178,6 @@ private void handleSocketBuffer(Buffer buff) {
case LINK_INIT:
case LINK_LOCAL_OPEN:
case LINK_LOCAL_DETACH:
case LINK_REMOTE_DETACH:
case LINK_LOCAL_CLOSE:
case LINK_FINAL:
}
Expand Down
115 changes: 115 additions & 0 deletions src/test/java/io/vertx/proton/ProtonClientTest.java
Expand Up @@ -1002,6 +1002,121 @@ public void validate(TestContext context, Map<Symbol, Object> props) {

}

@Test(timeout = 20000)
public void testDetachHandlingWithSender(TestContext context) throws Exception {
doDetachHandlingTestImpl(context, true);
}

@Test(timeout = 20000)
public void testDetachHandlingWithReceiver(TestContext context) throws Exception {
doDetachHandlingTestImpl(context, false);
}

public void doDetachHandlingTestImpl(TestContext context, boolean clientSender) throws Exception {
server.close();

final Async clientLinkOpenAsync = context.async();
final Async serverLinkOpenAsync = context.async();
final Async serverLinkDetachAsync = context.async();
final Async clientLinkDetachAsync = context.async();
final AtomicBoolean serverLinkCloseHandlerFired = new AtomicBoolean();
final AtomicBoolean clientLinkCloseHandlerFired = new AtomicBoolean();

ProtonServer protonServer = null;
try {
protonServer = createServer((serverConnection) -> {
serverConnection.openHandler(result -> {
serverConnection.open();
});

serverConnection.sessionOpenHandler(session -> session.open());

if (clientSender) {
serverConnection.receiverOpenHandler(serverReceiver -> {
LOG.trace("Server receiver opened");
serverReceiver.open();
serverLinkOpenAsync.complete();

serverReceiver.closeHandler(res -> {
serverLinkCloseHandlerFired.set(true);
});

serverReceiver.detachHandler(res -> {
context.assertTrue(res.succeeded(), "expected non-errored async result");
serverReceiver.detach();
serverLinkDetachAsync.complete();
});
});
} else {
serverConnection.senderOpenHandler(serverSender -> {
LOG.trace("Server sender opened");
serverSender.open();
serverLinkOpenAsync.complete();

serverSender.closeHandler(res -> {
serverLinkCloseHandlerFired.set(true);
});

serverSender.detachHandler(res -> {
context.assertTrue(res.succeeded(), "expected non-errored async result");
serverSender.detach();
serverLinkDetachAsync.complete();
});
});
}
});

// ===== Client Handling =====

ProtonClient client = ProtonClient.create(vertx);
client.connect("localhost", protonServer.actualPort(), res -> {
context.assertTrue(res.succeeded());

ProtonConnection connection = res.result();
connection.openHandler(x -> {
LOG.trace("Client connection opened");
final ProtonLink<?> link;
if (clientSender) {
link = connection.createSender(null);
} else {
link = connection.createReceiver("some-address");
}

link.closeHandler(clientLink -> {
clientLinkCloseHandlerFired.set(true);
});

link.detachHandler(clientLink -> {
LOG.trace("Client link detached");
clientLinkDetachAsync.complete();
});

link.openHandler(y -> {
LOG.trace("Client link opened");
clientLinkOpenAsync.complete();

link.detach();
});
link.open();

}).open();
});

serverLinkOpenAsync.awaitSuccess();
clientLinkOpenAsync.awaitSuccess();

serverLinkDetachAsync.awaitSuccess();
clientLinkDetachAsync.awaitSuccess();
} finally {
if (protonServer != null) {
protonServer.close();
}
}

context.assertFalse(serverLinkCloseHandlerFired.get(), "server link close handler should not have fired");
context.assertFalse(clientLinkCloseHandlerFired.get(), "client link close handler should not have fired");
}

private ProtonServer createServer(Handler<ProtonConnection> serverConnHandler) throws InterruptedException,
ExecutionException {
ProtonServer server = ProtonServer.create(vertx);
Expand Down

0 comments on commit 8057708

Please sign in to comment.