Skip to content

Commit

Permalink
ISPN-14571 Propagate errors in REST chunked response
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina committed Mar 9, 2023
1 parent 0af8f6f commit b525da5
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 0 deletions.
Expand Up @@ -32,6 +32,8 @@ public interface RestResponse extends RestEntity, AutoCloseable {

Map<String, List<String>> headers();

Map<String, List<String>> trailers();

/**
* Returns the value of a header as a String. For multi-valued headers, values are separated by comma.
*/
Expand Down
Expand Up @@ -32,6 +32,15 @@ public Map<String, List<String>> headers() {
return response.headers().toMultimap();
}

@Override
public Map<String, List<String>> trailers() {
try {
return response.trailers().toMultimap();
} catch (IOException e) {
throw new RuntimeException("Trailers are not ready", e);
}
}

@Override
public String getHeader(String header) {
List<String> values = response.headers(header);
Expand Down
Expand Up @@ -81,6 +81,7 @@ void writeResponse(ChannelHandlerContext ctx, FullHttpRequest request, NettyRest
void writeResponse(ChannelHandlerContext ctx, FullHttpRequest request, NettyRestResponse response) {
HttpResponse res = response.getResponse();
res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
res.headers().set(HttpHeaderNames.TRAILER, CacheChunkedStream.TRAILER_ERROR_HEADER);
res.headers().set(CONNECTION, KEEP_ALIVE);
log(ctx, request, res);
ctx.write(res);
Expand Down
Expand Up @@ -6,6 +6,11 @@
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import org.reactivestreams.Publisher;

import io.netty.buffer.ByteBuf;
Expand All @@ -19,6 +24,7 @@
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;

public abstract class CacheChunkedStream<T> {
public static final String TRAILER_ERROR_HEADER = "failure";
protected static final Log logger = LogFactory.getLog(CacheChunkedStream.class);

// Netty default value with `ChunkedStream`.
Expand Down Expand Up @@ -114,6 +120,7 @@ public void onError(Throwable t) {
pendingBuffer = null;
}
cancel();
writeToContext(t);
}

@Override
Expand All @@ -133,5 +140,11 @@ ChannelFuture writeToContext(ByteBuf buf, boolean isComplete) {
ctx.flush();
return completeFuture;
}

void writeToContext(Throwable t) {
HttpHeaders trailer = new DefaultHttpHeaders();
trailer.add(TRAILER_ERROR_HEADER, t.getMessage());
ctx.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, trailer));
}
}
}
Expand Up @@ -14,6 +14,7 @@
import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN;
import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN_TYPE;
import static org.infinispan.commons.test.CommonsTestingUtil.tmpDirectory;
import static org.infinispan.commons.util.EnumUtil.EMPTY_BIT_SET;
import static org.infinispan.commons.util.Util.getResourceAsString;
import static org.infinispan.configuration.cache.IndexStorage.LOCAL_HEAP;
import static org.infinispan.context.Flag.SKIP_CACHE_LOAD;
Expand Down Expand Up @@ -53,6 +54,7 @@

import javax.xml.parsers.DocumentBuilderFactory;

import io.reactivex.rxjava3.core.Flowable;
import org.assertj.core.api.Assertions;
import org.infinispan.Cache;
import org.infinispan.client.rest.RestCacheClient;
Expand All @@ -73,6 +75,7 @@
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.globalstate.ConfigurationStorage;
import org.infinispan.globalstate.GlobalConfigurationManager;
import org.infinispan.globalstate.ScopedState;
Expand All @@ -81,11 +84,17 @@
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.rest.ResponseHeader;
import org.infinispan.rest.assertion.ResponseAssertion;
import org.infinispan.rest.stream.CacheChunkedStream;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.LocalTopologyManager;

import org.mockito.Mockito;
import org.testng.annotations.Test;
import org.testng.reporters.Files;
import org.w3c.dom.Document;
Expand Down Expand Up @@ -742,6 +751,56 @@ private void createAndWriteToCache(String name, Configuration configuration) {
assertThat(response).isOk();
}

@Test
public void testKeyStreamWithFailure() {
String exceptionMessage = "Expected failure";
Cache<?, ?> c = cacheManagers.get(0).getCache("default");
ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c);
ClusterPublisherManager<?, ?> cpm = ccr.getClusterPublisherManager().running();
ClusterPublisherManager<?, ?> spyCpm = Mockito.spy(cpm);
Mockito.doAnswer(ivk -> {
SegmentPublisherSupplier<Object> sps = (SegmentPublisherSupplier<Object>) ivk.callRealMethod();
SegmentPublisherSupplier<Object> spySps = Mockito.spy(sps);
Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage)))
.when(spySps).publisherWithoutSegments();
return spySps;
}).when(spyCpm).keyPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET),
Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any());
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true);

RestResponse response = join(client.cache("default").keys());
assertThat(response).isOk();
// It is empty because we don't have any values to return.
assertThat(response.getBody()).isEmpty();
assertThat(response.trailers()).containsEntry(CacheChunkedStream.TRAILER_ERROR_HEADER, List.of(exceptionMessage));
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true);
}

@Test
public void testEntryStreamWithFailure() {
String exceptionMessage = "Expected failure";
Cache<?, ?> c = cacheManagers.get(0).getCache("default");
ComponentRegistry ccr = TestingUtil.extractComponentRegistry(c);
ClusterPublisherManager<?, ?> cpm = ccr.getClusterPublisherManager().running();
ClusterPublisherManager<?, ?> spyCpm = Mockito.spy(cpm);
Mockito.doAnswer(ivk -> {
SegmentPublisherSupplier<Object> sps = (SegmentPublisherSupplier<Object>) ivk.callRealMethod();
SegmentPublisherSupplier<Object> spySps = Mockito.spy(sps);
Mockito.doAnswer(ignore -> Flowable.error(new RuntimeException(exceptionMessage)))
.when(spySps).publisherWithoutSegments();
return spySps;
}).when(spyCpm).entryPublisher(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(EMPTY_BIT_SET),
Mockito.eq(DeliveryGuarantee.EXACTLY_ONCE), Mockito.eq(1000), Mockito.any());
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, spyCpm, true);

RestResponse response = join(client.cache("default").entries());
assertThat(response).isOk();
// It is empty because we don't have any values to return.
assertThat(response.getBody()).isEmpty();
assertThat(response.trailers()).containsEntry(CacheChunkedStream.TRAILER_ERROR_HEADER, List.of(exceptionMessage));
TestingUtil.replaceComponent(c, ClusterPublisherManager.class, cpm, true);
}

@Test
public void testMultiByte() {
putTextEntryInCache("default", "José", "Uberlândia");
Expand Down

0 comments on commit b525da5

Please sign in to comment.