From 8057708b00763eb8ae205617f0b40ce3a590c164 Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Thu, 13 Apr 2017 14:57:43 +0100 Subject: [PATCH] expose detach method on links, add a matching detach handler for completion/server usage This fixes #39 This fixes #45 --- src/main/java/io/vertx/proton/ProtonLink.java | 29 ++++- .../io/vertx/proton/impl/ProtonLinkImpl.java | 14 +++ .../io/vertx/proton/impl/ProtonTransport.java | 6 +- .../io/vertx/proton/ProtonClientTest.java | 115 ++++++++++++++++++ 4 files changed, 159 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/vertx/proton/ProtonLink.java b/src/main/java/io/vertx/proton/ProtonLink.java index 4c06db6..dbb25ca 100644 --- a/src/main/java/io/vertx/proton/ProtonLink.java +++ b/src/main/java/io/vertx/proton/ProtonLink.java @@ -39,15 +39,27 @@ public interface ProtonLink> { T open(); /** - * Closes the AMQP link, i.e. allows the Detach frame to be emitted. + * Closes the AMQP link, i.e. allows the Detach frame to be emitted with closed=true set. * - * If the closure is being locally initiated, the {@link #closeHandler(Handler)} may be used to handle the peer - * sending their Detach frame. + * If the closure is being locally initiated, the {@link #closeHandler(Handler)} should be used to handle the peer + * sending their Detach frame with closed=true (and {@link #detachHandler(Handler)} can be used to handle the peer + * sending their Detach frame with closed=false). * * @return the link */ T close(); + /** + * Detaches the AMQP link, i.e. allows the Detach frame to be emitted with closed=false. + * + * If the detach is being locally initiated, the {@link #detachHandler(Handler)} should be used to handle the peer + * sending their Detach frame with closed=false (and {@link #closeHandler(Handler)} can be used to handle the peer + * sending their Detach frame with closed=true). + * + * @return the link + */ + T detach(); + /** * Sets a handler for when an AMQP Attach frame is received from the remote peer. * @@ -61,7 +73,7 @@ public interface ProtonLink> { T openHandler(Handler> remoteOpenHandler); /** - * Sets a handler for when an AMQP Detach frame is received from the remote peer. + * Sets a handler for when an AMQP Detach frame with closed=true is received from the remote peer. * * @param remoteCloseHandler * the handler @@ -69,6 +81,15 @@ public interface ProtonLink> { */ T closeHandler(Handler> remoteCloseHandler); + /** + * Sets a handler for when an AMQP Detach frame with closed=false is received from the remote peer. + * + * @param remoteDetachHandler + * the handler + * @return the link + */ + T detachHandler(Handler> remoteDetachHandler); + /** * Gets the local QOS config. * diff --git a/src/main/java/io/vertx/proton/impl/ProtonLinkImpl.java b/src/main/java/io/vertx/proton/impl/ProtonLinkImpl.java index f4199b8..33778ca 100644 --- a/src/main/java/io/vertx/proton/impl/ProtonLinkImpl.java +++ b/src/main/java/io/vertx/proton/impl/ProtonLinkImpl.java @@ -38,6 +38,7 @@ abstract class ProtonLinkImpl> implements ProtonLink protected final Link link; private Handler> openHandler; private Handler> closeHandler; + private Handler> detachHandler; ProtonLinkImpl(Link link) { this.link = link; @@ -182,6 +183,7 @@ public T close() { return self(); } + @Override public T detach() { link.detach(); getSession().getConnectionImpl().flush(); @@ -200,6 +202,12 @@ public T closeHandler(Handler> closeHandler) { return self(); } + @Override + public T detachHandler(Handler> detachHandler) { + this.detachHandler = detachHandler; + return self(); + } + @Override public boolean isOpen() { return getLocalState() == EndpointState.ACTIVE; @@ -252,6 +260,12 @@ void fireRemoteOpen() { } } + void fireRemoteDetach() { + if (detachHandler != null) { + detachHandler.handle(ProtonHelper.future(null, getRemoteCondition())); + } + } + void fireRemoteClose() { if (closeHandler != null) { closeHandler.handle(ProtonHelper.future(self(), getRemoteCondition())); diff --git a/src/main/java/io/vertx/proton/impl/ProtonTransport.java b/src/main/java/io/vertx/proton/impl/ProtonTransport.java index 8d3121b..8b6b1db 100644 --- a/src/main/java/io/vertx/proton/impl/ProtonTransport.java +++ b/src/main/java/io/vertx/proton/impl/ProtonTransport.java @@ -133,6 +133,11 @@ private void handleSocketBuffer(Buffer buff) { } break; } + case LINK_REMOTE_DETACH: { + ProtonLinkImpl link = (ProtonLinkImpl) protonEvent.getLink().getContext(); + link.fireRemoteDetach(); + break; + } case LINK_REMOTE_CLOSE: { ProtonLinkImpl link = (ProtonLinkImpl) protonEvent.getLink().getContext(); link.fireRemoteClose(); @@ -173,7 +178,6 @@ private void handleSocketBuffer(Buffer buff) { case LINK_INIT: case LINK_LOCAL_OPEN: case LINK_LOCAL_DETACH: - case LINK_REMOTE_DETACH: case LINK_LOCAL_CLOSE: case LINK_FINAL: } diff --git a/src/test/java/io/vertx/proton/ProtonClientTest.java b/src/test/java/io/vertx/proton/ProtonClientTest.java index d720753..66b81e6 100644 --- a/src/test/java/io/vertx/proton/ProtonClientTest.java +++ b/src/test/java/io/vertx/proton/ProtonClientTest.java @@ -1002,6 +1002,121 @@ public void validate(TestContext context, Map props) { } + @Test(timeout = 20000) + public void testDetachHandlingWithSender(TestContext context) throws Exception { + doDetachHandlingTestImpl(context, true); + } + + @Test(timeout = 20000) + public void testDetachHandlingWithReceiver(TestContext context) throws Exception { + doDetachHandlingTestImpl(context, false); + } + + public void doDetachHandlingTestImpl(TestContext context, boolean clientSender) throws Exception { + server.close(); + + final Async clientLinkOpenAsync = context.async(); + final Async serverLinkOpenAsync = context.async(); + final Async serverLinkDetachAsync = context.async(); + final Async clientLinkDetachAsync = context.async(); + final AtomicBoolean serverLinkCloseHandlerFired = new AtomicBoolean(); + final AtomicBoolean clientLinkCloseHandlerFired = new AtomicBoolean(); + + ProtonServer protonServer = null; + try { + protonServer = createServer((serverConnection) -> { + serverConnection.openHandler(result -> { + serverConnection.open(); + }); + + serverConnection.sessionOpenHandler(session -> session.open()); + + if (clientSender) { + serverConnection.receiverOpenHandler(serverReceiver -> { + LOG.trace("Server receiver opened"); + serverReceiver.open(); + serverLinkOpenAsync.complete(); + + serverReceiver.closeHandler(res -> { + serverLinkCloseHandlerFired.set(true); + }); + + serverReceiver.detachHandler(res -> { + context.assertTrue(res.succeeded(), "expected non-errored async result"); + serverReceiver.detach(); + serverLinkDetachAsync.complete(); + }); + }); + } else { + serverConnection.senderOpenHandler(serverSender -> { + LOG.trace("Server sender opened"); + serverSender.open(); + serverLinkOpenAsync.complete(); + + serverSender.closeHandler(res -> { + serverLinkCloseHandlerFired.set(true); + }); + + serverSender.detachHandler(res -> { + context.assertTrue(res.succeeded(), "expected non-errored async result"); + serverSender.detach(); + serverLinkDetachAsync.complete(); + }); + }); + } + }); + + // ===== Client Handling ===== + + ProtonClient client = ProtonClient.create(vertx); + client.connect("localhost", protonServer.actualPort(), res -> { + context.assertTrue(res.succeeded()); + + ProtonConnection connection = res.result(); + connection.openHandler(x -> { + LOG.trace("Client connection opened"); + final ProtonLink link; + if (clientSender) { + link = connection.createSender(null); + } else { + link = connection.createReceiver("some-address"); + } + + link.closeHandler(clientLink -> { + clientLinkCloseHandlerFired.set(true); + }); + + link.detachHandler(clientLink -> { + LOG.trace("Client link detached"); + clientLinkDetachAsync.complete(); + }); + + link.openHandler(y -> { + LOG.trace("Client link opened"); + clientLinkOpenAsync.complete(); + + link.detach(); + }); + link.open(); + + }).open(); + }); + + serverLinkOpenAsync.awaitSuccess(); + clientLinkOpenAsync.awaitSuccess(); + + serverLinkDetachAsync.awaitSuccess(); + clientLinkDetachAsync.awaitSuccess(); + } finally { + if (protonServer != null) { + protonServer.close(); + } + } + + context.assertFalse(serverLinkCloseHandlerFired.get(), "server link close handler should not have fired"); + context.assertFalse(clientLinkCloseHandlerFired.get(), "client link close handler should not have fired"); + } + private ProtonServer createServer(Handler serverConnHandler) throws InterruptedException, ExecutionException { ProtonServer server = ProtonServer.create(vertx);