Skip to content

Commit

Permalink
ISPN-15319 RESP cmd hangs with dist cache
Browse files Browse the repository at this point in the history
* Update tests with two nodes to cover more commands.
* Add assertions to the ByteBuf allocator.
* Update multiple commands to complete and write only in the event loop.

Co-authored-by: José Bolina <jbolina@redhat.com>
  • Loading branch information
2 people authored and wburns committed Nov 30, 2023
1 parent 1eae58d commit 33de5fe
Show file tree
Hide file tree
Showing 24 changed files with 595 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.infinispan.AdvancedCache;
Expand Down Expand Up @@ -826,6 +828,16 @@ protected MagicKey getKeyForCache(Cache<?, ?> cache) {
return new MagicKey(cache);
}

protected String getStringKeyForCache(Cache<?, ?> primary) {
return getStringKeyForCache("key", primary);
}

protected String getStringKeyForCache(String prefix, Cache<?, ?> primary) {
LocalizedCacheTopology topology = primary.getAdvancedCache().getDistributionManager().getCacheTopology();
return IntStream.generate(ThreadLocalRandom.current()::nextInt).mapToObj(i -> prefix + i)
.filter(key -> topology.getDistribution(key).isPrimary()).findAny().orElseThrow();
}

protected MagicKey getKeyForCache(Cache<?, ?> primary, Cache<?, ?>... backup) {
return new MagicKey(primary, backup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private List<Object> getYoungerThan(Cache<?, ?> cache, int age) {
return cache.query(q).execute().list();
}

private String getStringKeyForCache(Cache cache) {
protected String getStringKeyForCache(Cache cache) {
LocalizedCacheTopology topology = cache.getAdvancedCache().getDistributionManager().getCacheTopology();
return IntStream.generate(ThreadLocalRandom.current()::nextInt).mapToObj(i -> "key" + i)
.filter(key -> topology.getDistribution(key).isPrimary()).findAny().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ private Consumers() {
}
};

public static final BiConsumer<Object, ByteBufPool> LONG_ELSE_COLLECTION = (res, alloc) -> {
if (res instanceof Long) Consumers.LONG_BICONSUMER.accept((Long) res, alloc);
else Consumers.GET_ARRAY_BICONSUMER.accept((Collection<byte[]>) res, alloc);
};

public static final BiConsumer<Collection<ScoredValue<byte[]>>, ByteBufPool> GET_OBJ_WRAPPER_ARRAY_BICONSUMER = (innerValueBytes, alloc) -> {
if (innerValueBytes != null) {
ByteBufferUtils.bytesToResultWrapped(innerValueBytes, alloc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public RespHandler(BaseRespDecoder resumeHandler, RespRequestHandler requestHand
}

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, int size) {
assert ctx.channel().eventLoop().inEventLoop() : "Buffer allocation should occur in event loop, it was " + Thread.currentThread().getName();
if (traceAccess) accessLogger.accept(size);
if (outboundBuffer != null) {
if (outboundBuffer.writableBytes() > size) {
Expand Down Expand Up @@ -163,7 +164,7 @@ protected void handleCommandAndArguments(ChannelHandlerContext ctx, RespCommand
// Disable reading any more from socket - until command is complete
ctx.channel().config().setAutoRead(false);
stage.whenComplete((handler, t) -> {
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().eventLoop().inEventLoop() : "Command should complete only in event loop thread, it was " + Thread.currentThread().getName();
if (t != null) {
exceptionCaught(ctx, t);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.infinispan.server.resp.commands.connection.QUIT;
Expand All @@ -19,8 +20,17 @@ public abstract class RespRequestHandler {
protected final RespServer respServer;

public static final AttributeKey<ByteBufPool> BYTE_BUF_POOL_ATTRIBUTE_KEY = AttributeKey.newInstance("buffer-pool");
private final BiFunction<RespRequestHandler, Throwable, RespRequestHandler> HANDLE_FAILURE = (h, t) -> {
if (t != null) {
Resp3Handler.handleThrowable(h.allocator(), t);
// If exception, never use handler
return this;
}
return h;
};

ByteBufPool allocatorToUse;
ChannelHandlerContext ctx;

protected RespRequestHandler(RespServer server) {
this.respServer = server;
Expand All @@ -32,6 +42,7 @@ protected void initializeIfNecessary(ChannelHandlerContext ctx) {
throw new IllegalStateException("BufferPool was not initialized in the context " + ctx);
}
allocatorToUse = ctx.channel().attr(BYTE_BUF_POOL_ATTRIBUTE_KEY).get();
this.ctx = ctx;
}
}

Expand All @@ -43,7 +54,19 @@ public CompletionStage<RespRequestHandler> myStage() {
return myStage;
}

/**
* Acquire the buffer allocator in the current context.
* <p>
* The {@link ByteBufPool} provides the means to allocate a {@link io.netty.buffer.ByteBuf} with the required
* free size. <b>All</b> the writes must utilize the {@link ByteBufPool} instead of manually allocating buffers.
* </p>
* <b>Thread-safety</b>: The allocator and the {@link io.netty.buffer.ByteBuf} must be utilized only from the
* event loop thread. Use the {@link ChannelHandlerContext} to verify.
*
* @return A {@link io.netty.buffer.ByteBuf} allocator.
*/
public ByteBufPool allocator() {
assert ctx.channel().eventLoop().inEventLoop() : "Buffer allocation should occur in event loop, it was " + Thread.currentThread().getName();
return allocatorToUse;
}

Expand Down Expand Up @@ -100,6 +123,15 @@ public <E> CompletionStage<RespRequestHandler> stageToReturn(CompletionStage<E>
return stageToReturn(stage, ctx, null, Objects.requireNonNull(handlerWhenComplete));
}

public CompletionStage<RespRequestHandler> stageToReturn(CompletionStage<RespRequestHandler> stage, ChannelHandlerContext ctx) {
assert ctx.channel().eventLoop().inEventLoop();
if (CompletionStages.isCompletedSuccessfully(stage)) {
RespRequestHandler rrh = CompletionStages.join(stage);
return rrh.myStage();
}
return stage.handleAsync(HANDLE_FAILURE, ctx.channel().eventLoop());
}

/**
* Handles ensuring that a stage TriConsumer is only invoked on the event loop. The TriConsumer can then do things
* such as writing to the underlying channel or update variables in a thread safe manner without additional
Expand Down Expand Up @@ -160,6 +192,7 @@ private <E> CompletionStage<RespRequestHandler> stageToReturn(CompletionStage<E>
}
return stage.handleAsync((value, t) -> {
if (t != null) {
Resp3Handler.handleThrowable(allocatorToUse, t);
// If exception, never use handler
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.infinispan.AdvancedCache;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.server.resp.ByteBufPool;
import org.infinispan.server.resp.Consumers;
import org.infinispan.server.resp.Resp3Handler;
Expand Down Expand Up @@ -44,38 +45,39 @@ public CompletionStage<RespRequestHandler> perform(Resp3Handler handler,

public static CompletionStage<RespRequestHandler> rename(Resp3Handler handler, byte[] srcKey, byte[] dstKey,
ChannelHandlerContext ctx, BiConsumer<?, ByteBufPool> consumer) {
BiConsumer<Object, ByteBufPool> bc = (BiConsumer<Object, ByteBufPool>) consumer;
MediaType vmt = handler.cache().getValueDataConversion().getStorageMediaType();
final AdvancedCache<byte[], Object> acm = handler.cache()
.<byte[], Object>withMediaType(MediaType.APPLICATION_OCTET_STREAM, vmt);
return acm.removeAsyncEntry(srcKey).thenCompose(e -> {
if (e == null) {
RespErrorUtil.noSuchKey(handler.allocator());
return handler.myStage();
}
CompletableFuture<Long> myStage;
if (Arrays.equals(srcKey, dstKey)) {
// If src = dest ...
myStage = CompletableFuture.completedFuture(1L);
} else {
var timeService = handler.respServer().getTimeService();
// ... else if Immortal entry case: copy metadata...
if (e.getLifespan() <= 0) {
var newMeta = e.getMetadata().builder();
myStage = acm.putAsyncEntry(dstKey, e.getValue(), newMeta.build()).thenApply(ignore -> 1L);
} else {
long newLifespan = e.getLifespan() + e.getCreated() - timeService.wallClockTime();
if (newLifespan > 0) {
// ... else if not expired copy metadata and preserve lifespan...
var newMeta = e.getMetadata().builder().lifespan(newLifespan);
myStage = acm.putAsyncEntry(dstKey, e.getValue(), newMeta.build()).thenApply(ignore -> 1L);
final AdvancedCache<byte[], Object> acm = handler.typedCache(vmt);
CompletionStage<?> cs = acm.removeAsyncEntry(srcKey)
.thenCompose(e -> {
if (e == null) return CompletableFutures.completedNull();

if (Arrays.equals(srcKey, dstKey)) {
// If src = dest ...
return CompletableFuture.completedFuture(1L);
} else {
// ... or do nothing if expired
myStage = CompletableFuture.completedFuture(1L);
var timeService = handler.respServer().getTimeService();
// ... else if Immortal entry case: copy metadata...
if (e.getLifespan() <= 0) {
var newMeta = e.getMetadata().builder();
return acm.putAsyncEntry(dstKey, e.getValue(), newMeta.build()).thenApply(ignore -> 1L);
} else {
long newLifespan = e.getLifespan() + e.getCreated() - timeService.wallClockTime();
if (newLifespan > 0) {
// ... else if not expired copy metadata and preserve lifespan...
var newMeta = e.getMetadata().builder().lifespan(newLifespan);
return acm.putAsyncEntry(dstKey, e.getValue(), newMeta.build()).thenApply(ignore -> 1L);
} else {
// ... or do nothing if expired
return CompletableFuture.completedFuture(1L);
}
}
}
}
}
return handler.stageToReturn(myStage, ctx,
(BiConsumer<Object, ByteBufPool>) consumer);
});

return handler.stageToReturn(cs, ctx, (l, bbp) -> {
if (l != null) bc.accept(l, bbp);
else RespErrorUtil.noSuchKey(handler.allocator());
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package org.infinispan.server.resp.commands.generic;

import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.infinispan.AdvancedCache;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.util.concurrent.CompletableFutures;
Expand All @@ -17,13 +24,7 @@
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.util.concurrent.CompletionStages;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.netty.channel.ChannelHandlerContext;

/**
* Returns or stores the elements contained in the list, set or sorted set at key.
Expand Down Expand Up @@ -224,27 +225,17 @@ private CompletionStage<RespRequestHandler> storeV(Resp3Handler handler,
byte[] destination,
CompletionStage<List<byte[]>> resultingList) {
EmbeddedMultimapListCache<byte[], byte[]> listMultimap = handler.getListMultimap();
return CompletionStages.handleAndCompose(resultingList, (values, t1) -> {
if (t1 != null) {
return handleException(handler, t1);
}
CompletionStage<Long> size = listMultimap.replace(destination, values);
return handler.stageToReturn(size, ctx, Consumers.LONG_BICONSUMER);
});
CompletionStage<Long> cs = resultingList.thenCompose(values -> listMultimap.replace(destination, values));
return handler.stageToReturn(cs, ctx, Consumers.LONG_BICONSUMER);
}

private CompletionStage<RespRequestHandler> store(Resp3Handler handler,
ChannelHandlerContext ctx,
byte[] destination,
CompletionStage<List<ScoredValue<byte[]>>> sortedList) {
EmbeddedMultimapListCache<byte[], byte[]> listMultimap = handler.getListMultimap();
return CompletionStages.handleAndCompose(sortedList, (values, t1) -> {
if (t1 != null) {
return handleException(handler, t1);
}
CompletionStage<Long> size = listMultimap.replace(destination,
values.stream().map(ScoredValue::getValue).collect(Collectors.toList()));
return handler.stageToReturn(size, ctx, Consumers.LONG_BICONSUMER);
});
CompletionStage<Long> cs = sortedList.thenCompose(values ->
listMultimap.replace(destination, values.stream().map(ScoredValue::getValue).collect(Collectors.toList())));
return handler.stageToReturn(cs, ctx, Consumers.LONG_BICONSUMER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.multimap.impl.EmbeddedMultimapPairCache;
import org.infinispan.server.resp.Consumers;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespErrorUtil;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.util.concurrent.CompletionStages;

import io.netty.channel.ChannelHandlerContext;

Expand Down Expand Up @@ -41,13 +39,7 @@ public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, Channel
return handler.myStage();
}

return CompletionStages.handleAndCompose(setEntries(handler, arguments), (ignore, t) -> {
if (t != null) {
return handleException(handler, t);
}

return handler.stageToReturn(CompletableFutures.completedNull(), ctx, Consumers.OK_BICONSUMER);
});
return handler.stageToReturn(setEntries(handler, arguments), ctx, Consumers.OK_BICONSUMER);
}

protected CompletionStage<Integer> setEntries(Resp3Handler handler, List<byte[]> arguments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final CompletionStage<RespRequestHandler> perform(Resp3Handler handler, C
if (INITIAL_CURSOR.equals(cursor)) {
CompletionStage<IterationInitializationContext> initialization = initializeIteration(handler, arguments);
if (initialization != null) {
return initialization.thenCompose(iic -> initializeAndIterate(handler, ctx, manager, args, iic));
return handler.stageToReturn(initialization.thenCompose(iic -> initializeAndIterate(handler, ctx, manager, args, iic)), ctx);
}
return initializeAndIterate(handler, ctx, manager, args, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.infinispan.server.resp.commands.list;

import io.netty.channel.ChannelHandlerContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.multimap.impl.EmbeddedMultimapListCache;
import org.infinispan.server.resp.Consumers;
Expand All @@ -11,10 +15,7 @@
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.logging.Log;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import io.netty.channel.ChannelHandlerContext;

/**
* @link https://redis.io/commands/lmove/
Expand Down Expand Up @@ -111,10 +112,9 @@ protected CompletionStage<RespRequestHandler> lmoveAndReturn(Resp3Handler handle
pollCall = listMultimap.pollLast(source, 1);
}

return pollCall.thenCompose( pollResult -> {
if (pollResult == null) {
return handler.stageToReturn(CompletableFutures.completedNull(), ctx, Consumers.GET_BICONSUMER);
}
CompletionStage<byte[]> cs = pollCall
.thenCompose(pollResult -> {
if (pollResult == null) return CompletableFutures.completedNull();

final byte[] element = pollResult.iterator().next();

Expand All @@ -125,7 +125,9 @@ protected CompletionStage<RespRequestHandler> lmoveAndReturn(Resp3Handler handle
offerCall = listMultimap.offerLast(destination, element);
}

return handler.stageToReturn(offerCall.thenApply(r -> element), ctx, Consumers.GET_BICONSUMER);
return offerCall.thenApply(r -> element);
});

return handler.stageToReturn(cs, ctx, Consumers.GET_BICONSUMER);
}
}

0 comments on commit 33de5fe

Please sign in to comment.