Skip to content

Commit

Permalink
ISPN-14571 Propagate errors in REST chunked response
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina authored and wburns committed Apr 14, 2023
1 parent cdb60db commit a28cef4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
Expand Up @@ -116,6 +116,8 @@ public RestClientOkHttp(RestClientConfiguration configuration) {
} else {
builder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
}
// OkHttp might retry infinitely on HTTP/2.
builder.retryOnConnectionFailure(false);
break;
}

Expand Down
Expand Up @@ -6,7 +6,6 @@
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand All @@ -17,6 +16,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
import org.reactivestreams.Publisher;

public abstract class CacheChunkedStream<T> {
protected static final Log logger = LogFactory.getLog(CacheChunkedStream.class);
Expand Down Expand Up @@ -114,6 +114,7 @@ public void onError(Throwable t) {
pendingBuffer = null;
}
cancel();
ctx.close();
}

@Override
Expand Down
Expand Up @@ -14,6 +14,7 @@
import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN;
import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN_TYPE;
import static org.infinispan.commons.test.CommonsTestingUtil.tmpDirectory;
import static org.infinispan.commons.util.EnumUtil.EMPTY_BIT_SET;
import static org.infinispan.commons.util.Util.getResourceAsString;
import static org.infinispan.configuration.cache.IndexStorage.LOCAL_HEAP;
import static org.infinispan.context.Flag.SKIP_CACHE_LOAD;
Expand Down Expand Up @@ -53,7 +54,6 @@

import javax.xml.parsers.DocumentBuilderFactory;

import org.assertj.core.api.Assertions;
import org.infinispan.Cache;
import org.infinispan.client.rest.RestCacheClient;
import org.infinispan.client.rest.RestClient;
Expand All @@ -67,12 +67,14 @@
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.globalstate.ConfigurationStorage;
import org.infinispan.globalstate.GlobalConfigurationManager;
import org.infinispan.globalstate.ScopedState;
Expand All @@ -81,11 +83,19 @@
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.rest.ResponseHeader;
import org.infinispan.rest.assertion.ResponseAssertion;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.LocalTopologyManager;

import io.reactivex.rxjava3.core.Flowable;
import okhttp3.internal.http2.StreamResetException;
import org.assertj.core.api.Assertions;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import org.testng.reporters.Files;
import org.w3c.dom.Document;
Expand Down Expand Up @@ -824,6 +834,58 @@ private void createAndWriteToCache(String name, Configuration configuration) {
assertThat(response).isOk();
}

@Test
public void testKeyStreamWithFailure() {
String exceptionMessage = "Expected failure";
Cache<?, ?> c = cacheManagers.get(0).getCache("default");
ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c);
ClusterPublisherManager<?, ?> cpm = ccr.getClusterPublisherManager().running();
ClusterPublisherManager<?, ?> spyCpm = Mockito.spy(cpm);
Mockito.doAnswer(ivk -> {
SegmentPublisherSupplier<Object> sps = (SegmentPublisherSupplier<Object>) ivk.callRealMethod();
SegmentPublisherSupplier<Object> spySps = Mockito.spy(sps);
Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage)))
.when(spySps).publisherWithoutSegments();
return spySps;
}).when(spyCpm).keyPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET),
Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any());
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true);

if (protocol == HTTP_11) {
Exceptions.expectCompletionException(IOException.class, "unexpected end of stream on .*", client.cache("default").keys());
} else {
// OkHttp wraps the exception in a StreamResetException.
Exceptions.expectCompletionException(StreamResetException.class, "stream was reset: .*", client.cache("default").keys());
}
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true);
}

@Test
public void testEntryStreamWithFailure() {
String exceptionMessage = "Expected failure";
Cache<?, ?> c = cacheManagers.get(0).getCache("default");
ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c);
ClusterPublisherManager<?, ?> cpm = ccr.getClusterPublisherManager().running();
ClusterPublisherManager<?, ?> spyCpm = Mockito.spy(cpm);
Mockito.doAnswer(ivk -> {
SegmentPublisherSupplier<Object> sps = (SegmentPublisherSupplier<Object>) ivk.callRealMethod();
SegmentPublisherSupplier<Object> spySps = Mockito.spy(sps);
Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage)))
.when(spySps).publisherWithoutSegments();
return spySps;
}).when(spyCpm).entryPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET),
Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any());
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true);

if (protocol == HTTP_11) {
Exceptions.expectCompletionException(IOException.class, "unexpected end of stream on .*", client.cache("default").entries());
} else {
// OkHttp wraps the exception in a StreamResetException.
Exceptions.expectCompletionException(StreamResetException.class, "stream was reset: .*", client.cache("default").entries());
}
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true);
}

@Test
public void testMultiByte() {
putTextEntryInCache("default", "José", "Uberlândia");
Expand Down

0 comments on commit a28cef4

Please sign in to comment.