diff --git a/src/main/java/io/nats/client/api/Republish.java b/src/main/java/io/nats/client/api/Republish.java index c1ae6e672..dcf5832c4 100644 --- a/src/main/java/io/nats/client/api/Republish.java +++ b/src/main/java/io/nats/client/api/Republish.java @@ -23,7 +23,7 @@ import static io.nats.client.support.JsonValueUtils.readString; /** - * Republish directives to consider + * Republish Configuration */ public class Republish implements JsonSerializable { private final String source; @@ -42,18 +42,20 @@ static Republish optionalInstance(JsonValue vRepublish) { /** * Construct a 'republish' object - * @param source the Published Subject-matching filter + * @param source the Published subject matching filter * @param destination the RePublish Subject template * @param headersOnly Whether to RePublish only headers (no body) */ public Republish(String source, String destination, boolean headersOnly) { + Validator.required(source, "Source"); + Validator.required(destination, "Destination"); this.source = source; this.destination = destination; this.headersOnly = headersOnly; } /** - * Get source, the Published Subject-matching filter + * Get source, the Published subject matching filter * @return the source */ public String getSource() { @@ -134,8 +136,6 @@ public Builder headersOnly(Boolean headersOnly) { * @return the Placement */ public Republish build() { - Validator.required(source, "Source"); - Validator.required(destination, "Destination"); return new Republish(source, destination, headersOnly); } } diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index edf219e1a..a29fe0c6a 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -56,6 +56,7 @@ public class StreamConfiguration implements JsonSerializable { private final Duration duplicateWindow; private final Placement placement; private final Republish republish; + private final SubjectTransform subjectTransform; private final Mirror mirror; private final List sources; private final boolean sealed; @@ -89,6 +90,7 @@ static StreamConfiguration instance(JsonValue v) { builder.subjects(readStringList(v, SUBJECTS)); builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT))); builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH))); + builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM))); builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR))); builder.sources(Source.optionalListOf(readValue(v, SOURCES))); builder.sealed(readBoolean(v, SEALED)); @@ -124,6 +126,7 @@ static StreamConfiguration instance(JsonValue v) { this.duplicateWindow = b.duplicateWindow; this.placement = b.placement; this.republish = b.republish; + this.subjectTransform = b.subjectTransform; this.mirror = b.mirror; this.sources = b.sources; this.sealed = b.sealed; @@ -171,6 +174,9 @@ public String toJson() { if (republish != null) { addField(sb, REPUBLISH, republish); } + if (subjectTransform != null) { + addField(sb, SUBJECT_TRANSFORM, subjectTransform); + } if (mirror != null) { addField(sb, MIRROR, mirror); } @@ -343,6 +349,14 @@ public Republish getRepublish() { return republish; } + /** + * Get the subjectTransform configuration. May be null. + * @return the subjectTransform object + */ + public SubjectTransform getSubjectTransform() { + return subjectTransform; + } + /** * The mirror definition for this stream * @return the mirror @@ -511,6 +525,7 @@ public static class Builder { private Duration duplicateWindow = Duration.ZERO; private Placement placement = null; private Republish republish = null; + private SubjectTransform subjectTransform = null; private Mirror mirror = null; private final List sources = new ArrayList<>(); private boolean sealed = false; @@ -553,6 +568,7 @@ public Builder(StreamConfiguration sc) { this.duplicateWindow = sc.duplicateWindow; this.placement = sc.placement; this.republish = sc.republish; + this.subjectTransform = sc.subjectTransform; this.mirror = sc.mirror; sources(sc.sources); this.sealed = sc.sealed; @@ -812,8 +828,8 @@ public Builder placement(Placement placement) { } /** - * Sets the republish directive object - * @param republish the republish directive object + * Sets the republish config object + * @param republish the republish config object * @return Builder */ public Builder republish(Republish republish) { @@ -821,6 +837,16 @@ public Builder republish(Republish republish) { return this; } + /** + * Sets the subjectTransform config object + * @param subjectTransform the subjectTransform config object + * @return Builder + */ + public Builder subjectTransform(SubjectTransform subjectTransform) { + this.subjectTransform = subjectTransform; + return this; + } + /** * Sets the mirror object * @param mirror the mirror object diff --git a/src/main/java/io/nats/client/api/SubjectTransform.java b/src/main/java/io/nats/client/api/SubjectTransform.java new file mode 100644 index 000000000..c63e0ae52 --- /dev/null +++ b/src/main/java/io/nats/client/api/SubjectTransform.java @@ -0,0 +1,115 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonValue; + +import static io.nats.client.support.ApiConstants.DEST; +import static io.nats.client.support.ApiConstants.SRC; +import static io.nats.client.support.JsonUtils.*; +import static io.nats.client.support.JsonValueUtils.readString; + +/** + * SubjectTransform + */ +public class SubjectTransform implements JsonSerializable { + private final String source; + private final String destination; + + static SubjectTransform optionalInstance(JsonValue vSubjectTransform) { + return vSubjectTransform == null ? null : new SubjectTransform(vSubjectTransform); + } + + SubjectTransform(JsonValue vSubjectTransform) { + source = readString(vSubjectTransform, SRC); + destination = readString(vSubjectTransform, DEST); + } + + /** + * Construct a 'SubjectTransform' object + * @param source the subject matching filter + * @param destination the SubjectTransform Subject template + */ + public SubjectTransform(String source, String destination) { + this.source = source; + this.destination = destination; + } + + /** + * Get source, the subject matching filter + * @return the source + */ + public String getSource() { + return source; + } + + /** + * Get destination, the SubjectTransform Subject template + * @return the destination + */ + public String getDestination() { + return destination; + } + + public String toJson() { + StringBuilder sb = beginJson(); + addField(sb, SRC, source); + addField(sb, DEST, destination); + return endJson(sb).toString(); + } + + /** + * Creates a builder for a placements object. + * @return the builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Placement can be created using a Builder. + */ + public static class Builder { + private String source; + private String destination; + + /** + * Set the Published Subject-matching filter + * @param source the source + * @return the builder + */ + public Builder source(String source) { + this.source = source; + return this; + } + /** + * Set the SubjectTransform Subject template + * @param destination the destination + * @return the builder + */ + public Builder destination(String destination) { + this.destination = destination; + return this; + } + + /** + * Build a Placement object + * @return the Placement + */ + public SubjectTransform build() { + return new SubjectTransform(source, destination); + } + } +} diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index c489f5e46..697ba7d40 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -178,6 +178,7 @@ public interface ApiConstants { String STREAM = "stream"; String STREAMS = "streams"; String SUBJECT = "subject"; + String SUBJECT_TRANSFORM = "subject_transform"; String SUBJECTS = "subjects"; String SUBJECTS_FILTER = "subjects_filter"; String SUCCESS = "success"; diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 0c227bc70..676c7842a 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -55,7 +55,7 @@ public void testRoundTrip() throws Exception { .compressionOption(compressionOption) .build(); JetStreamManagement jsm = nc.jetStreamManagement(); - validate(jsm.addStream(sc).getConfiguration(), true, compressionOption); + validate(jsm.addStream(sc).getConfiguration(), true); }); } @@ -63,13 +63,13 @@ public void testRoundTrip() throws Exception { public void testConstruction() { StreamConfiguration testSc = getTestConfiguration(); // from json - validate(testSc, false, S2); + validate(testSc, false); // test toJson - validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, S2); + validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false); // copy constructor - validate(StreamConfiguration.builder(testSc).build(), false, S2); + validate(StreamConfiguration.builder(testSc).build(), false); Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); @@ -94,6 +94,7 @@ public void testConstruction() { .duplicateWindow(testSc.getDuplicateWindow()) .placement(testSc.getPlacement()) .republish(testSc.getRepublish()) + .subjectTransform(testSc.getSubjectTransform()) .mirror(testSc.getMirror()) .sources(testSc.getSources()) .sealed(testSc.getSealed()) @@ -105,14 +106,14 @@ public void testConstruction() { .discardNewPerSubject(testSc.isDiscardNewPerSubject()) .metadata(metaData) .firstSequence(82942); - validate(builder.build(), false, S2); - validate(builder.addSources((Source)null).build(), false, S2); + validate(builder.build(), false); + validate(builder.addSources((Source)null).build(), false); List sources = new ArrayList<>(testSc.getSources()); sources.add(null); Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson())); sources.add(copy); - validate(builder.addSources(sources).build(), false, S2); + validate(builder.addSources(sources).build(), false); // covering add a single source sources = new ArrayList<>(testSc.getSources()); @@ -122,7 +123,7 @@ public void testConstruction() { builder.addSource(source); } builder.addSource(sources.get(0)); - validate(builder.build(), false, S2); + validate(builder.build(), false); // equals and hashcode coverage External external = copy.getExternal(); @@ -386,15 +387,14 @@ public void testDiscardPolicy() { assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy()); } - private void validate(StreamConfiguration sc, boolean serverTest, CompressionOption compressionOption) { + private void validate(StreamConfiguration sc, boolean serverTest) { assertEquals("sname", sc.getName()); assertEquals("blah blah", sc.getDescription()); - assertEquals(3, sc.getSubjects().size()); + assertEquals(4, sc.getSubjects().size()); assertEquals("foo", sc.getSubjects().get(0)); assertEquals("bar", sc.getSubjects().get(1)); assertEquals("repub.>", sc.getSubjects().get(2)); - - assertSame(compressionOption, sc.getCompressionOption()); + assertEquals("st.>", sc.getSubjects().get(3)); assertSame(RetentionPolicy.Interest, sc.getRetentionPolicy()); assertEquals(730, sc.getMaxConsumers()); @@ -455,6 +455,12 @@ private void validate(StreamConfiguration sc, boolean serverTest, CompressionOpt assertEquals(1, sc.getMetadata().size()); assertEquals("meta-bar", sc.getMetadata().get("meta-foo")); assertEquals(82942, sc.getFirstSequence()); + + assertSame(S2, sc.getCompressionOption()); + + assertNotNull(sc.getSubjectTransform()); + assertEquals("st.>", sc.getSubjectTransform().getSource()); + assertEquals("stdest.>", sc.getSubjectTransform().getDestination()); } } @@ -492,15 +498,26 @@ public void testRepublish() { assertThrows(IllegalArgumentException.class, () -> Republish.builder().source("src.>").build()); assertThrows(IllegalArgumentException.class, () -> Republish.builder().destination("dest.>").build()); - Republish p = Republish.builder().source("src.>").destination("dest.>").build(); - assertEquals("src.>", p.getSource()); - assertEquals("dest.>", p.getDestination()); - assertFalse(p.isHeadersOnly()); + Republish r = Republish.builder().source("src.>").destination("dest.>").build(); + assertEquals("src.>", r.getSource()); + assertEquals("dest.>", r.getDestination()); + assertFalse(r.isHeadersOnly()); - p = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build(); - assertEquals("src.>", p.getSource()); - assertEquals("dest.>", p.getDestination()); - assertTrue(p.isHeadersOnly()); + r = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build(); + assertEquals("src.>", r.getSource()); + assertEquals("dest.>", r.getDestination()); + assertTrue(r.isHeadersOnly()); + } + + @Test + public void testSubjectTransform() { + SubjectTransform st = SubjectTransform.builder().source("src.>").destination("dest.>").build(); + assertEquals("src.>", st.getSource()); + assertEquals("dest.>", st.getDestination()); + + st = SubjectTransform.builder().build(); + assertNull(st.getSource()); + assertNull(st.getDestination()); } @Test diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index bb704ed43..98d2b3826 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -1,7 +1,7 @@ { "name": "sname", "description": "blah blah", - "subjects": ["foo", "bar", "repub.>"], + "subjects": ["foo", "bar", "repub.>", "st.>"], "retention": "interest", "compression": "s2", "max_consumers": 730, @@ -34,6 +34,10 @@ "dest": "dest.>", "headers_only": true }, + "subject_transform": { + "src": "st.>", + "dest": "stdest.>" + }, "mirror": { "name": "eman", "opt_start_seq": 736,