diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index f6afab81d..c1d0fd2a3 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -66,8 +66,7 @@ final class LocalDuplexConnection implements DuplexConnection { @Override public void dispose() { - out.dispose(); - in.dispose(); + out.onComplete(); } @Override diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java index 361fec52a..095de3f0e 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java @@ -23,7 +23,6 @@ import java.time.Duration; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; final class LocalClientTransportTest { @@ -33,11 +32,13 @@ final class LocalClientTransportTest { void connect() { LocalServerTransport serverTransport = LocalServerTransport.createEphemeral(); - Closeable closeable = serverTransport.start(duplexConnection -> Mono.empty()).block(); + Closeable closeable = + serverTransport.start(duplexConnection -> duplexConnection.receive().then()).block(); try { LocalClientTransport.create(serverTransport.getName()) .connect() + .doOnNext(d -> d.receive().subscribe()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete();