Skip to content

Commit

Permalink
Subject Transform Stream Configuration Part 1 (#978)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 21, 2023
1 parent 6c4a13a commit d28d9c9
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 28 deletions.
10 changes: 5 additions & 5 deletions src/main/java/io/nats/client/api/Republish.java
Expand Up @@ -23,7 +23,7 @@
import static io.nats.client.support.JsonValueUtils.readString;

/**
* Republish directives to consider
* Republish Configuration
*/
public class Republish implements JsonSerializable {
private final String source;
Expand All @@ -42,18 +42,20 @@ static Republish optionalInstance(JsonValue vRepublish) {

/**
* Construct a 'republish' object
* @param source the Published Subject-matching filter
* @param source the Published subject matching filter
* @param destination the RePublish Subject template
* @param headersOnly Whether to RePublish only headers (no body)
*/
public Republish(String source, String destination, boolean headersOnly) {
Validator.required(source, "Source");
Validator.required(destination, "Destination");
this.source = source;
this.destination = destination;
this.headersOnly = headersOnly;
}

/**
* Get source, the Published Subject-matching filter
* Get source, the Published subject matching filter
* @return the source
*/
public String getSource() {
Expand Down Expand Up @@ -134,8 +136,6 @@ public Builder headersOnly(Boolean headersOnly) {
* @return the Placement
*/
public Republish build() {
Validator.required(source, "Source");
Validator.required(destination, "Destination");
return new Republish(source, destination, headersOnly);
}
}
Expand Down
30 changes: 28 additions & 2 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Expand Up @@ -56,6 +56,7 @@ public class StreamConfiguration implements JsonSerializable {
private final Duration duplicateWindow;
private final Placement placement;
private final Republish republish;
private final SubjectTransform subjectTransform;
private final Mirror mirror;
private final List<Source> sources;
private final boolean sealed;
Expand Down Expand Up @@ -89,6 +90,7 @@ static StreamConfiguration instance(JsonValue v) {
builder.subjects(readStringList(v, SUBJECTS));
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
builder.sealed(readBoolean(v, SEALED));
Expand Down Expand Up @@ -124,6 +126,7 @@ static StreamConfiguration instance(JsonValue v) {
this.duplicateWindow = b.duplicateWindow;
this.placement = b.placement;
this.republish = b.republish;
this.subjectTransform = b.subjectTransform;
this.mirror = b.mirror;
this.sources = b.sources;
this.sealed = b.sealed;
Expand Down Expand Up @@ -171,6 +174,9 @@ public String toJson() {
if (republish != null) {
addField(sb, REPUBLISH, republish);
}
if (subjectTransform != null) {
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
}
if (mirror != null) {
addField(sb, MIRROR, mirror);
}
Expand Down Expand Up @@ -343,6 +349,14 @@ public Republish getRepublish() {
return republish;
}

/**
* Get the subjectTransform configuration. May be null.
* @return the subjectTransform object
*/
public SubjectTransform getSubjectTransform() {
return subjectTransform;
}

/**
* The mirror definition for this stream
* @return the mirror
Expand Down Expand Up @@ -511,6 +525,7 @@ public static class Builder {
private Duration duplicateWindow = Duration.ZERO;
private Placement placement = null;
private Republish republish = null;
private SubjectTransform subjectTransform = null;
private Mirror mirror = null;
private final List<Source> sources = new ArrayList<>();
private boolean sealed = false;
Expand Down Expand Up @@ -553,6 +568,7 @@ public Builder(StreamConfiguration sc) {
this.duplicateWindow = sc.duplicateWindow;
this.placement = sc.placement;
this.republish = sc.republish;
this.subjectTransform = sc.subjectTransform;
this.mirror = sc.mirror;
sources(sc.sources);
this.sealed = sc.sealed;
Expand Down Expand Up @@ -812,15 +828,25 @@ public Builder placement(Placement placement) {
}

/**
* Sets the republish directive object
* @param republish the republish directive object
* Sets the republish config object
* @param republish the republish config object
* @return Builder
*/
public Builder republish(Republish republish) {
this.republish = republish;
return this;
}

/**
* Sets the subjectTransform config object
* @param subjectTransform the subjectTransform config object
* @return Builder
*/
public Builder subjectTransform(SubjectTransform subjectTransform) {
this.subjectTransform = subjectTransform;
return this;
}

/**
* Sets the mirror object
* @param mirror the mirror object
Expand Down
115 changes: 115 additions & 0 deletions src/main/java/io/nats/client/api/SubjectTransform.java
@@ -0,0 +1,115 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonValue;

import static io.nats.client.support.ApiConstants.DEST;
import static io.nats.client.support.ApiConstants.SRC;
import static io.nats.client.support.JsonUtils.*;
import static io.nats.client.support.JsonValueUtils.readString;

/**
* SubjectTransform
*/
public class SubjectTransform implements JsonSerializable {
private final String source;
private final String destination;

static SubjectTransform optionalInstance(JsonValue vSubjectTransform) {
return vSubjectTransform == null ? null : new SubjectTransform(vSubjectTransform);
}

SubjectTransform(JsonValue vSubjectTransform) {
source = readString(vSubjectTransform, SRC);
destination = readString(vSubjectTransform, DEST);
}

/**
* Construct a 'SubjectTransform' object
* @param source the subject matching filter
* @param destination the SubjectTransform Subject template
*/
public SubjectTransform(String source, String destination) {
this.source = source;
this.destination = destination;
}

/**
* Get source, the subject matching filter
* @return the source
*/
public String getSource() {
return source;
}

/**
* Get destination, the SubjectTransform Subject template
* @return the destination
*/
public String getDestination() {
return destination;
}

public String toJson() {
StringBuilder sb = beginJson();
addField(sb, SRC, source);
addField(sb, DEST, destination);
return endJson(sb).toString();
}

/**
* Creates a builder for a placements object.
* @return the builder.
*/
public static Builder builder() {
return new Builder();
}

/**
* Placement can be created using a Builder.
*/
public static class Builder {
private String source;
private String destination;

/**
* Set the Published Subject-matching filter
* @param source the source
* @return the builder
*/
public Builder source(String source) {
this.source = source;
return this;
}
/**
* Set the SubjectTransform Subject template
* @param destination the destination
* @return the builder
*/
public Builder destination(String destination) {
this.destination = destination;
return this;
}

/**
* Build a Placement object
* @return the Placement
*/
public SubjectTransform build() {
return new SubjectTransform(source, destination);
}
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Expand Up @@ -178,6 +178,7 @@ public interface ApiConstants {
String STREAM = "stream";
String STREAMS = "streams";
String SUBJECT = "subject";
String SUBJECT_TRANSFORM = "subject_transform";
String SUBJECTS = "subjects";
String SUBJECTS_FILTER = "subjects_filter";
String SUCCESS = "success";
Expand Down
57 changes: 37 additions & 20 deletions src/test/java/io/nats/client/api/StreamConfigurationTests.java
Expand Up @@ -55,21 +55,21 @@ public void testRoundTrip() throws Exception {
.compressionOption(compressionOption)
.build();
JetStreamManagement jsm = nc.jetStreamManagement();
validate(jsm.addStream(sc).getConfiguration(), true, compressionOption);
validate(jsm.addStream(sc).getConfiguration(), true);
});
}

@Test
public void testConstruction() {
StreamConfiguration testSc = getTestConfiguration();
// from json
validate(testSc, false, S2);
validate(testSc, false);

// test toJson
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, S2);
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false);

// copy constructor
validate(StreamConfiguration.builder(testSc).build(), false, S2);
validate(StreamConfiguration.builder(testSc).build(), false);

Map<String, String> metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar");

Expand All @@ -94,6 +94,7 @@ public void testConstruction() {
.duplicateWindow(testSc.getDuplicateWindow())
.placement(testSc.getPlacement())
.republish(testSc.getRepublish())
.subjectTransform(testSc.getSubjectTransform())
.mirror(testSc.getMirror())
.sources(testSc.getSources())
.sealed(testSc.getSealed())
Expand All @@ -105,14 +106,14 @@ public void testConstruction() {
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData)
.firstSequence(82942);
validate(builder.build(), false, S2);
validate(builder.addSources((Source)null).build(), false, S2);
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);

List<Source> sources = new ArrayList<>(testSc.getSources());
sources.add(null);
Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson()));
sources.add(copy);
validate(builder.addSources(sources).build(), false, S2);
validate(builder.addSources(sources).build(), false);

// covering add a single source
sources = new ArrayList<>(testSc.getSources());
Expand All @@ -122,7 +123,7 @@ public void testConstruction() {
builder.addSource(source);
}
builder.addSource(sources.get(0));
validate(builder.build(), false, S2);
validate(builder.build(), false);

// equals and hashcode coverage
External external = copy.getExternal();
Expand Down Expand Up @@ -386,15 +387,14 @@ public void testDiscardPolicy() {
assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy());
}

private void validate(StreamConfiguration sc, boolean serverTest, CompressionOption compressionOption) {
private void validate(StreamConfiguration sc, boolean serverTest) {
assertEquals("sname", sc.getName());
assertEquals("blah blah", sc.getDescription());
assertEquals(3, sc.getSubjects().size());
assertEquals(4, sc.getSubjects().size());
assertEquals("foo", sc.getSubjects().get(0));
assertEquals("bar", sc.getSubjects().get(1));
assertEquals("repub.>", sc.getSubjects().get(2));

assertSame(compressionOption, sc.getCompressionOption());
assertEquals("st.>", sc.getSubjects().get(3));

assertSame(RetentionPolicy.Interest, sc.getRetentionPolicy());
assertEquals(730, sc.getMaxConsumers());
Expand Down Expand Up @@ -455,6 +455,12 @@ private void validate(StreamConfiguration sc, boolean serverTest, CompressionOpt
assertEquals(1, sc.getMetadata().size());
assertEquals("meta-bar", sc.getMetadata().get("meta-foo"));
assertEquals(82942, sc.getFirstSequence());

assertSame(S2, sc.getCompressionOption());

assertNotNull(sc.getSubjectTransform());
assertEquals("st.>", sc.getSubjectTransform().getSource());
assertEquals("stdest.>", sc.getSubjectTransform().getDestination());
}
}

Expand Down Expand Up @@ -492,15 +498,26 @@ public void testRepublish() {
assertThrows(IllegalArgumentException.class, () -> Republish.builder().source("src.>").build());
assertThrows(IllegalArgumentException.class, () -> Republish.builder().destination("dest.>").build());

Republish p = Republish.builder().source("src.>").destination("dest.>").build();
assertEquals("src.>", p.getSource());
assertEquals("dest.>", p.getDestination());
assertFalse(p.isHeadersOnly());
Republish r = Republish.builder().source("src.>").destination("dest.>").build();
assertEquals("src.>", r.getSource());
assertEquals("dest.>", r.getDestination());
assertFalse(r.isHeadersOnly());

p = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build();
assertEquals("src.>", p.getSource());
assertEquals("dest.>", p.getDestination());
assertTrue(p.isHeadersOnly());
r = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build();
assertEquals("src.>", r.getSource());
assertEquals("dest.>", r.getDestination());
assertTrue(r.isHeadersOnly());
}

@Test
public void testSubjectTransform() {
SubjectTransform st = SubjectTransform.builder().source("src.>").destination("dest.>").build();
assertEquals("src.>", st.getSource());
assertEquals("dest.>", st.getDestination());

st = SubjectTransform.builder().build();
assertNull(st.getSource());
assertNull(st.getDestination());
}

@Test
Expand Down

0 comments on commit d28d9c9

Please sign in to comment.