Skip to content

Commit

Permalink
ISPN-14907 Implement RESP "cluster shards" command
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina committed Jun 2, 2023
1 parent 515fd48 commit 475062d
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The {brandname} RESP endpoint implements the following Redis commands:
| link:https://redis.io/commands/command[COMMAND]
|

| link:https://redis.io/commands/command[CLUSTER]
|

| link:https://redis.io/commands/dbsize[DBSIZE]
|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.util.logging.LogFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public abstract class RespCommand {
protected final static Log log = LogFactory.getLog(RespCommand.class, Log.class);
Expand Down Expand Up @@ -80,28 +81,36 @@ public static RespCommand fromByteBuf(ByteBuf buf, int commandLength) {
return null;
}
for (RespCommand possible : target) {
byte[] possibleBytes = possible.bytes;
if (commandLength == possibleBytes.length) {
boolean matches = true;
// Already checked first byte, so skip that one
for (int i = 1; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = buf.getByte(readOffset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
if (matches) {
return possible;
}
if (possible.match(buf, commandLength, readOffset)) {
return possible;
}
}
log.tracef("Unknown command %s", buf.getCharSequence(readOffset, commandLength, StandardCharsets.US_ASCII));
return null;
}

public final boolean match(byte[] other) {
return match(Unpooled.wrappedBuffer(other), other.length, 0);
}

private boolean match(ByteBuf buf, int length, int offset) {
byte[] possibleBytes = bytes;
if (length == possibleBytes.length) {
boolean matches = true;
for (int i = 0; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = buf.getByte(offset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
return matches;
}
log.tracef("Unknown command %s", buf.getCharSequence(offset, length, StandardCharsets.US_ASCII));
return false;
}

public int getArity() {
return arity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;

import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.commands.cluster.CLUSTER;
import org.infinispan.server.resp.commands.connection.AUTH;
import org.infinispan.server.resp.commands.connection.COMMAND;
import org.infinispan.server.resp.commands.connection.DBSIZE;
Expand Down Expand Up @@ -65,7 +66,7 @@ public final class Commands {
// NOTE that the order within the sub array matters, commands we want to have the lowest latency should be first
// in this array as they are looked up sequentially for matches
ALL_COMMANDS[0] = new RespCommand[]{new APPEND(), new AUTH()};
ALL_COMMANDS[2] = new RespCommand[]{new CONFIG(), new COMMAND()};
ALL_COMMANDS[2] = new RespCommand[]{new CONFIG(), new COMMAND(), new CLUSTER()};
// DEL should always be first here
ALL_COMMANDS[3] = new RespCommand[]{new DEL(), new DECR(), new DECRBY(), new DBSIZE()};
ALL_COMMANDS[4] = new RespCommand[]{new ECHO(), new EXISTS()};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.infinispan.server.resp.commands;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.ByteBufferUtils;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespErrorUtil;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.cluster.CLUSTER;

import io.netty.channel.ChannelHandlerContext;

/**
* An umbrella command.
* <p>
* This command represents a family of commands. Usually, the ones with multiple keywords, for example,
* {@link CLUSTER} or ACL.
*
* @since 15.0
*/
public abstract class FamilyCommand extends RespCommand implements Resp3Command {

public FamilyCommand(int arity, int firstKeyPos, int lastKeyPos, int steps) {
super(arity, firstKeyPos, lastKeyPos, steps);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
if (arguments.isEmpty()) {
RespErrorUtil.wrongArgumentNumber(this, handler.allocator());
return handler.myStage();
}

byte[] subCommand = arguments.get(0);
for (RespCommand cmd : getFamilyCommands()) {
if (cmd.match(subCommand)) {
if (cmd instanceof Resp3Command) {
return ((Resp3Command) cmd).perform(handler, ctx, arguments);
}

if (cmd instanceof AuthResp3Command) {
return ((AuthResp3Command) cmd).perform(handler, ctx, arguments);
}

break;
}
}
ByteBufferUtils.stringToByteBuf("-ERR unknown command\r\n", handler.allocator());
return handler.myStage();
}

public abstract RespCommand[] getFamilyCommands();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.infinispan.server.resp.commands.cluster;

import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.commands.FamilyCommand;

public class CLUSTER extends FamilyCommand {

private static final RespCommand[] CLUSTER_COMMANDS;

static {
CLUSTER_COMMANDS = new RespCommand[] {
new SHARDS()
};
}

public CLUSTER() {
super(-2, 0, 0, 0);
}

@Override
public RespCommand[] getFamilyCommands() {
return CLUSTER_COMMANDS;
}
}

0 comments on commit 475062d

Please sign in to comment.