Skip to content

Commit

Permalink
ISPN-14229 Implement RESP "cluster" commands
Browse files Browse the repository at this point in the history
* Implements the `CLUSTER SHARDS` command.
  • Loading branch information
jabolina committed May 10, 2023
1 parent 8efdfee commit e2c76ef
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ public interface ProtoStreamTypeIds {
int CACHE_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND;
int CLUSTER_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND + 1;
int KEY_DISTRIBUTION_INFO = DATA_DISTRIBUTION_LOWER_BOUND + 2;

// RESP usage 6100 -> 6199
int RESP_LOWER_BOUND = 6100;
int RESP_NODE_INFORMATION = RESP_LOWER_BOUND;
}
24 changes: 24 additions & 0 deletions server/resp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
<groupId>org.infinispan</groupId>
<artifactId>infinispan-multimap</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
<version>${version.protostream}</version>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream</artifactId>
<version>${version.protostream}</version>
</dependency>

<dependency>
<groupId>org.kohsuke.metainf-services</groupId>
<artifactId>metainf-services</artifactId>
Expand Down Expand Up @@ -89,6 +102,17 @@
</dependenciesToScan>
</configuration>
</plugin>
<plugin>
<groupId>org.infinispan.maven-plugins</groupId>
<artifactId>proto-schema-compatibility</artifactId>
<executions>
<execution>
<goals>
<goal>proto-schema-compatibility-check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Expand Down
40 changes: 40 additions & 0 deletions server/resp/proto.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"definitions": [
{
"protopath": "generated:/:persistence.resp.proto",
"def": {
"messages": [
{
"name": "NodeInformation",
"fields": [
{
"id": 1,
"name": "name",
"type": "string"
},
{
"id": 2,
"name": "address",
"type": "string"
},
{
"id": 3,
"name": "port",
"type": "int32",
"options": [
{
"name": "default",
"value": "-1"
}
]
}
]
}
],
"package": {
"name": "org.infinispan.persistence.resp"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.util.Arrays;
import java.util.List;

import org.infinispan.server.resp.commands.CLUSTER;
import org.infinispan.server.resp.commands.CONFIG;
import org.infinispan.server.resp.commands.FamilyCommand;
import org.infinispan.server.resp.commands.INFO;
import org.infinispan.server.resp.commands.connection.AUTH;
import org.infinispan.server.resp.commands.connection.COMMAND;
Expand Down Expand Up @@ -69,7 +71,14 @@ protected static List<RespCommand> all() {

for (int i = 0; i < indexedRespCommand.length; i++) {
if (indexedRespCommand[i] != null) {
respCommands.addAll(Arrays.asList(indexedRespCommand[i]));
for (RespCommand cmd : indexedRespCommand[i]) {
if (cmd instanceof FamilyCommand) {
FamilyCommand family = (FamilyCommand) cmd;
respCommands.addAll(Arrays.asList(family.getFamilyCommands()));
continue;
}
respCommands.add(cmd);
}
}
}
return respCommands;
Expand All @@ -87,7 +96,7 @@ public String getName() {
// NOTE that the order within the sub array matters, commands we want to have the lowest latency should be first
// in this array as they are looked up sequentially for matches
indexedRespCommand[0] = new RespCommand[]{new APPEND(), new AUTH()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND()};
indexedRespCommand[2] = new RespCommand[]{new CONFIG(), new COMMAND(), new CLUSTER()};
// DEL should always be first here
indexedRespCommand[3] = new RespCommand[]{new DEL(), new DECR(), new DECRBY()};
indexedRespCommand[4] = new RespCommand[]{new ECHO()};
Expand Down Expand Up @@ -143,6 +152,24 @@ public static RespCommand fromByteBuf(ByteBuf buf, int commandLength) {
return null;
}

public final boolean match(byte[] other) {
byte[] possibleBytes = bytes;
if (other.length == possibleBytes.length) {
boolean matches = true;
for (int i = 0; i < possibleBytes.length; ++i) {
byte upperByte = possibleBytes[i];
byte targetByte = other[i];
if (upperByte == targetByte || upperByte + 22 == targetByte) {
continue;
}
matches = false;
break;
}
return matches;
}
return false;
}

public int getArity() {
return arity;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.infinispan.server.resp.cluster;

import static org.infinispan.commons.marshall.ProtoStreamTypeIds.RESP_NODE_INFORMATION;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;

@ProtoTypeId(RESP_NODE_INFORMATION)
public class NodeInformation {

private final String name;
private final String address;
private final int port;

@ProtoFactory
public NodeInformation(String name, String address, int port) {
this.name = name;
this.address = address;
this.port = port;
}

@ProtoField(1)
public String name() {
return name;
}

@ProtoField(2)
public String address() {
return address;
}

@ProtoField(value = 3, defaultValue = "-1")
public int port() {
return port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.infinispan.server.resp.commands;

import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.commands.cluster.SHARDS;

public class CLUSTER extends FamilyCommand {

private static final RespCommand[] CLUSTER_COMMANDS;

static {
CLUSTER_COMMANDS = new RespCommand[] {
new SHARDS()
};
}

@Override
public RespCommand[] getFamilyCommands() {
return CLUSTER_COMMANDS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.infinispan.server.resp.commands;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.infinispan.server.resp.ByteBufferUtils;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;

import io.netty.channel.ChannelHandlerContext;

/**
* An umbrella command.
* <p>
* This command represents a family of commands. Usually, the ones with multiple keywords, for example,
* {@link CLUSTER} or ACL.
*
* @since 15.0
*/
public abstract class FamilyCommand extends RespCommand implements Resp3Command {

public FamilyCommand() {
super(0, 0, 0, 0);
}

@Override
public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
byte[] subCommand = arguments.get(0);
for (RespCommand cmd : getFamilyCommands()) {
if (cmd.match(subCommand)) {
if (cmd instanceof Resp3Command) {
return ((Resp3Command) cmd).perform(handler, ctx, arguments);
}

if (cmd instanceof AuthResp3Command) {
return ((AuthResp3Command) cmd).perform(handler, ctx, arguments);
}

break;
}
}
ByteBufferUtils.stringToByteBuf("-ERR unknown command\r\n", handler.allocatorToUse());
return handler.myStage();
}

public abstract RespCommand[] getFamilyCommands();
}

0 comments on commit e2c76ef

Please sign in to comment.