Skip to content

Commit

Permalink
Ability to opt-out of using Server v2.9.0 consumer create api (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 19, 2022
1 parent 7056db1 commit 5fc9738
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 47 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ Doxygen is required to be installed and in the PATH. Version 1.8 is known to wo

[Current API Documentation](http://nats-io.github.io/nats.net)

## Version Notes

### Version 1.0.1 Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher.
This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration.
The developer can opt out of using this feature by using a custom JetStreamOptions and using it when creating
JetStream, Key Value and Object Store regular and management contexts.

```csharp
JetStreamOptions jso = JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build();

IJetStream js = connection.CreateJetStreamContext(jso);
IJetStreamManagement jsm = connection.CreateJetStreamManagementContext(jso);
IKeyValue kv = connection.CreateKeyValueContext("bucket", KeyValueOptions.Builder(jso).Build());
IKeyValueManagement kvm = connection.CreateKeyValueManagementContext(KeyValueOptions.Builder(jso).Build());
IObjectStore os = connection.CreateObjectStoreContext("bucket", ObjectStoreOptions.Builder(jso).Build());
IObjectStoreManagement osm = connection.CreateObjectStoreManagementContext(ObjectStoreOptions.Builder(jso).Build());
```

## Basic Usage

NATS .NET C# Client uses interfaces to reference most NATS client objects, and delegates for all types of events.
Expand Down Expand Up @@ -984,7 +1004,7 @@ Subscription creation has many checks to make sure that a valid, operable subscr
| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. |
| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. |
| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. |
| JsConsumerCantUseNameBefore290 | CON | 90301 | Name field not valid against pre v2.9.0 servers. |
| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. |

### Message Acknowledgements
Expand Down
2 changes: 1 addition & 1 deletion az-templates/stage-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ stages:
jobs:
- job: BuildArtifacts
displayName: 'Builds, tests & produces artifacts'
timeoutInMinutes: 20
timeoutInMinutes: 60
cancelTimeoutInMinutes: 5
steps:
- task: NugetToolInstaller@1
Expand Down
2 changes: 1 addition & 1 deletion documentation/DoxyFile.NATS.Client
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PROJECT_NAME = "NATS .NET Client"
# could be handy for archiving the generated documentation or if some version
# control system is used.

PROJECT_NUMBER = 1.0.0
PROJECT_NUMBER = 1.0.1

# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
Expand Down
32 changes: 32 additions & 0 deletions documentation/doc_build_instructions.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
: IMPORTANT!
: A. Start in directory to that is parent directory to nats.net
: B. Make sure you have doxygen installed and in your path. https://doxygen.nl/index.html
: C. Edit DoxyFile.NATS.Client, line 41 to have the version you want.
: D. Use the version for the commit message in step 7.

: Step 1. From the doc directory, build the docs, then go back to parent
pushd nats.net\documentation
doxygen DoxyFile.NATS.Client
popd

: Step 2. From the parent directory clone the repo
rd nats.net-gh-pages /S /Q
git clone https://github.com/nats-io/nats.net nats.net-gh-pages

: Step 3. Go into the the repo directory
cd nats.net-gh-pages

: Step 4. switch the the gh-pages branch
git switch gh-pages

: Step 5. delete everything that is in there
rd search /S /Q
del *.* /Q

: Step 6. Copy the generated docs
xcopy ..\nats.net\documentation\NATS.Client\html /S

: Step 7. git add, commit and push
git add -A
git commit -m "Docs for 1.0.0"
git push origin gh-pages
5 changes: 4 additions & 1 deletion src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public sealed class ClientExDetail
public static readonly ClientExDetail OsGetSizeMismatch = new ClientExDetail(Os, 90207, "Total size does not match meta data.");
public static readonly ClientExDetail OsGetLinkToBucket = new ClientExDetail(Os, 90208, "Cannot get object, it is a link to a bucket.");

public static readonly ClientExDetail JsConsumerCantUseNameBefore290 = new ClientExDetail(Con, 90301, "Name field not valid against pre v2.9.0 servers.");
public static readonly ClientExDetail JsConsumerCreate290NotAvailable = new ClientExDetail(Con, 90301, "Name field not valid when v2.9.0 consumer create api is not available.");
public static readonly ClientExDetail JsConsumerNameDurableMismatch = new ClientExDetail(Con, 90302, "Name must match durable if both are supplied.");

private const string Sub = "SUB";
Expand Down Expand Up @@ -302,5 +302,8 @@ internal NATSJetStreamClientException Instance(string extraMessage)

[Obsolete("constant name had typo, replaced with JsSoDeliverSubjectMismatch")]
public static readonly ClientExDetail JsSoDeliverSubjectGroupMismatch = new ClientExDetail(So, 90103, "Builder deliver subject must match the consumer configuration deliver subject if both are provided.");

[Obsolete("replaced with more comprehensive name, replaced with JsConsumerCreate290NotAvailable")]
public static readonly ClientExDetail JsConsumerCantUseNameBefore290 = new ClientExDetail(Con, 90301, "Name field not valid against pre v2.9.0 servers.");
}
}
10 changes: 5 additions & 5 deletions src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ internal static ServerInfo ServerInfoOrException(IConnection conn)

internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerConfiguration config)
{
bool serverIs290OrLater = ServerInfoOrException(Conn).IsSameOrNewerThanVersion("2.9.0");
bool consumerCreate290Available = ServerInfoOrException(Conn).IsSameOrNewerThanVersion("2.9.0") && !JetStreamOptions.IsOptOut290ConsumerCreate;

string name = Validator.EmptyAsNull(config.Name);
if (!string.IsNullOrWhiteSpace(name) && !serverIs290OrLater)
if (!string.IsNullOrWhiteSpace(name) && !consumerCreate290Available)
{
throw ClientExDetail.JsConsumerCantUseNameBefore290.Instance();
throw ClientExDetail.JsConsumerCreate290NotAvailable.Instance();
}

string durable = Validator.EmptyAsNull(config.Durable);
Expand All @@ -87,7 +87,7 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon
{
subj = string.Format(JetStreamConstants.JsapiConsumerCreate, streamName);
}
else if (serverIs290OrLater)
else if (consumerCreate290Available)
{
string fs = Validator.EmptyAsNull(config.FilterSubject);
if (fs == null || fs.Equals(">"))
Expand All @@ -99,7 +99,7 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon
subj = string.Format(JetStreamConstants.JsapiConsumerCreateV290WithFilter, streamName, consumerName, fs);
}
}
else // server is old and consumerName must be durable since name was checked for JsConsumerCantUseNameBefore290
else // server is old and consumerName must be durable since name was checked for JsConsumerCreate290NotAvailable
{
subj = string.Format(JetStreamConstants.JsapiDurableCreate, streamName, durable);
}
Expand Down
36 changes: 27 additions & 9 deletions src/NATS.Client/JetStream/JetStreamOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ public sealed class JetStreamOptions
public static readonly Duration DefaultTimeout = Duration.OfMillis(Defaults.Timeout);
public static readonly JetStreamOptions DefaultJsOptions = Builder().Build();

private JetStreamOptions(string inJsPrefix, Duration requestTimeout, bool publishNoAck)
private JetStreamOptions(JetStreamOptionsBuilder b)
{
if (inJsPrefix == null) {
if (b._jsPrefix == null) {
IsDefaultPrefix = true;
Prefix = DefaultApiPrefix;
}
else {
IsDefaultPrefix = false;
Prefix = inJsPrefix;
Prefix = b._jsPrefix;
}

RequestTimeout = requestTimeout;
IsPublishNoAck = publishNoAck;
RequestTimeout = b._requestTimeout;
IsPublishNoAck = b._publishNoAck;
IsOptOut290ConsumerCreate = b._optOut290ConsumerCreate;
}

/// <summary>
Expand All @@ -57,6 +58,11 @@ private JetStreamOptions(string inJsPrefix, Duration requestTimeout, bool publis
/// </summary>
public bool IsDefaultPrefix { get; }

/// <summary>
/// Gets whether the opt-out of the server v2.9.0 consumer create api is set
/// </summary>
public bool IsOptOut290ConsumerCreate { get; }

/// <summary>
/// Gets the JetStreamOptions builder.
/// </summary>
Expand All @@ -80,9 +86,10 @@ public static JetStreamOptionsBuilder Builder(JetStreamOptions jso)

public sealed class JetStreamOptionsBuilder
{
private string _jsPrefix;
private Duration _requestTimeout = DefaultTimeout;
private bool _publishNoAck;
internal string _jsPrefix;
internal Duration _requestTimeout = DefaultTimeout;
internal bool _publishNoAck;
internal bool _optOut290ConsumerCreate;

/// <summary>
/// Construct a builder
Expand All @@ -108,6 +115,7 @@ public JetStreamOptionsBuilder(JetStreamOptions jso)

_requestTimeout = jso.RequestTimeout;
_publishNoAck = jso.IsPublishNoAck;
_optOut290ConsumerCreate = jso.IsOptOut290ConsumerCreate;
}
}

Expand Down Expand Up @@ -170,14 +178,24 @@ public JetStreamOptionsBuilder WithPublishNoAck(bool publishNoAck)
return this;
}

/// <summary>
/// Sets whether to opt-out of the server v2.9.0 consumer create api. Default is false (opt-in)
/// </summary>
/// <param name="optOut"></param>
/// <returns>The JetStreamOptionsBuilder</returns>
public JetStreamOptionsBuilder WithOptOut290ConsumerCreate(bool optOut) {
this._optOut290ConsumerCreate = optOut;
return this;
}

/// <summary>
/// Builds the JetStreamOptions
/// </summary>
/// <returns>The JetStreamOptions object.</returns>
public JetStreamOptions Build()
{
_requestTimeout = _requestTimeout ?? DefaultTimeout;
return new JetStreamOptions(_jsPrefix, _requestTimeout, _publishNoAck);
return new JetStreamOptions(this);
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/Tests/IntegrationTests/TestJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ public void TestConnectionClosing()
Assert.Equal(Name(2), ci.Name);
Assert.Equal(Name(2), ci.ConsumerConfiguration.Name);
Assert.Equal(Name(2), ci.ConsumerConfiguration.Durable);
// test opt out
JetStreamOptions jso = JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build();
IJetStream jsOptOut = c.CreateJetStreamContext(jso);
ConsumerConfiguration ccOptOut = ConsumerConfiguration.Builder().WithName(Name(99)).Build();
PushSubscribeOptions psoOptOut = PushSubscribeOptions.Builder().WithConfiguration(ccOptOut).Build();
NATSJetStreamClientException e = Assert.Throws<NATSJetStreamClientException>(() => jsOptOut.PushSubscribeSync(SUBJECT, psoOptOut));
Assert.Contains(JsConsumerCreate290NotAvailable.Id, e.Message);
}
});
}
Expand Down
21 changes: 5 additions & 16 deletions src/Tests/IntegrationTestsInternal/TestOrderedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ namespace IntegrationTestsInternal
{
public class TestOrderedConsumer : TestSuite<JetStreamSuiteContext>
{
private readonly ITestOutputHelper output;

public TestOrderedConsumer(ITestOutputHelper output, JetStreamSuiteContext context) : base(context)
{
this.output = output;
}
public TestOrderedConsumer(JetStreamSuiteContext context) : base(context) { }

// ------------------------------------------------------------------------------------------
// this allows me to intercept messages before it gets to the connection queue
Expand Down Expand Up @@ -67,8 +62,6 @@ protected override Msg BeforeChannelAddCheck(Msg msg)
[Fact]
public void TestOrderedConsumerSync()
{
Console.SetOut(new ConsoleWriter(output));

Context.RunInJsServer(c =>
{
// Setup
Expand Down Expand Up @@ -113,8 +106,6 @@ public void TestOrderedConsumerSync()
[Fact]
public void TestOrderedConsumerAsync()
{
Console.SetOut(new ConsoleWriter(output));

Context.RunInJsServer(c =>
{
// Setup
Expand Down Expand Up @@ -156,15 +147,13 @@ void TestHandler(object sender, MsgHandlerEventArgs args)
JsPublish(js, subject, 101, 6);
// wait for the messages
latch.Wait();
latch.Wait(TimeSpan.FromMinutes(1));
// Loop through the messages to make sure I get stream sequence 1 to 6
ulong expectedStreamSeq = 1;
while (expectedStreamSeq <= 6) {
int idx = (int)expectedStreamSeq - 1;
Assert.Equal(expectedStreamSeq, (ulong)ssFlags[idx].Read());
for (int idx = 0; idx < 6; idx++)
{
Assert.Equal((ulong)idx + 1, (ulong)ssFlags[idx].Read());
Assert.Equal(ExpectedConSeqNums[idx], (ulong)csFlags[idx].Read());
++expectedStreamSeq;
}
});
}
Expand Down
49 changes: 36 additions & 13 deletions src/Tests/UnitTests/JetStream/TestJetStreamOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,65 @@ namespace UnitTests.JetStream
public class TestJetStreamOptions : TestBase
{
[Fact]
public void TestPushAffirmative()
public void TestBuilder()
{
// default
JetStreamOptions jso = JetStreamOptions.Builder().Build();
Assert.Equal(DefaultApiPrefix, jso.Prefix);
Assert.Equal(Duration.OfMillis(Defaults.Timeout), jso.RequestTimeout);
Assert.Equal(DefaultApiPrefix, jso.Prefix);
Assert.True(jso.IsDefaultPrefix);
Assert.False(jso.IsPublishNoAck);
Assert.False(jso.IsOptOut290ConsumerCreate);

// default copy
jso = JetStreamOptions.Builder(jso).Build();
Assert.Equal(DefaultApiPrefix, jso.Prefix);
Assert.Equal(Duration.OfMillis(Defaults.Timeout), jso.RequestTimeout);
Assert.Equal(DefaultApiPrefix, jso.Prefix);
Assert.True(jso.IsDefaultPrefix);
Assert.False(jso.IsPublishNoAck);
Assert.False(jso.IsOptOut290ConsumerCreate);

// affirmative
jso = JetStreamOptions.Builder()
.WithPrefix("pre")
.WithRequestTimeout(Duration.OfSeconds(42))
.WithPublishNoAck(true)
.WithOptOut290ConsumerCreate(true)
.Build();
Assert.Equal("pre.", jso.Prefix);
Assert.Equal(Duration.OfSeconds(42), jso.RequestTimeout);
Assert.False(jso.IsPublishNoAck);
Assert.Equal("pre.", jso.Prefix);
Assert.False(jso.IsDefaultPrefix);
Assert.True(jso.IsPublishNoAck);
Assert.True(jso.IsOptOut290ConsumerCreate);

// affirmative copy
jso = JetStreamOptions.Builder(jso).Build();
Assert.Equal("pre.", jso.Prefix);
Assert.Equal(Duration.OfSeconds(42), jso.RequestTimeout);
Assert.False(jso.IsPublishNoAck);

Assert.Equal("pre.", jso.Prefix);
Assert.False(jso.IsDefaultPrefix);
Assert.True(jso.IsPublishNoAck);
Assert.True(jso.IsOptOut290ConsumerCreate);

// variations / coverage
jso = JetStreamOptions.Builder()
.WithPrefix("pre.")
.WithPublishNoAck(true)
.WithRequestTimeout(42000)
.WithPublishNoAck(false)
.WithOptOut290ConsumerCreate(false)
.Build();
Assert.Equal("pre.", jso.Prefix);
Assert.Equal(Duration.OfSeconds(42), jso.RequestTimeout);
Assert.True(jso.IsPublishNoAck);
Assert.False(jso.IsDefaultPrefix);
Assert.Equal("pre.", jso.Prefix);
Assert.False(jso.IsPublishNoAck);
Assert.False(jso.IsOptOut290ConsumerCreate);

// variations / coverage copy
jso = JetStreamOptions.Builder(jso).Build();
Assert.Equal("pre.", jso.Prefix);
Assert.Equal(Duration.OfSeconds(42), jso.RequestTimeout);
Assert.True(jso.IsPublishNoAck);
Assert.False(jso.IsDefaultPrefix);
Assert.Equal("pre.", jso.Prefix);
Assert.False(jso.IsPublishNoAck);
Assert.False(jso.IsOptOut290ConsumerCreate);
}

[Fact]
Expand Down

0 comments on commit 5fc9738

Please sign in to comment.