Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pcgroups-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repositories {
dependencies {
implementation 'io.nats:jnats:2.25.1'
implementation 'org.jspecify:jspecify:1.0.0'
implementation 'io.synadia:pcgroups:0.1.0'
implementation 'io.synadia:pcgroups:0.1.1-SNAPSHOT'
implementation 'info.picocli:picocli:4.7.5'

testImplementation 'io.nats:jnats-server-runner:3.1.0'
Expand Down
44 changes: 44 additions & 0 deletions pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.nats.client.Nats;
import io.nats.client.Options;
import io.synadia.pcg.MemberMapping;
import io.synadia.pcg.PartitioningFilter;

import java.io.*;
import java.nio.file.Files;
Expand Down Expand Up @@ -124,6 +125,49 @@ public static List<MemberMapping> parseMemberMappings(List<String> mappingArgs)
return mappings;
}

/**
* Parses partitioning filter arguments in the format "filter:wildcard1,wildcard2" or just "filter".
* Each string in the list represents one PartitioningFilter.
*/
public static List<PartitioningFilter> parsePartitioningFilters(List<String> filterArgs) throws IllegalArgumentException {
List<PartitioningFilter> filters = new ArrayList<>();

if (filterArgs == null || filterArgs.isEmpty()) {
return filters;
}

for (String arg : filterArgs) {
int colonIndex = arg.indexOf(':');
if (colonIndex < 0) {
// No wildcards specified
filters.add(new PartitioningFilter(arg, new int[0]));
} else {
String filter = arg.substring(0, colonIndex);
String wildcardsInput = arg.substring(colonIndex + 1);
int[] wildcards = parsePartitioningWildcardsString(wildcardsInput);
filters.add(new PartitioningFilter(filter, wildcards));
}
}

return filters;
}

private static int[] parsePartitioningWildcardsString(String input) throws IllegalArgumentException {
if (input == null || input.isEmpty()) {
return new int[0];
}
String[] parts = input.split(",");
int[] wildcards = new int[parts.length];
for (int i = 0; i < parts.length; i++) {
try {
wildcards[i] = Integer.parseInt(parts[i].trim());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("can't parse wildcard index '" + parts[i] + "': not a valid integer");
}
}
return wildcards;
}

/**
* Parses partitioning wildcard indexes from a list of strings.
*/
Expand Down
48 changes: 27 additions & 21 deletions pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Elastic consumer group CLI commands.
*/
@Command(name = "elastic", description = "Elastic consumer groups mode",
mixinStandardHelpOptions = true,
subcommands = {
ElasticCommands.Ls.class,
ElasticCommands.Info.class,
Expand All @@ -53,7 +54,7 @@ public Integer call() {
return 0;
}

@Command(name = "ls", aliases = {"list"}, description = "List elastic consumer groups for a stream")
@Command(name = "ls", aliases = {"list"}, description = "List elastic consumer groups for a stream", mixinStandardHelpOptions = true)
static class Ls implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -74,7 +75,7 @@ public Integer call() {
}
}

@Command(name = "info", description = "Get elastic consumer group info")
@Command(name = "info", description = "Get elastic consumer group info", mixinStandardHelpOptions = true)
static class Info implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -90,8 +91,16 @@ public Integer call() {
try (Connection nc = CliUtils.connect(parent.parent.context)) {
ElasticConsumerGroupConfig config = ElasticConsumerGroup.getConfig(nc, streamName, consumerGroupName);

System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n",
config.getMaxMembers(), config.getFilter(), Arrays.toString(config.getPartitioningWildcards()));
System.out.printf("config: max members=%d, max buffered msgs=%d, max buffered bytes=%d%n", config.getMaxMembers(), config.getMaxBufferedMessages(), config.getMaxBufferedBytes());

if (config.getPartitioningFilters().isEmpty()) {
System.out.printf("no partitioning filters defined (whole subject used)%n");
} else {
for (PartitioningFilter pf : config.getPartitioningFilters()) {
System.out.printf("filter=%s, partitioning wildcards %s%n",
pf.getFilter(), Arrays.toString(pf.getPartitioningWildcards()));
}
}

if (!config.getMembers().isEmpty()) {
System.out.printf("members: %s%n", config.getMembers());
Expand All @@ -111,7 +120,7 @@ public Integer call() {
}
}

@Command(name = "create", description = "Create an elastic partitioned consumer group")
@Command(name = "create", description = "Create an elastic partitioned consumer group", mixinStandardHelpOptions = true)
static class Create implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -125,11 +134,8 @@ static class Create implements Callable<Integer> {
@Parameters(index = "2", description = "Max number of members")
int maxMembers;

@Parameters(index = "3", description = "Filter")
String filter;

@Parameters(index = "4..*", description = "Partitioning wildcard indexes")
List<String> wildcardArgs;
@Option(names = "--filter", description = "Partitioning filter in format 'subject:wildcard1,wildcard2' or just 'subject' (repeatable, omit to use whole subject)", split = "\\|")
List<String> filterArgs;

@Option(names = "--max-buffered-msgs", description = "Max number of buffered messages", defaultValue = "0")
long maxBufferedMsgs;
Expand All @@ -140,8 +146,8 @@ static class Create implements Callable<Integer> {
@Override
public Integer call() {
try (Connection nc = CliUtils.connect(parent.parent.context)) {
int[] wildcards = CliUtils.parsePartitioningWildcards(wildcardArgs);
ElasticConsumerGroup.create(nc, streamName, consumerGroupName, maxMembers, filter, wildcards, maxBufferedMsgs, maxBufferedBytes);
List<PartitioningFilter> partitioningFilters = CliUtils.parsePartitioningFilters(filterArgs);
ElasticConsumerGroup.create(nc, streamName, consumerGroupName, maxMembers, partitioningFilters, maxBufferedMsgs, maxBufferedBytes);
System.out.println("elastic partitioned consumer group created");
return 0;
} catch (Exception e) {
Expand All @@ -151,7 +157,7 @@ public Integer call() {
}
}

@Command(name = "delete", aliases = {"rm"}, description = "Delete an elastic partitioned consumer group")
@Command(name = "delete", aliases = {"rm"}, description = "Delete an elastic partitioned consumer group", mixinStandardHelpOptions = true)
static class Delete implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand Down Expand Up @@ -185,7 +191,7 @@ public Integer call() {
}
}

@Command(name = "add", description = "Add members to an elastic consumer group")
@Command(name = "add", description = "Add members to an elastic consumer group", mixinStandardHelpOptions = true)
static class Add implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -212,7 +218,7 @@ public Integer call() {
}
}

@Command(name = "drop", description = "Drop members from an elastic consumer group")
@Command(name = "drop", description = "Drop members from an elastic consumer group", mixinStandardHelpOptions = true)
static class Drop implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -239,7 +245,7 @@ public Integer call() {
}
}

@Command(name = "create-mapping", aliases = {"cm", "createmapping"}, description = "Create member mappings for an elastic consumer group")
@Command(name = "create-mapping", aliases = {"cm", "createmapping"}, description = "Create member mappings for an elastic consumer group", mixinStandardHelpOptions = true)
static class CreateMapping implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand Down Expand Up @@ -267,7 +273,7 @@ public Integer call() {
}
}

@Command(name = "delete-mapping", aliases = {"dm", "deletemapping"}, description = "Delete member mappings for an elastic consumer group")
@Command(name = "delete-mapping", aliases = {"dm", "deletemapping"}, description = "Delete member mappings for an elastic consumer group", mixinStandardHelpOptions = true)
static class DeleteMapping implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -291,7 +297,7 @@ public Integer call() {
}
}

@Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get elastic consumer group member info")
@Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get elastic consumer group member info", mixinStandardHelpOptions = true)
static class MemberInfo implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand Down Expand Up @@ -330,7 +336,7 @@ public Integer call() {
}
}

@Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member")
@Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member", mixinStandardHelpOptions = true)
static class StepDown implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand All @@ -357,7 +363,7 @@ public Integer call() {
}
}

@Command(name = "consume", aliases = {"join"}, description = "Join an elastic partitioned consumer group")
@Command(name = "consume", aliases = {"join"}, description = "Join an elastic partitioned consumer group", mixinStandardHelpOptions = true)
static class Consume implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand Down Expand Up @@ -417,7 +423,7 @@ public Integer call() {
}
}

@Command(name = "prompt", description = "Interactive prompt mode")
@Command(name = "prompt", description = "Interactive prompt mode", mixinStandardHelpOptions = true)
static class Prompt implements Callable<Integer> {
@ParentCommand
private ElasticCommands parent;
Expand Down
61 changes: 45 additions & 16 deletions pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void printHelp() {
System.out.println(" exit/quit - exit the program");
System.out.println(" list/ls <stream name> - list partitioned consumer groups");
System.out.println(" info <stream name> <partitioned consumer group name> - get partitioned consumer group info");
System.out.println(" create <stream name> <partitioned consumer group name> <max members> <filter> <comma separated partitioning wildcard indexes> - create a partitioned consumer group");
System.out.println(" create - create a partitioned consumer group (interactive, filter and wildcard indexes are optional)");
System.out.println(" delete/rm <stream name> <partitioned consumer group name> - delete a partitioned consumer group");
System.out.println(" memberinfo/minfo <stream name> <partitioned consumer group name> <member name> - get partitioned consumer group member info");
System.out.println(" add <stream name> <partitioned consumer group name> <member name> [...] - add a member to a partitioned consumer group");
Expand Down Expand Up @@ -245,8 +245,15 @@ private void handleInfo(BufferedReader reader, String[] args) {
System.out.printf("currently active members: %s%n", activeMembers);
} else {
ElasticConsumerGroupConfig config = ElasticConsumerGroup.getConfig(nc, currentStream, currentGroup);
System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n",
config.getMaxMembers(), config.getFilter(), Arrays.toString(config.getPartitioningWildcards()));
if (config.getPartitioningFilters().isEmpty()) {
System.out.printf("config: max members=%d, no partitioning filters (whole subject used)%n",
config.getMaxMembers());
} else {
for (PartitioningFilter pf : config.getPartitioningFilters()) {
System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n",
config.getMaxMembers(), pf.getFilter(), Arrays.toString(pf.getPartitioningWildcards()));
}
}
if (!config.getMembers().isEmpty()) {
System.out.printf("members: %s%n", config.getMembers());
} else if (!config.getMemberMappings().isEmpty()) {
Expand Down Expand Up @@ -283,12 +290,11 @@ private void handleCreate(BufferedReader reader) {
if (input == null) return;
int maxMembers = Integer.parseInt(input.trim());

System.out.print("filter: ");
input = reader.readLine();
if (input == null) return;
String filter = input.trim();

if (isStatic) {
System.out.print("filter (empty for whole subject partitioning): ");
input = reader.readLine();
if (input == null) return;
String filter = input.trim().isEmpty() ? null : input.trim();
System.out.print("space separated set of members (hit return to set member mappings instead): ");
input = reader.readLine();
if (input == null) return;
Expand All @@ -310,13 +316,29 @@ private void handleCreate(BufferedReader reader) {
System.out.println("static partitioned consumer group created");
}
} else {
System.out.print("space separated partitioning wildcard indexes: ");
input = reader.readLine();
if (input == null) return;
String[] pwciArgs = input.trim().split("\\s+");
int[] wildcards = new int[pwciArgs.length];
for (int i = 0; i < pwciArgs.length; i++) {
wildcards[i] = Integer.parseInt(pwciArgs[i]);
List<PartitioningFilter> partitioningFilters = new ArrayList<>();
System.out.println("enter partitioning filters (empty filter to finish, or empty first filter for whole subject partitioning):");
while (true) {
System.out.print(" filter: ");
input = reader.readLine();
if (input == null) return;
String filter = input.trim();
if (filter.isEmpty()) break;

System.out.print(" space separated partitioning wildcard indexes (empty for none): ");
input = reader.readLine();
if (input == null) return;
int[] wildcards;
if (input.trim().isEmpty()) {
wildcards = new int[0];
} else {
String[] pwciArgs = input.trim().split("\\s+");
wildcards = new int[pwciArgs.length];
for (int i = 0; i < pwciArgs.length; i++) {
wildcards[i] = Integer.parseInt(pwciArgs[i]);
}
}
partitioningFilters.add(new PartitioningFilter(filter, wildcards));
}

System.out.print("max buffered messages (0 for no limit): ");
Expand All @@ -329,7 +351,7 @@ private void handleCreate(BufferedReader reader) {
if (input == null) return;
long maxBufferedBytes = input.trim().isEmpty() ? 0 : Long.parseLong(input.trim());

ElasticConsumerGroup.create(nc, currentStream, currentGroup, maxMembers, filter, wildcards, maxBufferedMsgs, maxBufferedBytes);
ElasticConsumerGroup.create(nc, currentStream, currentGroup, maxMembers, partitioningFilters, maxBufferedMsgs, maxBufferedBytes);
System.out.println("elastic partitioned consumer group created");
}
} catch (Exception e) {
Expand All @@ -353,6 +375,13 @@ private void handleDelete(BufferedReader reader, String[] args) {
currentGroup = args[1];
}

System.out.print("WARNING: this operation will cause all existing consumer members to terminate consuming. Are you sure? (y/n): ");
String confirm = reader.readLine();
if (confirm == null || !confirm.trim().equalsIgnoreCase("y")) {
System.out.println("Operation canceled");
return;
}

if (isStatic) {
StaticConsumerGroup.delete(nc, currentStream, currentGroup);
System.out.println("static consumer group deleted");
Expand Down
Loading
Loading