From a28cef4cb42d843df9e09711cc6acc305a4bd781 Mon Sep 17 00:00:00 2001 From: jabolina Date: Thu, 9 Mar 2023 14:15:58 -0300 Subject: [PATCH] ISPN-14571 Propagate errors in REST chunked response --- .../rest/impl/okhttp/RestClientOkHttp.java | 2 + .../rest/stream/CacheChunkedStream.java | 3 +- .../rest/resources/CacheResourceV2Test.java | 64 ++++++++++++++++++- 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/client/rest-client/src/main/java/org/infinispan/client/rest/impl/okhttp/RestClientOkHttp.java b/client/rest-client/src/main/java/org/infinispan/client/rest/impl/okhttp/RestClientOkHttp.java index 77098188a838..491d4d883e56 100644 --- a/client/rest-client/src/main/java/org/infinispan/client/rest/impl/okhttp/RestClientOkHttp.java +++ b/client/rest-client/src/main/java/org/infinispan/client/rest/impl/okhttp/RestClientOkHttp.java @@ -116,6 +116,8 @@ public RestClientOkHttp(RestClientConfiguration configuration) { } else { builder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); } + // OkHttp might retry infinitely on HTTP/2. + builder.retryOnConnectionFailure(false); break; } diff --git a/server/rest/src/main/java/org/infinispan/rest/stream/CacheChunkedStream.java b/server/rest/src/main/java/org/infinispan/rest/stream/CacheChunkedStream.java index ef5404e815d6..88daa2446144 100644 --- a/server/rest/src/main/java/org/infinispan/rest/stream/CacheChunkedStream.java +++ b/server/rest/src/main/java/org/infinispan/rest/stream/CacheChunkedStream.java @@ -6,7 +6,6 @@ import org.infinispan.commons.marshall.WrappedByteArray; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; -import org.reactivestreams.Publisher; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -17,6 +16,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.reactivex.rxjava3.subscribers.DefaultSubscriber; +import org.reactivestreams.Publisher; public abstract class CacheChunkedStream { protected static final Log logger = LogFactory.getLog(CacheChunkedStream.class); @@ -114,6 +114,7 @@ public void onError(Throwable t) { pendingBuffer = null; } cancel(); + ctx.close(); } @Override diff --git a/server/rest/src/test/java/org/infinispan/rest/resources/CacheResourceV2Test.java b/server/rest/src/test/java/org/infinispan/rest/resources/CacheResourceV2Test.java index fdb905326abd..360b9a3cf519 100644 --- a/server/rest/src/test/java/org/infinispan/rest/resources/CacheResourceV2Test.java +++ b/server/rest/src/test/java/org/infinispan/rest/resources/CacheResourceV2Test.java @@ -14,6 +14,7 @@ import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN; import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN_TYPE; import static org.infinispan.commons.test.CommonsTestingUtil.tmpDirectory; +import static org.infinispan.commons.util.EnumUtil.EMPTY_BIT_SET; import static org.infinispan.commons.util.Util.getResourceAsString; import static org.infinispan.configuration.cache.IndexStorage.LOCAL_HEAP; import static org.infinispan.context.Flag.SKIP_CACHE_LOAD; @@ -53,7 +54,6 @@ import javax.xml.parsers.DocumentBuilderFactory; -import org.assertj.core.api.Assertions; import org.infinispan.Cache; import org.infinispan.client.rest.RestCacheClient; import org.infinispan.client.rest.RestClient; @@ -67,12 +67,14 @@ import org.infinispan.commons.dataconversion.MediaType; import org.infinispan.commons.dataconversion.internal.Json; import org.infinispan.commons.marshall.ProtoStreamMarshaller; +import org.infinispan.commons.test.Exceptions; import org.infinispan.commons.util.Util; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.configuration.parsing.ParserRegistry; +import org.infinispan.factories.ComponentRegistry; import org.infinispan.globalstate.ConfigurationStorage; import org.infinispan.globalstate.GlobalConfigurationManager; import org.infinispan.globalstate.ScopedState; @@ -81,11 +83,19 @@ import org.infinispan.partitionhandling.PartitionHandling; import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder; import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants; +import org.infinispan.reactive.publisher.impl.ClusterPublisherManager; +import org.infinispan.reactive.publisher.impl.DeliveryGuarantee; +import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier; import org.infinispan.rest.ResponseHeader; import org.infinispan.rest.assertion.ResponseAssertion; import org.infinispan.test.TestException; import org.infinispan.test.TestingUtil; import org.infinispan.topology.LocalTopologyManager; + +import io.reactivex.rxjava3.core.Flowable; +import okhttp3.internal.http2.StreamResetException; +import org.assertj.core.api.Assertions; +import org.mockito.Mockito; import org.testng.annotations.Test; import org.testng.reporters.Files; import org.w3c.dom.Document; @@ -824,6 +834,58 @@ private void createAndWriteToCache(String name, Configuration configuration) { assertThat(response).isOk(); } + @Test + public void testKeyStreamWithFailure() { + String exceptionMessage = "Expected failure"; + Cache c = cacheManagers.get(0).getCache("default"); + ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c); + ClusterPublisherManager cpm = ccr.getClusterPublisherManager().running(); + ClusterPublisherManager spyCpm = Mockito.spy(cpm); + Mockito.doAnswer(ivk -> { + SegmentPublisherSupplier sps = (SegmentPublisherSupplier) ivk.callRealMethod(); + SegmentPublisherSupplier spySps = Mockito.spy(sps); + Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage))) + .when(spySps).publisherWithoutSegments(); + return spySps; + }).when(spyCpm).keyPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET), + Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any()); + TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true); + + if (protocol == HTTP_11) { + Exceptions.expectCompletionException(IOException.class, "unexpected end of stream on .*", client.cache("default").keys()); + } else { + // OkHttp wraps the exception in a StreamResetException. + Exceptions.expectCompletionException(StreamResetException.class, "stream was reset: .*", client.cache("default").keys()); + } + TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true); + } + + @Test + public void testEntryStreamWithFailure() { + String exceptionMessage = "Expected failure"; + Cache c = cacheManagers.get(0).getCache("default"); + ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c); + ClusterPublisherManager cpm = ccr.getClusterPublisherManager().running(); + ClusterPublisherManager spyCpm = Mockito.spy(cpm); + Mockito.doAnswer(ivk -> { + SegmentPublisherSupplier sps = (SegmentPublisherSupplier) ivk.callRealMethod(); + SegmentPublisherSupplier spySps = Mockito.spy(sps); + Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage))) + .when(spySps).publisherWithoutSegments(); + return spySps; + }).when(spyCpm).entryPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET), + Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any()); + TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true); + + if (protocol == HTTP_11) { + Exceptions.expectCompletionException(IOException.class, "unexpected end of stream on .*", client.cache("default").entries()); + } else { + // OkHttp wraps the exception in a StreamResetException. + Exceptions.expectCompletionException(StreamResetException.class, "stream was reset: .*", client.cache("default").entries()); + } + TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true); + } + @Test public void testMultiByte() { putTextEntryInCache("default", "José", "Uberlândia");