diff --git a/pcgroups-cli/build.gradle b/pcgroups-cli/build.gradle index 54d7b0f..7680894 100644 --- a/pcgroups-cli/build.gradle +++ b/pcgroups-cli/build.gradle @@ -20,7 +20,7 @@ repositories { dependencies { implementation 'io.nats:jnats:2.25.1' implementation 'org.jspecify:jspecify:1.0.0' - implementation 'io.synadia:pcgroups:0.1.0' + implementation 'io.synadia:pcgroups:0.1.1-SNAPSHOT' implementation 'info.picocli:picocli:4.7.5' testImplementation 'io.nats:jnats-server-runner:3.1.0' diff --git a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java index bd355de..f0811ac 100644 --- a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java +++ b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java @@ -19,6 +19,7 @@ import io.nats.client.Nats; import io.nats.client.Options; import io.synadia.pcg.MemberMapping; +import io.synadia.pcg.PartitioningFilter; import java.io.*; import java.nio.file.Files; @@ -124,6 +125,49 @@ public static List parseMemberMappings(List mappingArgs) return mappings; } + /** + * Parses partitioning filter arguments in the format "filter:wildcard1,wildcard2" or just "filter". + * Each string in the list represents one PartitioningFilter. + */ + public static List parsePartitioningFilters(List filterArgs) throws IllegalArgumentException { + List filters = new ArrayList<>(); + + if (filterArgs == null || filterArgs.isEmpty()) { + return filters; + } + + for (String arg : filterArgs) { + int colonIndex = arg.indexOf(':'); + if (colonIndex < 0) { + // No wildcards specified + filters.add(new PartitioningFilter(arg, new int[0])); + } else { + String filter = arg.substring(0, colonIndex); + String wildcardsInput = arg.substring(colonIndex + 1); + int[] wildcards = parsePartitioningWildcardsString(wildcardsInput); + filters.add(new PartitioningFilter(filter, wildcards)); + } + } + + return filters; + } + + private static int[] parsePartitioningWildcardsString(String input) throws IllegalArgumentException { + if (input == null || input.isEmpty()) { + return new int[0]; + } + String[] parts = input.split(","); + int[] wildcards = new int[parts.length]; + for (int i = 0; i < parts.length; i++) { + try { + wildcards[i] = Integer.parseInt(parts[i].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("can't parse wildcard index '" + parts[i] + "': not a valid integer"); + } + } + return wildcards; + } + /** * Parses partitioning wildcard indexes from a list of strings. */ diff --git a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java index 5bec79c..b4fe1f9 100644 --- a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java +++ b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java @@ -28,6 +28,7 @@ * Elastic consumer group CLI commands. */ @Command(name = "elastic", description = "Elastic consumer groups mode", + mixinStandardHelpOptions = true, subcommands = { ElasticCommands.Ls.class, ElasticCommands.Info.class, @@ -53,7 +54,7 @@ public Integer call() { return 0; } - @Command(name = "ls", aliases = {"list"}, description = "List elastic consumer groups for a stream") + @Command(name = "ls", aliases = {"list"}, description = "List elastic consumer groups for a stream", mixinStandardHelpOptions = true) static class Ls implements Callable { @ParentCommand private ElasticCommands parent; @@ -74,7 +75,7 @@ public Integer call() { } } - @Command(name = "info", description = "Get elastic consumer group info") + @Command(name = "info", description = "Get elastic consumer group info", mixinStandardHelpOptions = true) static class Info implements Callable { @ParentCommand private ElasticCommands parent; @@ -90,8 +91,16 @@ public Integer call() { try (Connection nc = CliUtils.connect(parent.parent.context)) { ElasticConsumerGroupConfig config = ElasticConsumerGroup.getConfig(nc, streamName, consumerGroupName); - System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n", - config.getMaxMembers(), config.getFilter(), Arrays.toString(config.getPartitioningWildcards())); + System.out.printf("config: max members=%d, max buffered msgs=%d, max buffered bytes=%d%n", config.getMaxMembers(), config.getMaxBufferedMessages(), config.getMaxBufferedBytes()); + + if (config.getPartitioningFilters().isEmpty()) { + System.out.printf("no partitioning filters defined (whole subject used)%n"); + } else { + for (PartitioningFilter pf : config.getPartitioningFilters()) { + System.out.printf("filter=%s, partitioning wildcards %s%n", + pf.getFilter(), Arrays.toString(pf.getPartitioningWildcards())); + } + } if (!config.getMembers().isEmpty()) { System.out.printf("members: %s%n", config.getMembers()); @@ -111,7 +120,7 @@ public Integer call() { } } - @Command(name = "create", description = "Create an elastic partitioned consumer group") + @Command(name = "create", description = "Create an elastic partitioned consumer group", mixinStandardHelpOptions = true) static class Create implements Callable { @ParentCommand private ElasticCommands parent; @@ -125,11 +134,8 @@ static class Create implements Callable { @Parameters(index = "2", description = "Max number of members") int maxMembers; - @Parameters(index = "3", description = "Filter") - String filter; - - @Parameters(index = "4..*", description = "Partitioning wildcard indexes") - List wildcardArgs; + @Option(names = "--filter", description = "Partitioning filter in format 'subject:wildcard1,wildcard2' or just 'subject' (repeatable, omit to use whole subject)", split = "\\|") + List filterArgs; @Option(names = "--max-buffered-msgs", description = "Max number of buffered messages", defaultValue = "0") long maxBufferedMsgs; @@ -140,8 +146,8 @@ static class Create implements Callable { @Override public Integer call() { try (Connection nc = CliUtils.connect(parent.parent.context)) { - int[] wildcards = CliUtils.parsePartitioningWildcards(wildcardArgs); - ElasticConsumerGroup.create(nc, streamName, consumerGroupName, maxMembers, filter, wildcards, maxBufferedMsgs, maxBufferedBytes); + List partitioningFilters = CliUtils.parsePartitioningFilters(filterArgs); + ElasticConsumerGroup.create(nc, streamName, consumerGroupName, maxMembers, partitioningFilters, maxBufferedMsgs, maxBufferedBytes); System.out.println("elastic partitioned consumer group created"); return 0; } catch (Exception e) { @@ -151,7 +157,7 @@ public Integer call() { } } - @Command(name = "delete", aliases = {"rm"}, description = "Delete an elastic partitioned consumer group") + @Command(name = "delete", aliases = {"rm"}, description = "Delete an elastic partitioned consumer group", mixinStandardHelpOptions = true) static class Delete implements Callable { @ParentCommand private ElasticCommands parent; @@ -185,7 +191,7 @@ public Integer call() { } } - @Command(name = "add", description = "Add members to an elastic consumer group") + @Command(name = "add", description = "Add members to an elastic consumer group", mixinStandardHelpOptions = true) static class Add implements Callable { @ParentCommand private ElasticCommands parent; @@ -212,7 +218,7 @@ public Integer call() { } } - @Command(name = "drop", description = "Drop members from an elastic consumer group") + @Command(name = "drop", description = "Drop members from an elastic consumer group", mixinStandardHelpOptions = true) static class Drop implements Callable { @ParentCommand private ElasticCommands parent; @@ -239,7 +245,7 @@ public Integer call() { } } - @Command(name = "create-mapping", aliases = {"cm", "createmapping"}, description = "Create member mappings for an elastic consumer group") + @Command(name = "create-mapping", aliases = {"cm", "createmapping"}, description = "Create member mappings for an elastic consumer group", mixinStandardHelpOptions = true) static class CreateMapping implements Callable { @ParentCommand private ElasticCommands parent; @@ -267,7 +273,7 @@ public Integer call() { } } - @Command(name = "delete-mapping", aliases = {"dm", "deletemapping"}, description = "Delete member mappings for an elastic consumer group") + @Command(name = "delete-mapping", aliases = {"dm", "deletemapping"}, description = "Delete member mappings for an elastic consumer group", mixinStandardHelpOptions = true) static class DeleteMapping implements Callable { @ParentCommand private ElasticCommands parent; @@ -291,7 +297,7 @@ public Integer call() { } } - @Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get elastic consumer group member info") + @Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get elastic consumer group member info", mixinStandardHelpOptions = true) static class MemberInfo implements Callable { @ParentCommand private ElasticCommands parent; @@ -330,7 +336,7 @@ public Integer call() { } } - @Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member") + @Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member", mixinStandardHelpOptions = true) static class StepDown implements Callable { @ParentCommand private ElasticCommands parent; @@ -357,7 +363,7 @@ public Integer call() { } } - @Command(name = "consume", aliases = {"join"}, description = "Join an elastic partitioned consumer group") + @Command(name = "consume", aliases = {"join"}, description = "Join an elastic partitioned consumer group", mixinStandardHelpOptions = true) static class Consume implements Callable { @ParentCommand private ElasticCommands parent; @@ -417,7 +423,7 @@ public Integer call() { } } - @Command(name = "prompt", description = "Interactive prompt mode") + @Command(name = "prompt", description = "Interactive prompt mode", mixinStandardHelpOptions = true) static class Prompt implements Callable { @ParentCommand private ElasticCommands parent; diff --git a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java index 9279742..7dd7450 100644 --- a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java +++ b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java @@ -159,7 +159,7 @@ private void printHelp() { System.out.println(" exit/quit - exit the program"); System.out.println(" list/ls - list partitioned consumer groups"); System.out.println(" info - get partitioned consumer group info"); - System.out.println(" create - create a partitioned consumer group"); + System.out.println(" create - create a partitioned consumer group (interactive, filter and wildcard indexes are optional)"); System.out.println(" delete/rm - delete a partitioned consumer group"); System.out.println(" memberinfo/minfo - get partitioned consumer group member info"); System.out.println(" add [...] - add a member to a partitioned consumer group"); @@ -245,8 +245,15 @@ private void handleInfo(BufferedReader reader, String[] args) { System.out.printf("currently active members: %s%n", activeMembers); } else { ElasticConsumerGroupConfig config = ElasticConsumerGroup.getConfig(nc, currentStream, currentGroup); - System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n", - config.getMaxMembers(), config.getFilter(), Arrays.toString(config.getPartitioningWildcards())); + if (config.getPartitioningFilters().isEmpty()) { + System.out.printf("config: max members=%d, no partitioning filters (whole subject used)%n", + config.getMaxMembers()); + } else { + for (PartitioningFilter pf : config.getPartitioningFilters()) { + System.out.printf("config: max members=%d, filter=%s, partitioning wildcards %s%n", + config.getMaxMembers(), pf.getFilter(), Arrays.toString(pf.getPartitioningWildcards())); + } + } if (!config.getMembers().isEmpty()) { System.out.printf("members: %s%n", config.getMembers()); } else if (!config.getMemberMappings().isEmpty()) { @@ -283,12 +290,11 @@ private void handleCreate(BufferedReader reader) { if (input == null) return; int maxMembers = Integer.parseInt(input.trim()); - System.out.print("filter: "); - input = reader.readLine(); - if (input == null) return; - String filter = input.trim(); - if (isStatic) { + System.out.print("filter (empty for whole subject partitioning): "); + input = reader.readLine(); + if (input == null) return; + String filter = input.trim().isEmpty() ? null : input.trim(); System.out.print("space separated set of members (hit return to set member mappings instead): "); input = reader.readLine(); if (input == null) return; @@ -310,13 +316,29 @@ private void handleCreate(BufferedReader reader) { System.out.println("static partitioned consumer group created"); } } else { - System.out.print("space separated partitioning wildcard indexes: "); - input = reader.readLine(); - if (input == null) return; - String[] pwciArgs = input.trim().split("\\s+"); - int[] wildcards = new int[pwciArgs.length]; - for (int i = 0; i < pwciArgs.length; i++) { - wildcards[i] = Integer.parseInt(pwciArgs[i]); + List partitioningFilters = new ArrayList<>(); + System.out.println("enter partitioning filters (empty filter to finish, or empty first filter for whole subject partitioning):"); + while (true) { + System.out.print(" filter: "); + input = reader.readLine(); + if (input == null) return; + String filter = input.trim(); + if (filter.isEmpty()) break; + + System.out.print(" space separated partitioning wildcard indexes (empty for none): "); + input = reader.readLine(); + if (input == null) return; + int[] wildcards; + if (input.trim().isEmpty()) { + wildcards = new int[0]; + } else { + String[] pwciArgs = input.trim().split("\\s+"); + wildcards = new int[pwciArgs.length]; + for (int i = 0; i < pwciArgs.length; i++) { + wildcards[i] = Integer.parseInt(pwciArgs[i]); + } + } + partitioningFilters.add(new PartitioningFilter(filter, wildcards)); } System.out.print("max buffered messages (0 for no limit): "); @@ -329,7 +351,7 @@ private void handleCreate(BufferedReader reader) { if (input == null) return; long maxBufferedBytes = input.trim().isEmpty() ? 0 : Long.parseLong(input.trim()); - ElasticConsumerGroup.create(nc, currentStream, currentGroup, maxMembers, filter, wildcards, maxBufferedMsgs, maxBufferedBytes); + ElasticConsumerGroup.create(nc, currentStream, currentGroup, maxMembers, partitioningFilters, maxBufferedMsgs, maxBufferedBytes); System.out.println("elastic partitioned consumer group created"); } } catch (Exception e) { @@ -353,6 +375,13 @@ private void handleDelete(BufferedReader reader, String[] args) { currentGroup = args[1]; } + System.out.print("WARNING: this operation will cause all existing consumer members to terminate consuming. Are you sure? (y/n): "); + String confirm = reader.readLine(); + if (confirm == null || !confirm.trim().equalsIgnoreCase("y")) { + System.out.println("Operation canceled"); + return; + } + if (isStatic) { StaticConsumerGroup.delete(nc, currentStream, currentGroup); System.out.println("static consumer group deleted"); diff --git a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java index 5885ae9..6756357 100644 --- a/pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java +++ b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java @@ -28,6 +28,7 @@ * Static consumer group CLI commands. */ @Command(name = "static", description = "Static consumer groups mode", + mixinStandardHelpOptions = true, subcommands = { StaticCommands.Ls.class, StaticCommands.Info.class, @@ -49,7 +50,7 @@ public Integer call() { return 0; } - @Command(name = "ls", aliases = {"list"}, description = "List static consumer groups for a stream") + @Command(name = "ls", aliases = {"list"}, description = "List static consumer groups for a stream", mixinStandardHelpOptions = true) static class Ls implements Callable { @ParentCommand private StaticCommands parent; @@ -70,7 +71,7 @@ public Integer call() { } } - @Command(name = "info", description = "Get static consumer group info") + @Command(name = "info", description = "Get static consumer group info", mixinStandardHelpOptions = true) static class Info implements Callable { @ParentCommand private StaticCommands parent; @@ -107,6 +108,7 @@ public Integer call() { } @Command(name = "create", description = "Create a static partitioned consumer group", + mixinStandardHelpOptions = true, subcommands = {StaticCommands.CreateBalanced.class, StaticCommands.CreateMapped.class}) static class Create implements Callable { @ParentCommand @@ -119,7 +121,7 @@ public Integer call() { } } - @Command(name = "balanced", description = "Create a static consumer group with balanced members") + @Command(name = "balanced", description = "Create a static consumer group with balanced members", mixinStandardHelpOptions = true) static class CreateBalanced implements Callable { @ParentCommand private Create createParent; @@ -152,7 +154,7 @@ public Integer call() { } } - @Command(name = "mapped", description = "Create a static consumer group with member mappings") + @Command(name = "mapped", description = "Create a static consumer group with member mappings", mixinStandardHelpOptions = true) static class CreateMapped implements Callable { @ParentCommand private Create createParent; @@ -186,7 +188,7 @@ public Integer call() { } } - @Command(name = "delete", aliases = {"rm"}, description = "Delete a static partitioned consumer group") + @Command(name = "delete", aliases = {"rm"}, description = "Delete a static partitioned consumer group", mixinStandardHelpOptions = true) static class Delete implements Callable { @ParentCommand private StaticCommands parent; @@ -220,7 +222,7 @@ public Integer call() { } } - @Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get static consumer group member info") + @Command(name = "member-info", aliases = {"memberinfo", "minfo"}, description = "Get static consumer group member info", mixinStandardHelpOptions = true) static class MemberInfo implements Callable { @ParentCommand private StaticCommands parent; @@ -258,7 +260,7 @@ public Integer call() { } } - @Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member") + @Command(name = "step-down", aliases = {"stepdown", "sd"}, description = "Initiate a step down for a member", mixinStandardHelpOptions = true) static class StepDown implements Callable { @ParentCommand private StaticCommands parent; @@ -285,7 +287,7 @@ public Integer call() { } } - @Command(name = "consume", aliases = {"join"}, description = "Join a static partitioned consumer group") + @Command(name = "consume", aliases = {"join"}, description = "Join a static partitioned consumer group", mixinStandardHelpOptions = true) static class Consume implements Callable { @ParentCommand private StaticCommands parent; @@ -345,7 +347,7 @@ public Integer call() { } } - @Command(name = "prompt", description = "Interactive prompt mode") + @Command(name = "prompt", description = "Interactive prompt mode", mixinStandardHelpOptions = true) static class Prompt implements Callable { @ParentCommand private StaticCommands parent; diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java index fd80ecc..fd3bcb5 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java @@ -50,19 +50,18 @@ private ElasticConsumerGroup() { * @param streamName Name of the source stream * @param consumerGroupName Name of the consumer group * @param maxMembers Maximum number of members (partitions) - * @param filter Subject filter with wildcards - * @param partitioningWildcards Indexes of wildcards to use for partitioning - * @param maxBufferedMessages Max messages in work queue (0 for unlimited) + * @param partitioningFilters List of partitioning filters + * @param maxBufferedMessages Max messages in work queue (0 for unlimited) * @param maxBufferedBytes Max bytes in work queue (0 for unlimited) * @return The created configuration */ public static ElasticConsumerGroupConfig create(Connection nc, String streamName, String consumerGroupName, - int maxMembers, String filter, int[] partitioningWildcards, + int maxMembers, List partitioningFilters, long maxBufferedMessages, long maxBufferedBytes) throws ConsumerGroupException, IOException, JetStreamApiException, InterruptedException { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( - maxMembers, filter, partitioningWildcards, maxBufferedMessages, maxBufferedBytes, + maxMembers, partitioningFilters, maxBufferedMessages, maxBufferedBytes, new ArrayList<>(), new ArrayList<>()); config.validate(); @@ -95,10 +94,9 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName ElasticConsumerGroupConfig existingConfig = ElasticConsumerGroupConfig.instance(kv.get(key)); if (existingConfig != null) { if (existingConfig.getMaxMembers() != maxMembers || - !Objects.equals(existingConfig.getFilter(), filter) || + !Objects.equals(existingConfig.getPartitioningFilters(), partitioningFilters) || existingConfig.getMaxBufferedMessages() != maxBufferedMessages || - existingConfig.getMaxBufferedBytes() != maxBufferedBytes || - !Arrays.equals(existingConfig.getPartitioningWildcards(), partitioningWildcards)) { + existingConfig.getMaxBufferedBytes() != maxBufferedBytes) { throw new ConsumerGroupException( "the existing elastic consumer group config can not be updated to the requested one, " + "please delete the existing elastic consumer group and create a new one"); @@ -111,8 +109,6 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName // Create the work queue stream with subject transform String workQueueStreamName = composeCGSName(streamName, consumerGroupName); - String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; - String filterDest = getPartitioningTransformDest(config); StreamConfiguration.Builder scBuilder = StreamConfiguration.builder() .name(workQueueStreamName) @@ -129,14 +125,26 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName scBuilder.maxBytes(maxBufferedBytes); } - // Add source with subject transform + // Add source with subject transforms + List subjectTransforms = new ArrayList<>(); + if (partitioningFilters != null && !partitioningFilters.isEmpty()) { + for (PartitioningFilter pf : partitioningFilters) { + subjectTransforms.add(SubjectTransform.builder() + .source(pf.getFilter()) + .destination(getPartitioningTransformDest(pf, maxMembers)) + .build()); + } + } else { + subjectTransforms.add(SubjectTransform.builder() + .source(">") + .destination(getPartitioningTransformDest(new PartitioningFilter(">", new int[0]), maxMembers)) + .build()); + } + scBuilder.addSource(Source.builder() .sourceName(streamName) .startSeq(0) - .subjectTransforms(SubjectTransform.builder() - .source(effectiveFilter) - .destination(filterDest) - .build()) + .subjectTransforms(subjectTransforms.toArray(new SubjectTransform[0])) .build()); try { @@ -513,8 +521,17 @@ public static ElasticConsumerGroupConfig getConfig(Connection nc, String streamN * Returns the list of partition filters for a given member based on the config. */ public static List getPartitionFilters(ElasticConsumerGroupConfig config, String memberName) { - return PartitionUtils.generatePartitionFilters( - config.getMembers(), config.getMaxMembers(), config.getMemberMappings(), memberName); + List filters = new ArrayList<>(); + if (!config.getPartitioningFilters().isEmpty()) { + for (PartitioningFilter pf : config.getPartitioningFilters()) { + filters.addAll(PartitionUtils.generatePartitionFilters( + config.getMembers(), config.getMaxMembers(), config.getMemberMappings(), memberName, pf.getFilter())); + } + } else { + filters.addAll(PartitionUtils.generatePartitionFilters( + config.getMembers(), config.getMaxMembers(), config.getMemberMappings(), memberName)); + } + return filters; } private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String streamName, String consumerGroupName) @@ -533,9 +550,9 @@ private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String st return config; } - private static String getPartitioningTransformDest(ElasticConsumerGroupConfig config) { - String effectiveFilter = (config.getFilter() != null && !config.getFilter().isEmpty()) ? config.getFilter() : ">"; - int[] wildcards = config.getPartitioningWildcards(); + private static String getPartitioningTransformDest(PartitioningFilter pf, int maxMembers) { + String effectiveFilter = (pf.getFilter() != null && !pf.getFilter().isEmpty()) ? pf.getFilter() : ">"; + int[] wildcards = pf.getPartitioningWildcards(); StringBuilder wildcardList = new StringBuilder(); for (int i = 0; i < wildcards.length; i++) { @@ -555,10 +572,10 @@ private static String getPartitioningTransformDest(ElasticConsumerGroupConfig co String destFromFilter = String.join(".", filterTokens); if (wildcards.length == 0) { - return "{{Partition(" + config.getMaxMembers() + ")}}." + destFromFilter; + return "{{Partition(" + maxMembers + ")}}." + destFromFilter; } - return "{{Partition(" + config.getMaxMembers() + "," + wildcardList + ")}}." + destFromFilter; + return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter; } /** @@ -723,10 +740,9 @@ private void processWatcherUpdate(KeyValueEntry entry) { // Check if critical config changed (immutable fields) if (newConfig.getMaxMembers() != config.getMaxMembers() || - !Objects.equals(newConfig.getFilter(), config.getFilter()) || + !Objects.equals(newConfig.getPartitioningFilters(), config.getPartitioningFilters()) || newConfig.getMaxBufferedMessages() != config.getMaxBufferedMessages() || - newConfig.getMaxBufferedBytes() != config.getMaxBufferedBytes() || - !Arrays.equals(newConfig.getPartitioningWildcards(), config.getPartitioningWildcards())) { + newConfig.getMaxBufferedBytes() != config.getMaxBufferedBytes()) { stopConsuming(); stopped.set(true); doneFuture.completeExceptionally( diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java index dc057af..16c2e9e 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java @@ -30,16 +30,14 @@ */ public class ElasticConsumerGroupConfig implements JsonSerializable { static final String MAX_MEMBERS = "max_members"; - static final String FILTER = "filter"; - static final String PARTITIONING_WILDCARDS = "partitioning_wildcards"; + static final String PARTITIONING_FILTERS = "partitioning_filters"; static final String MAX_BUFFERED_MSG = "max_buffered_msg"; static final String MAX_BUFFERED_BYTES = "max_buffered_bytes"; static final String MEMBERS = "members"; static final String MEMBER_MAPPINGS = "member_mappings"; private int maxMembers; - private String filter; - private int[] partitioningWildcards; + private List partitioningFilters; private long maxBufferedMessages; private long maxBufferedBytes; private List members; @@ -67,17 +65,16 @@ public static ElasticConsumerGroupConfig instance(byte @NonNull[] json) throws J } public ElasticConsumerGroupConfig() { - this.partitioningWildcards = new int[0]; + this.partitioningFilters = new ArrayList<>(); this.members = new ArrayList<>(); this.memberMappings = new ArrayList<>(); } - public ElasticConsumerGroupConfig(int maxMembers, String filter, int[] partitioningWildcards, + public ElasticConsumerGroupConfig(int maxMembers, List partitioningFilters, long maxBufferedMessages, long maxBufferedBytes, List members, List memberMappings) { this.maxMembers = maxMembers; - this.filter = filter; - this.partitioningWildcards = partitioningWildcards != null ? partitioningWildcards.clone() : new int[0]; + this.partitioningFilters = partitioningFilters == null ? new ArrayList<>() : new ArrayList<>(partitioningFilters); this.maxBufferedMessages = maxBufferedMessages; this.maxBufferedBytes = maxBufferedBytes; this.members = members == null ? new ArrayList<>() : new ArrayList<>(members); @@ -86,13 +83,7 @@ public ElasticConsumerGroupConfig(int maxMembers, String filter, int[] partition public ElasticConsumerGroupConfig(JsonValue jv) { this.maxMembers = JsonValueUtils.readInteger(jv, MAX_MEMBERS, 0); - this.filter = JsonValueUtils.readString(jv, FILTER); - List integers = read(jv, PARTITIONING_WILDCARDS, v -> listOf(v, JsonValueUtils::getInteger)); - this.partitioningWildcards = new int[integers.size()]; - for (int x = 0; x < integers.size(); x++) { - Integer i = integers.get(x); - this.partitioningWildcards[x] = i == null ? 0 : i; - } + this.partitioningFilters = PartitioningFilter.listOfOrEmptyList(readValue(jv, PARTITIONING_FILTERS)); this.maxBufferedMessages = JsonValueUtils.readLong(jv, MAX_BUFFERED_MSG, 0); this.maxBufferedBytes = JsonValueUtils.readLong(jv, MAX_BUFFERED_BYTES, 0); this.members = JsonValueUtils.readStringList(jv, MEMBERS); @@ -107,20 +98,12 @@ public void setMaxMembers(int maxMembers) { this.maxMembers = maxMembers; } - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; + public List getPartitioningFilters() { + return new ArrayList<>(partitioningFilters); } - public int[] getPartitioningWildcards() { - return partitioningWildcards.clone(); - } - - public void setPartitioningWildcards(int[] partitioningWildcards) { - this.partitioningWildcards = partitioningWildcards == null ? new int[0] : partitioningWildcards.clone(); + public void setPartitioningFilters(List partitioningFilters) { + this.partitioningFilters = partitioningFilters == null ? new ArrayList<>() : new ArrayList<>(partitioningFilters); } public long getMaxBufferedMessages() { @@ -188,9 +171,13 @@ public void validate() throws ConsumerGroupException { throw new ConsumerGroupException("the max number of members must be >= 1"); } - // Validate filter and partitioning wildcards - if (filter != null && !filter.isEmpty()) { - String[] filterTokens = filter.split("\\."); + // Validate partitioning filters + for (PartitioningFilter pf : partitioningFilters) { + if (pf.getFilter() == null || pf.getFilter().isEmpty()) { + throw new ConsumerGroupException("partitioning filters must have a non-empty filter"); + } + + String[] filterTokens = pf.getFilter().split("\\."); int numWildcards = 0; for (String token : filterTokens) { if ("*".equals(token)) { @@ -202,13 +189,14 @@ public void validate() throws ConsumerGroupException { throw new ConsumerGroupException("partitioning filters must have at least one * wildcard or end with > wildcard"); } - if (partitioningWildcards != null && partitioningWildcards.length > numWildcards) { + int[] wildcards = pf.getPartitioningWildcards(); + if (wildcards != null && wildcards.length > numWildcards) { throw new ConsumerGroupException("the number of partitioning wildcards must not be larger than the total number of * wildcards in the filter"); } Set seenWildcards = new HashSet<>(); - if (partitioningWildcards != null) { - for (int pwc : partitioningWildcards) { + if (wildcards != null) { + for (int pwc : wildcards) { if (seenWildcards.contains(pwc)) { throw new ConsumerGroupException("partitioning wildcard indexes must be unique"); } @@ -262,52 +250,12 @@ public void validate() throws ConsumerGroupException { } } - /** - * Generates the subject transform destination for partitioning. - */ - public String getPartitioningTransformDest() { - String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; - int[] effectiveWildcards = (partitioningWildcards != null) ? partitioningWildcards : new int[0]; - - StringBuilder wildcardList = new StringBuilder(); - for (int i = 0; i < effectiveWildcards.length; i++) { - if (i > 0) { - wildcardList.append(","); - } - wildcardList.append(effectiveWildcards[i]); - } - - String[] filterTokens = effectiveFilter.split("\\."); - int cwIndex = 1; - for (int i = 0; i < filterTokens.length; i++) { - if ("*".equals(filterTokens[i])) { - filterTokens[i] = "{{Wildcard(" + cwIndex + ")}}"; - cwIndex++; - } - } - - String destFromFilter = String.join(".", filterTokens); - - if (effectiveWildcards.length == 0) { - return "{{Partition(" + maxMembers + ")}}." + destFromFilter; - } - - return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter; - } - @Override @NonNull public String toJson() { StringBuilder sb = beginJson(); addField(sb, MAX_MEMBERS, maxMembers); - addField(sb, FILTER, filter); - if (partitioningWildcards.length > 0) { - List integers = new ArrayList<>(partitioningWildcards.length); - for (int i : partitioningWildcards) { - integers.add(i); - } - _addList(sb, PARTITIONING_WILDCARDS, integers, StringBuilder::append); - } + addJsons(sb, PARTITIONING_FILTERS, partitioningFilters); addField(sb, MAX_BUFFERED_MSG, maxBufferedMessages); addField(sb, MAX_BUFFERED_BYTES, maxBufferedBytes); addStrings(sb, MEMBERS, members); @@ -323,25 +271,21 @@ public boolean equals(Object o) { return maxMembers == that.maxMembers && maxBufferedMessages == that.maxBufferedMessages && maxBufferedBytes == that.maxBufferedBytes && - Objects.equals(filter, that.filter) && - Arrays.equals(partitioningWildcards, that.partitioningWildcards) && + Objects.equals(partitioningFilters, that.partitioningFilters) && Objects.equals(members, that.members) && Objects.equals(memberMappings, that.memberMappings); } @Override public int hashCode() { - int result = Objects.hash(maxMembers, filter, maxBufferedMessages, maxBufferedBytes, members, memberMappings); - result = 31 * result + Arrays.hashCode(partitioningWildcards); - return result; + return Objects.hash(maxMembers, partitioningFilters, maxBufferedMessages, maxBufferedBytes, members, memberMappings); } @Override public String toString() { return "ElasticConsumerGroupConfig{" + "maxMembers=" + maxMembers + - ", filter='" + filter + '\'' + - ", partitioningWildcards=" + Arrays.toString(partitioningWildcards) + + ", partitioningFilters=" + partitioningFilters + ", maxBufferedMessages=" + maxBufferedMessages + ", maxBufferedBytes=" + maxBufferedBytes + ", members=" + members + diff --git a/pcgroups/src/main/java/io/synadia/pcg/PartitionUtils.java b/pcgroups/src/main/java/io/synadia/pcg/PartitionUtils.java index ddb96e1..dddf95a 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/PartitionUtils.java +++ b/pcgroups/src/main/java/io/synadia/pcg/PartitionUtils.java @@ -75,6 +75,13 @@ public static String composeStaticConsumerName(String consumerGroupName, String */ public static List generatePartitionFilters(List members, int maxMembers, List memberMappings, String memberName) { + return generatePartitionFilters(members, maxMembers, memberMappings, memberName, ">"); + } + + public static List generatePartitionFilters(List members, int maxMembers, + List memberMappings, String memberName, + String filter) { + String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; if (members != null && !members.isEmpty()) { // Deduplicate and sort members List sortedMembers = members.stream() @@ -99,12 +106,12 @@ public static List generatePartitionFilters(List members, int ma if (i < (numMembers * numPer)) { if (sortedMembers.get(memberIndex % numMembers).equals(memberName)) { - myFilters.add(i + ".>"); + myFilters.add(i + "." + effectiveFilter); } } else { // Remainder if the number of partitions is not a multiple of the number of members if (sortedMembers.get((i - (numMembers * numPer)) % numMembers).equals(memberName)) { - myFilters.add(i + ".>"); + myFilters.add(i + "." + effectiveFilter); } } } @@ -118,7 +125,7 @@ public static List generatePartitionFilters(List members, int ma for (MemberMapping mapping : memberMappings) { if (mapping.getMember().equals(memberName)) { for (int pn : mapping.getPartitions()) { - myFilters.add(pn + ".>"); + myFilters.add(pn + "." + effectiveFilter); } } } diff --git a/pcgroups/src/main/java/io/synadia/pcg/PartitioningFilter.java b/pcgroups/src/main/java/io/synadia/pcg/PartitioningFilter.java new file mode 100644 index 0000000..436e9ee --- /dev/null +++ b/pcgroups/src/main/java/io/synadia/pcg/PartitioningFilter.java @@ -0,0 +1,121 @@ +// Copyright 2024-2025 Synadia Communications Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.synadia.pcg; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonValue; +import io.nats.client.support.JsonValueUtils; +import org.jspecify.annotations.NonNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static io.nats.client.support.JsonUtils.*; +import static io.nats.client.support.JsonValueUtils.*; + +/** + * Represents a partitioning filter with its associated wildcard indexes. + * JSON structure must be compatible with the Go version. + */ +public class PartitioningFilter implements JsonSerializable { + static final String FILTER = "filter"; + static final String PARTITIONING_WILDCARDS = "partitioning_wildcards"; + + private String filter; + private int[] partitioningWildcards; + + static List listOfOrEmptyList(JsonValue jv) { + return JsonValueUtils.listOf(jv, PartitioningFilter::new); + } + + public PartitioningFilter() { + this.partitioningWildcards = new int[0]; + } + + public PartitioningFilter(String filter, int[] partitioningWildcards) { + this.filter = filter; + this.partitioningWildcards = partitioningWildcards != null ? partitioningWildcards.clone() : new int[0]; + } + + public PartitioningFilter(JsonValue jv) { + this.filter = readString(jv, FILTER); + List integers = read(jv, PARTITIONING_WILDCARDS, v -> listOf(v, JsonValueUtils::getInteger)); + if (integers == null || integers.isEmpty()) { + this.partitioningWildcards = new int[0]; + } else { + this.partitioningWildcards = new int[integers.size()]; + for (int x = 0; x < integers.size(); x++) { + Integer i = integers.get(x); + this.partitioningWildcards[x] = i == null ? 0 : i; + } + } + } + + public String getFilter() { + return filter; + } + + public void setFilter(String filter) { + this.filter = filter; + } + + public int[] getPartitioningWildcards() { + return partitioningWildcards != null ? partitioningWildcards.clone() : new int[0]; + } + + public void setPartitioningWildcards(int[] partitioningWildcards) { + this.partitioningWildcards = partitioningWildcards != null ? partitioningWildcards.clone() : new int[0]; + } + + @Override + @NonNull + public String toJson() { + StringBuilder sb = beginJson(); + addField(sb, FILTER, filter); + if (partitioningWildcards.length > 0) { + List integers = new ArrayList<>(partitioningWildcards.length); + for (int i : partitioningWildcards) { + integers.add(i); + } + _addList(sb, PARTITIONING_WILDCARDS, integers, StringBuilder::append); + } + return endJson(sb).toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PartitioningFilter that = (PartitioningFilter) o; + return Objects.equals(filter, that.filter) && + Arrays.equals(partitioningWildcards, that.partitioningWildcards); + } + + @Override + public int hashCode() { + int result = Objects.hash(filter); + result = 31 * result + Arrays.hashCode(partitioningWildcards); + return result; + } + + @Override + public String toString() { + return "PartitioningFilter{" + + "filter='" + filter + '\'' + + ", partitioningWildcards=" + Arrays.toString(partitioningWildcards) + + '}'; + } +}