Skip to content

Commit

Permalink
ISPN-14907 Implement RESP "cluster shards" command
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina committed May 25, 2023
1 parent c11bc1c commit 0f671ea
Show file tree
Hide file tree
Showing 14 changed files with 608 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ public interface ProtoStreamTypeIds {
int CACHE_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND;
int CLUSTER_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND + 1;
int KEY_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND + 2;

// RESP usage 6100 -> 6199
int RESP_LOWER_BOUND = 6100;
int RESP_NODE_INFORMATION = RESP_LOWER_BOUND;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The {brandname} RESP endpoint implements the following Redis commands:
| link:https://redis.io/commands/command[COMMAND]
|

| link:https://redis.io/commands/command[CLUSTER]
|

| link:https://redis.io/commands/dbsize[DBSIZE]
|

Expand Down
24 changes: 24 additions & 0 deletions server/resp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
<groupId>org.infinispan</groupId>
<artifactId>infinispan-multimap</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
<version>${version.protostream}</version>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream</artifactId>
<version>${version.protostream}</version>
</dependency>

<dependency>
<groupId>org.kohsuke.metainf-services</groupId>
<artifactId>metainf-services</artifactId>
Expand Down Expand Up @@ -89,6 +102,17 @@
</dependenciesToScan>
</configuration>
</plugin>
<plugin>
<groupId>org.infinispan.maven-plugins</groupId>
<artifactId>proto-schema-compatibility</artifactId>
<executions>
<execution>
<goals>
<goal>proto-schema-compatibility-check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Expand Down
40 changes: 40 additions & 0 deletions server/resp/proto.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"definitions": [
{
"protopath": "generated:/:persistence.resp.proto",
"def": {
"messages": [
{
"name": "NodeInformation",
"fields": [
{
"id": 1,
"name": "name",
"type": "string"
},
{
"id": 2,
"name": "address",
"type": "string"
},
{
"id": 3,
"name": "port",
"type": "int32",
"options": [
{
"name": "default",
"value": "-1"
}
]
}
]
}
],
"package": {
"name": "org.infinispan.persistence.resp"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.List;

import org.infinispan.server.resp.commands.CLUSTER;
import org.infinispan.server.resp.commands.CONFIG;
import org.infinispan.server.resp.commands.INFO;
import org.infinispan.server.resp.commands.connection.AUTH;
Expand All @@ -19,14 +20,14 @@
import org.infinispan.server.resp.commands.connection.READWRITE;
import org.infinispan.server.resp.commands.connection.RESET;
import org.infinispan.server.resp.commands.connection.SELECT;
import org.infinispan.server.resp.commands.list.LPOP;
import org.infinispan.server.resp.commands.list.LRANGE;
import org.infinispan.server.resp.commands.list.RPOP;
import org.infinispan.server.resp.commands.generic.EXISTS;
import org.infinispan.server.resp.commands.list.LINDEX;
import org.infinispan.server.resp.commands.list.LLEN;
import org.infinispan.server.resp.commands.list.LPOP;
import org.infinispan.server.resp.commands.list.LPUSH;
import org.infinispan.server.resp.commands.list.LPUSHX;
import org.infinispan.server.resp.commands.generic.EXISTS;
import org.infinispan.server.resp.commands.list.LRANGE;
import org.infinispan.server.resp.commands.list.RPOP;
import org.infinispan.server.resp.commands.list.RPUSH;
import org.infinispan.server.resp.commands.list.RPUSHX;
import org.infinispan.server.resp.commands.pubsub.PSUBSCRIBE;
Expand All @@ -46,8 +47,8 @@
import org.infinispan.server.resp.commands.string.MGET;
import org.infinispan.server.resp.commands.string.MSET;
import org.infinispan.server.resp.commands.string.SET;
import org.infinispan.server.resp.commands.string.STRLEN;
import org.infinispan.server.resp.commands.string.STRALGO;
import org.infinispan.server.resp.commands.string.STRLEN;

import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -100,7 +101,7 @@ public String getName() {
// NOTE that the order within the sub array matters, commands we want to have the lowest latency should be first
// in this array as they are looked up sequentially for matches
indexedRespCommand[0] = new RespCommand[]{new APPEND(), new AUTH()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND(), new CLUSTER()};
// DEL should always be first here
indexedRespCommand[3] = new RespCommand[]{new DEL(), new DECR(), new DECRBY(), new DBSIZE()};
indexedRespCommand[4] = new RespCommand[]{new ECHO(), new EXISTS()};
Expand Down Expand Up @@ -136,27 +137,35 @@ public static RespCommand fromByteBuf(ByteBuf buf, int commandLength) {
return null;
}
for (RespCommand possible : target) {
byte[] possibleBytes = possible.bytes;
if (commandLength == possibleBytes.length) {
boolean matches = true;
// Already checked first byte, so skip that one
for (int i = 1; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = buf.getByte(readOffset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
if (matches) {
return possible;
}
if (possible.match(buf::getByte, commandLength, readOffset)) {
return possible;
}
}
return null;
}

public final boolean match(byte[] other) {
return match(i -> other[i], other.length, 0);
}

private boolean match(Readable readable, int length, int offset) {
byte[] possibleBytes = bytes;
if (length == possibleBytes.length) {
boolean matches = true;
for (int i = 0; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = readable.read(offset + i);
if (upperByte == targetByte || upperByte + 32 == targetByte) {
continue;
}
matches = false;
break;
}
return matches;
}
return false;
}

public int getArity() {
return arity;
}
Expand All @@ -172,4 +181,12 @@ public int getLastKeyPos() {
public int getSteps() {
return steps;
}

public String describeSubCommands() {
return "*0\r\n";
}

interface Readable {
byte read(int index);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.infinispan.server.resp.cluster;

import static org.infinispan.commons.marshall.ProtoStreamTypeIds.RESP_NODE_INFORMATION;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;

@ProtoTypeId(RESP_NODE_INFORMATION)
public class NodeInformation {

private final String name;
private final String address;
private final int port;

@ProtoFactory
public NodeInformation(String name, String address, int port) {
this.name = name;
this.address = address;
this.port = port;
}

@ProtoField(1)
public String name() {
return name;
}

@ProtoField(2)
public String address() {
return address;
}

@ProtoField(value = 3, defaultValue = "-1")
public int port() {
return port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.infinispan.server.resp.commands;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespErrorUtil;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.cluster.SHARDS;

import io.netty.channel.ChannelHandlerContext;

public class CLUSTER extends FamilyCommand {

private static final RespCommand[] CLUSTER_COMMANDS;

static {
CLUSTER_COMMANDS = new RespCommand[] {
new SHARDS()
};
}

public CLUSTER() {
super(-2, 0, 0, 0);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
if (arguments.isEmpty()) {
RespErrorUtil.wrongArgumentNumber(this, handler.allocatorToUse());
return handler.myStage();
}
return super.perform(handler, ctx, arguments);
}

@Override
public RespCommand[] getFamilyCommands() {
return CLUSTER_COMMANDS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.infinispan.server.resp.commands;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.ByteBufferUtils;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;

import io.netty.channel.ChannelHandlerContext;

/**
* An umbrella command.
* <p>
* This command represents a family of commands. Usually, the ones with multiple keywords, for example,
* {@link CLUSTER} or ACL.
*
* @since 15.0
*/
public abstract class FamilyCommand extends RespCommand implements Resp3Command {

public FamilyCommand(int arity, int firstKeyPos, int lastKeyPos, int steps) {
super(arity, firstKeyPos, lastKeyPos, steps);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
byte[] subCommand = arguments.get(0);
for (RespCommand cmd : getFamilyCommands()) {
if (cmd.match(subCommand)) {
if (cmd instanceof Resp3Command) {
return ((Resp3Command) cmd).perform(handler, ctx, arguments);
}

if (cmd instanceof AuthResp3Command) {
return ((AuthResp3Command) cmd).perform(handler, ctx, arguments);
}

break;
}
}
ByteBufferUtils.stringToByteBuf("-ERR unknown command\r\n", handler.allocatorToUse());
return handler.myStage();
}

public abstract RespCommand[] getFamilyCommands();

public String describeSubCommands() {
// TODO: override to properly describe the commands.
return super.describeSubCommands();
}
}

0 comments on commit 0f671ea

Please sign in to comment.