Skip to content

Commit

Permalink
kv mirror (#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 8, 2022
1 parent 5945c1b commit 33446a0
Show file tree
Hide file tree
Showing 21 changed files with 1,033 additions and 81 deletions.
5 changes: 5 additions & 0 deletions src/NATS.Client/Internals/JetStreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public static class JetStreamConstants
/// </summary>
public const string PrefixApiDot = "API.";

/// <summary>
/// The standard JetStream Prefix suffix without the dot at the end
/// </summary>
public const string PrefixApi = "API";

/// <summary>
/// The standard JetStream Prefix
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client/Internals/NatsConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal static class NatsConstants
internal const string SerializedHeaderCannotBeNullOrEmpty = "Serialized header cannot be null or empty.";

internal const string Empty = "";
internal const char Dot = '.';

internal const byte Sp = (byte)' ';
internal const byte Colon = (byte)':';
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ internal static class ApiConstants
internal const string Message = "message";
internal const string Messages = "messages";
internal const string Mirror = "mirror";
internal const string MirrorDirect = "mirror_direct";
internal const string Msgs = "msgs";
internal const string Name = "name";
internal const string NextBySubject = "next_by_subj";
Expand Down Expand Up @@ -170,6 +171,5 @@ internal static class ApiConstants
internal const string Total = "total";
internal const string Type = "type";
internal const string Version = "version";

}
}
13 changes: 10 additions & 3 deletions src/NATS.Client/JetStream/JetStreamOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using NATS.Client.Internals;
using static NATS.Client.Internals.JetStreamConstants;
using static NATS.Client.Internals.NatsConstants;
using static NATS.Client.Internals.Validator;

namespace NATS.Client.JetStream
Expand Down Expand Up @@ -62,7 +63,13 @@ private JetStreamOptions(JetStreamOptionsBuilder b)
/// Gets whether the opt-out of the server v2.9.0 consumer create api is set
/// </summary>
public bool IsOptOut290ConsumerCreate { get; }


internal static string ConvertDomainToPrefix(string domain) {
string valid = ValidatePrefixOrDomain(domain, "Domain", false);
return valid == null ? null
: PrefixDollarJsDot + EnsureEndsWithDot(valid) + PrefixApi;
}

/// <summary>
/// Gets the JetStreamOptions builder.
/// </summary>
Expand Down Expand Up @@ -141,8 +148,8 @@ public JetStreamOptionsBuilder WithPrefix(string prefix)
/// <returns>The JetStreamOptionsBuilder</returns>
public JetStreamOptionsBuilder WithDomain(string domain)
{
string valid = ValidatePrefixOrDomain(domain, "Domain", false);
_jsPrefix = valid == null ? null : PrefixDollarJsDot + EnsureEndsWithDot(valid) + PrefixApiDot;
string prefix = ConvertDomainToPrefix(domain);
_jsPrefix = prefix == null ? null : prefix + Dot;
return this;
}

Expand Down
47 changes: 40 additions & 7 deletions src/NATS.Client/JetStream/Mirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ internal Mirror(JSONNode mirrorBaseNode)

internal override JSONNode ToJsonNode()
{
return new JSONObject
JSONObject o = new JSONObject();
JsonUtils.AddField(o, ApiConstants.Name, Name);
JsonUtils.AddField(o, ApiConstants.OptStartSeq, StartSeq);
JsonUtils.AddField(o, ApiConstants.OptStartTime, JsonUtils.ToString(StartTime));
JsonUtils.AddField(o, ApiConstants.FilterSubject, FilterSubject);
if (External != null)
{
[ApiConstants.Name] = Name,
[ApiConstants.OptStartSeq] = StartSeq,
[ApiConstants.OptStartTime] = JsonUtils.ToString(StartTime),
[ApiConstants.FilterSubject] = FilterSubject,
[ApiConstants.External] = External.ToJsonNode()
};
o[ApiConstants.External] = External.ToJsonNode();
}
return o;
}

internal static Mirror OptionalInstance(JSONNode mirrorNode)
Expand Down Expand Up @@ -97,6 +99,14 @@ public Mirror(string name, ulong startSeq, DateTime startTime, string filterSubj
public static MirrorBuilder Builder() {
return new MirrorBuilder();
}

/// <summary>
/// Creates a builder for a mirror object based on an existing mirror object.
/// </summary>
/// <returns>The Builder</returns>
public static MirrorBuilder Builder(Mirror mirror) {
return new MirrorBuilder(mirror);
}

/// <summary>
/// Mirror can be created using a MirrorBuilder.
Expand All @@ -109,6 +119,17 @@ public sealed class MirrorBuilder
private string _filterSubject;
private External _external;

public MirrorBuilder() { }

public MirrorBuilder(Mirror mirror)
{
_name = mirror.Name;
_startSeq = mirror.StartSeq;
_startTime = mirror.StartTime;
_filterSubject = mirror.FilterSubject;
_external = mirror.External;
}

/// <summary>
/// Set the mirror name.
/// </summary>
Expand Down Expand Up @@ -164,6 +185,18 @@ public MirrorBuilder WithExternal(External external)
return this;
}

/// <summary>
/// Set the external reference by using a domain based prefix.
/// </summary>
/// <param name="domain">the domain</param>
/// <returns>The Builder</returns>
public MirrorBuilder WithDomain(string domain)
{
string prefix = JetStreamOptions.ConvertDomainToPrefix(domain);
_external = prefix == null ? null : External.Builder().WithApi(prefix).Build();
return this;
}

/// <summary>
/// Build a Mirror object
/// </summary>
Expand Down
40 changes: 35 additions & 5 deletions src/NATS.Client/JetStream/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ internal Source(JSONNode sourceBaseNode)
internal override JSONNode ToJsonNode()
{
JSONObject jso = new JSONObject();
jso[ApiConstants.Name] = Name;
jso[ApiConstants.OptStartSeq] = StartSeq;
jso[ApiConstants.OptStartTime] = JsonUtils.ToString(StartTime);
jso[ApiConstants.FilterSubject] = FilterSubject;
JsonUtils.AddField(jso, ApiConstants.Name, Name);
JsonUtils.AddField(jso, ApiConstants.OptStartSeq, StartSeq);
JsonUtils.AddField(jso, ApiConstants.OptStartTime, JsonUtils.ToString(StartTime));
JsonUtils.AddField(jso, ApiConstants.FilterSubject, FilterSubject);
if (External != null)
{
jso[ApiConstants.External] = External.ToJsonNode();
}

return jso;
}

Expand Down Expand Up @@ -112,6 +111,14 @@ public Source(string name, ulong startSeq, DateTime startTime, string filterSubj
return new SourceBuilder();
}

/// <summary>
/// Creates a builder for a source object based on an existing source object.
/// </summary>
/// <returns>The Builder</returns>
public static SourceBuilder Builder(Source source) {
return new SourceBuilder(source);
}

/// <summary>
/// Source can be created using a SourceBuilder.
/// </summary>
Expand All @@ -123,6 +130,17 @@ public sealed class SourceBuilder
private string _filterSubject;
private External _external;

public SourceBuilder() { }

public SourceBuilder(Source source)
{
_name = source.Name;
_startSeq = source.StartSeq;
_startTime = source.StartTime;
_filterSubject = source.FilterSubject;
_external = source.External;
}

/// <summary>
/// Set the source name.
/// </summary>
Expand Down Expand Up @@ -178,6 +196,18 @@ public SourceBuilder WithExternal(External external)
return this;
}

/// <summary>
/// Set the external reference by using a domain based prefix.
/// </summary>
/// <param name="domain">the domain</param>
/// <returns>The Builder</returns>
public SourceBuilder WithDomain(string domain)
{
string prefix = JetStreamOptions.ConvertDomainToPrefix(domain);
_external = prefix == null ? null : External.Builder().WithApi(prefix).Build();
return this;
}

/// <summary>
/// Build a Source object
/// </summary>
Expand Down
32 changes: 30 additions & 2 deletions src/NATS.Client/JetStream/StreamConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public sealed class StreamConfiguration : JsonSerializable
public bool Sealed { get; }
public bool AllowRollup { get; }
public bool AllowDirect { get; }
public bool MirrorDirect { get; }
public bool DenyDelete { get; }
public bool DenyPurge { get; }
public bool DiscardNewPerSubject { get; }
Expand Down Expand Up @@ -78,6 +79,7 @@ internal StreamConfiguration(JSONNode scNode)
Sealed = scNode[ApiConstants.Sealed].AsBool;
AllowRollup = scNode[ApiConstants.AllowRollupHdrs].AsBool;
AllowDirect = scNode[ApiConstants.AllowDirect].AsBool;
MirrorDirect = scNode[ApiConstants.MirrorDirect].AsBool;
DenyDelete = scNode[ApiConstants.DenyDelete].AsBool;
DenyPurge = scNode[ApiConstants.DenyPurge].AsBool;
DiscardNewPerSubject = scNode[ApiConstants.DiscardNewPerSubject].AsBool;
Expand Down Expand Up @@ -108,6 +110,7 @@ private StreamConfiguration(StreamConfigurationBuilder builder)
Sealed = builder._sealed;
AllowRollup = builder._allowRollup;
AllowDirect = builder._allowDirect;
MirrorDirect = builder._mirrorDirect;
DenyDelete = builder._denyDelete;
DenyPurge = builder._denyPurge;
DiscardNewPerSubject = builder._discardNewPerSubject;
Expand Down Expand Up @@ -148,6 +151,7 @@ internal override JSONNode ToJsonNode()
[ApiConstants.Sealed] = Sealed,
[ApiConstants.AllowRollupHdrs] = AllowRollup,
[ApiConstants.AllowDirect] = AllowDirect,
[ApiConstants.MirrorDirect] = MirrorDirect,
[ApiConstants.DenyDelete] = DenyDelete,
[ApiConstants.DenyPurge] = DenyPurge,
[ApiConstants.DiscardNewPerSubject] = DiscardNewPerSubject
Expand Down Expand Up @@ -189,6 +193,7 @@ public sealed class StreamConfigurationBuilder
internal bool _sealed;
internal bool _allowRollup;
internal bool _allowDirect;
internal bool _mirrorDirect;
internal bool _denyDelete;
internal bool _denyPurge;
internal bool _discardNewPerSubject;
Expand Down Expand Up @@ -221,6 +226,7 @@ public StreamConfigurationBuilder(StreamConfiguration sc)
_sealed = sc.Sealed;
_allowRollup = sc.AllowRollup;
_allowDirect = sc.AllowDirect;
_mirrorDirect = sc.MirrorDirect;
_denyDelete = sc.DenyDelete;
_denyPurge = sc.DenyPurge;
_discardNewPerSubject = sc.DiscardNewPerSubject;
Expand Down Expand Up @@ -536,6 +542,18 @@ public StreamConfigurationBuilder WithMaxAge(Duration maxAge)
return this;
}

/// <summary>
/// Add a source into the StreamConfiguration.
/// </summary>
/// <param name="source"></param>
/// <returns>The StreamConfigurationBuilder</returns>
public StreamConfigurationBuilder AddSource(Source source) {
if (source != null && !_sources.Contains(source)) {
_sources.Add(source);
}
return this;
}

/// <summary>
/// Sets the Allow Rollup mode of the StreamConfiguration.
/// </summary>
Expand All @@ -547,15 +565,25 @@ public StreamConfigurationBuilder WithMaxAge(Duration maxAge)
}

/// <summary>
/// Sets the Allow Direct mode of the StreamConfiguration.
/// Set whether to allow direct message access for a stream
/// </summary>
/// <param name="allowDirect">true to allow direct headers.</param>
/// <param name="allowDirect">the allow direct setting.</param>
/// <returns>The StreamConfigurationBuilder</returns>
public StreamConfigurationBuilder WithAllowDirect(bool allowDirect) {
_allowDirect = allowDirect;
return this;
}

/// <summary>
/// Set whether to allow unified direct access for mirrors
/// </summary>
/// <param name="mirrorDirect">the mirror direct setting.</param>
/// <returns>The StreamConfigurationBuilder</returns>
public StreamConfigurationBuilder WithMirrorDirect(bool mirrorDirect) {
_mirrorDirect = mirrorDirect;
return this;
}

/// <summary>
/// Sets the Deny Delete mode of the StreamConfiguration.
/// </summary>
Expand Down

0 comments on commit 33446a0

Please sign in to comment.