Skip to content

Commit

Permalink
ISPN-14907 Implement RESP "cluster shards" command
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina committed May 31, 2023
1 parent 3ec031e commit 50b40a0
Show file tree
Hide file tree
Showing 8 changed files with 458 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The {brandname} RESP endpoint implements the following Redis commands:
| link:https://redis.io/commands/command[COMMAND]
|

| link:https://redis.io/commands/command[CLUSTER]
|

| link:https://redis.io/commands/dbsize[DBSIZE]
|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.infinispan.server.resp.commands.CONFIG;
import org.infinispan.server.resp.commands.INFO;
import org.infinispan.server.resp.commands.cluster.CLUSTER;
import org.infinispan.server.resp.commands.connection.AUTH;
import org.infinispan.server.resp.commands.connection.COMMAND;
import org.infinispan.server.resp.commands.connection.DBSIZE;
Expand All @@ -19,16 +20,16 @@
import org.infinispan.server.resp.commands.connection.READWRITE;
import org.infinispan.server.resp.commands.connection.RESET;
import org.infinispan.server.resp.commands.connection.SELECT;
import org.infinispan.server.resp.commands.list.LPOP;
import org.infinispan.server.resp.commands.list.LPOS;
import org.infinispan.server.resp.commands.list.LSET;
import org.infinispan.server.resp.commands.list.LRANGE;
import org.infinispan.server.resp.commands.list.RPOP;
import org.infinispan.server.resp.commands.generic.EXISTS;
import org.infinispan.server.resp.commands.list.LINDEX;
import org.infinispan.server.resp.commands.list.LLEN;
import org.infinispan.server.resp.commands.list.LPOP;
import org.infinispan.server.resp.commands.list.LPOS;
import org.infinispan.server.resp.commands.list.LPUSH;
import org.infinispan.server.resp.commands.list.LPUSHX;
import org.infinispan.server.resp.commands.generic.EXISTS;
import org.infinispan.server.resp.commands.list.LRANGE;
import org.infinispan.server.resp.commands.list.LSET;
import org.infinispan.server.resp.commands.list.RPOP;
import org.infinispan.server.resp.commands.list.RPUSH;
import org.infinispan.server.resp.commands.list.RPUSHX;
import org.infinispan.server.resp.commands.pubsub.PSUBSCRIBE;
Expand All @@ -48,10 +49,11 @@
import org.infinispan.server.resp.commands.string.MGET;
import org.infinispan.server.resp.commands.string.MSET;
import org.infinispan.server.resp.commands.string.SET;
import org.infinispan.server.resp.commands.string.STRLEN;
import org.infinispan.server.resp.commands.string.STRALGO;
import org.infinispan.server.resp.commands.string.STRLEN;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public abstract class RespCommand {
private final String name;
Expand Down Expand Up @@ -102,7 +104,7 @@ public String getName() {
// NOTE that the order within the sub array matters, commands we want to have the lowest latency should be first
// in this array as they are looked up sequentially for matches
indexedRespCommand[0] = new RespCommand[]{new APPEND(), new AUTH()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND(), new CLUSTER()};
// DEL should always be first here
indexedRespCommand[3] = new RespCommand[]{new DEL(), new DECR(), new DECRBY(), new DBSIZE()};
indexedRespCommand[4] = new RespCommand[]{new ECHO(), new EXISTS()};
Expand Down Expand Up @@ -138,27 +140,35 @@ public static RespCommand fromByteBuf(ByteBuf buf, int commandLength) {
return null;
}
for (RespCommand possible : target) {
byte[] possibleBytes = possible.bytes;
if (commandLength == possibleBytes.length) {
boolean matches = true;
// Already checked first byte, so skip that one
for (int i = 1; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = buf.getByte(readOffset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
if (matches) {
return possible;
}
if (possible.match(buf, commandLength, readOffset)) {
return possible;
}
}
return null;
}

public final boolean match(byte[] other) {
return match(Unpooled.wrappedBuffer(other), other.length, 0);
}

private boolean match(ByteBuf buf, int length, int offset) {
byte[] possibleBytes = bytes;
if (length == possibleBytes.length) {
boolean matches = true;
for (int i = 0; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = buf.getByte(offset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
return matches;
}
return false;
}

public int getArity() {
return arity;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.infinispan.server.resp.commands;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.ByteBufferUtils;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.cluster.CLUSTER;

import io.netty.channel.ChannelHandlerContext;

/**
* An umbrella command.
* <p>
* This command represents a family of commands. Usually, the ones with multiple keywords, for example,
* {@link CLUSTER} or ACL.
*
* @since 15.0
*/
public abstract class FamilyCommand extends RespCommand implements Resp3Command {

public FamilyCommand(int arity, int firstKeyPos, int lastKeyPos, int steps) {
super(arity, firstKeyPos, lastKeyPos, steps);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
byte[] subCommand = arguments.get(0);
for (RespCommand cmd : getFamilyCommands()) {
if (cmd.match(subCommand)) {
if (cmd instanceof Resp3Command) {
return ((Resp3Command) cmd).perform(handler, ctx, arguments);
}

if (cmd instanceof AuthResp3Command) {
return ((AuthResp3Command) cmd).perform(handler, ctx, arguments);
}

break;
}
}
ByteBufferUtils.stringToByteBuf("-ERR unknown command\r\n", handler.allocatorToUse());
return handler.myStage();
}

public abstract RespCommand[] getFamilyCommands();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.infinispan.server.resp.commands.cluster;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespErrorUtil;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.FamilyCommand;

import io.netty.channel.ChannelHandlerContext;

public class CLUSTER extends FamilyCommand {

private static final RespCommand[] CLUSTER_COMMANDS;

static {
CLUSTER_COMMANDS = new RespCommand[] {
new SHARDS()
};
}

public CLUSTER() {
super(-2, 0, 0, 0);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
if (arguments.isEmpty()) {
RespErrorUtil.wrongArgumentNumber(this, handler.allocatorToUse());
return handler.myStage();
}
return super.perform(handler, ctx, arguments);
}

@Override
public RespCommand[] getFamilyCommands() {
return CLUSTER_COMMANDS;
}
}

0 comments on commit 50b40a0

Please sign in to comment.