Skip to content

Commit

Permalink
List materialized views (#698)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Oct 5, 2022
1 parent 686c31e commit a5d45ca
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/ListReaders.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
15 changes: 15 additions & 0 deletions src/NATS.Client/KeyValue/IKeyValueManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;

namespace NATS.Client.KeyValue
Expand Down Expand Up @@ -42,8 +43,22 @@ public interface IKeyValueManagement
/// </summary>
/// <param name="bucketName">the bucket name to use</param>
/// <returns>the bucket status object</returns>
[Obsolete("This method will soon be deprecated. Use GetStatus instead.")]
KeyValueStatus GetBucketInfo(string bucketName);

/// <summary>
/// Gets the status for an existing bucket.
/// </summary>
/// <param name="bucketName">the bucket name to use</param>
/// <returns>the bucket status object</returns>
KeyValueStatus GetStatus(string bucketName);

/// <summary>
/// Gets the status for all buckets.
/// </summary>
/// <returns>the list of statuses</returns>
IList<KeyValueStatus> GetStatuses();

/// <summary>
/// Deletes an existing bucket. Will throw a NATSJetStreamException if the delete fails.
/// </summary>
Expand Down
23 changes: 18 additions & 5 deletions src/NATS.Client/KeyValue/KeyValueManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using NATS.Client.Internals;
using NATS.Client.JetStream;
using static NATS.Client.JetStream.JetStreamBase;
using static NATS.Client.KeyValue.KeyValueUtil;

namespace NATS.Client.KeyValue
{
Expand Down Expand Up @@ -47,23 +48,35 @@ public IList<string> GetBucketNames()
IList<string> buckets = new List<string>();
IList<string> names = jsm.GetStreamNames();
foreach (string name in names) {
if (name.StartsWith(KeyValueUtil.KvStreamPrefix)) {
buckets.Add(KeyValueUtil.ExtractBucketName(name));
if (name.StartsWith(KvStreamPrefix)) {
buckets.Add(ExtractBucketName(name));
}
}
return buckets;
}

public KeyValueStatus GetBucketInfo(string bucketName)
public KeyValueStatus GetBucketInfo(string bucketName) => GetStatus(bucketName);

public KeyValueStatus GetStatus(string bucketName)
{
Validator.ValidateBucketName(bucketName, true);
return new KeyValueStatus(jsm.GetStreamInfo(KeyValueUtil.ToStreamName(bucketName)));
return new KeyValueStatus(jsm.GetStreamInfo(ToStreamName(bucketName)));
}

public IList<KeyValueStatus> GetStatuses()
{
IList<string> bucketNames = GetBucketNames();
IList<KeyValueStatus> statuses = new List<KeyValueStatus>();
foreach (string name in bucketNames) {
statuses.Add(new KeyValueStatus(jsm.GetStreamInfo(ToStreamName(name))));
}
return statuses;
}

public void Delete(string bucketName)
{
Validator.ValidateBucketName(bucketName, true);
jsm.DeleteStream(KeyValueUtil.ToStreamName(bucketName));
jsm.DeleteStream(ToStreamName(bucketName));
}
}
}
10 changes: 8 additions & 2 deletions src/NATS.Client/ObjectStore/IObjectStoreManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ public interface IObjectStoreManagement
IList<String> GetBucketNames();

/// <summary>
/// Gets the info for an existing object store.
/// Gets the status for an existing object store.
/// OBJECT STORE IMPLEMENTATION IS EXPERIMENTAL AND SUBJECT TO CHANGE.
/// </summary>
/// <param name="bucketName">the object store bucket name to get info for</param>
ObjectStoreStatus GetStatus(String bucketName);
ObjectStoreStatus GetStatus(string bucketName);

/// <summary>
/// Gets the status for all object store buckets.
/// </summary>
/// <returns>the bucket status object</returns>
IList<ObjectStoreStatus> GetStatuses();

/// <summary>
/// Deletes an existing object store. Will throw a JetStreamApiException if the delete fails.
Expand Down
19 changes: 15 additions & 4 deletions src/NATS.Client/ObjectStore/ObjectStoreManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.IO;
using NATS.Client.Internals;
using NATS.Client.JetStream;
using static NATS.Client.ObjectStore.ObjectStoreUtil;

namespace NATS.Client.ObjectStore
{
Expand Down Expand Up @@ -30,8 +31,8 @@ public IList<string> GetBucketNames()
IList<string> buckets = new List<string>();
IList<string> names = jsm.GetStreamNames();
foreach (string name in names) {
if (name.StartsWith(ObjectStoreUtil.ObjStreamPrefix)) {
buckets.Add(ObjectStoreUtil.ExtractBucketName(name));
if (name.StartsWith(ObjStreamPrefix)) {
buckets.Add(ExtractBucketName(name));
}
}
return buckets;
Expand All @@ -40,13 +41,23 @@ public IList<string> GetBucketNames()
public ObjectStoreStatus GetStatus(string bucketName)
{
Validator.ValidateBucketName(bucketName, true);
return new ObjectStoreStatus(jsm.GetStreamInfo(ObjectStoreUtil.ToStreamName(bucketName)));
return new ObjectStoreStatus(jsm.GetStreamInfo(ToStreamName(bucketName)));
}

public IList<ObjectStoreStatus> GetStatuses()
{
IList<string> bucketNames = GetBucketNames();
IList<ObjectStoreStatus> statuses = new List<ObjectStoreStatus>();
foreach (string name in bucketNames) {
statuses.Add(new ObjectStoreStatus(jsm.GetStreamInfo(ToStreamName(name))));
}
return statuses;
}

public void Delete(string bucketName)
{
Validator.ValidateBucketName(bucketName, true);
jsm.DeleteStream(ObjectStoreUtil.ToStreamName(bucketName));
jsm.DeleteStream(ToStreamName(bucketName));
}
}
}
4 changes: 2 additions & 2 deletions src/Samples/KeyValueFull/KeyValueFull.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static void Main(string[] args)

// let's check the bucket info
Console.WriteLine("\n9.1 Bucket before update/delete");
kvs = kvm.GetBucketInfo(helper.Bucket);
kvs = kvm.GetStatus(helper.Bucket);
Console.WriteLine(kvs);

kvc = KeyValueConfiguration.Builder(kvs.Config)
Expand All @@ -178,7 +178,7 @@ public static void Main(string[] args)
kvm.Delete(helper.Bucket);

try {
kvm.GetBucketInfo(helper.Bucket);
kvm.GetStatus(helper.Bucket);
Console.WriteLine("UH OH! Bucket should not have been found!");
}
catch (NATSJetStreamException) {
Expand Down
51 changes: 30 additions & 21 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
using NATS.Client.JetStream;
using NATS.Client.KeyValue;
using Xunit;
using Xunit.Abstractions;
using static UnitTests.TestBase;
using static IntegrationTests.JetStreamTestBase;
using static NATS.Client.JetStream.JetStreamOptions;
Expand Down Expand Up @@ -128,7 +127,7 @@ public void TestWorkFlow()
AssertHistory(longHistory, kv.History(longKey));
// let's check the bucket info
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 3, 3);
// delete a key. Its entry will still exist, but it's value is null
Expand All @@ -138,7 +137,7 @@ public void TestWorkFlow()
AssertHistory(byteHistory, kv.History(byteKey));
// let's check the bucket info
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 4, 4);
// if the key does not exist (no history) there is no entry
Expand Down Expand Up @@ -169,7 +168,7 @@ public void TestWorkFlow()
AssertHistory(longHistory, kv.History(longKey));
// let's check the bucket info
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 7, 7);
// make sure it only keeps the correct amount of history
Expand All @@ -181,7 +180,7 @@ public void TestWorkFlow()
AssertEntry(BUCKET, longKey, KeyValueOperation.Put, 8, "3", utcNow, kv.Get(longKey)));
AssertHistory(longHistory, kv.History(longKey));
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 8, 8);
// this would be the 4th entry for the longKey
Expand All @@ -197,7 +196,7 @@ public void TestWorkFlow()
AssertHistory(longHistory, kv.History(longKey));
// record count does not increase
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 8, 9);
// should have exactly these 3 keys
Expand All @@ -210,7 +209,7 @@ public void TestWorkFlow()
longHistory.Add(KeyValueOperation.Purge);
AssertHistory(longHistory, kv.History(longKey));
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 6, 10);
// only 2 keys now
Expand All @@ -222,7 +221,7 @@ public void TestWorkFlow()
byteHistory.Add(KeyValueOperation.Purge);
AssertHistory(byteHistory, kv.History(byteKey));
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 4, 11);
// only 1 key now
Expand All @@ -234,7 +233,7 @@ public void TestWorkFlow()
stringHistory.Add(KeyValueOperation.Purge);
AssertHistory(stringHistory, kv.History(stringKey));
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 3, 12);
// no more keys left
Expand All @@ -243,7 +242,7 @@ public void TestWorkFlow()
// clear things
KeyValuePurgeOptions kvpo = KeyValuePurgeOptions.Builder().WithDeleteMarkersNoThreshold().Build();
kv.PurgeDeletes(kvpo);
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 0, 12);
longHistory.Clear();
Expand Down Expand Up @@ -276,13 +275,13 @@ public void TestWorkFlow()
AssertHistory(longHistory, kv.History(longKey));
AssertHistory(stringHistory, kv.History(stringKey));
status = kvm.GetBucketInfo(BUCKET);
status = kvm.GetStatus(BUCKET);
AssertState(status, 5, 17);
// delete the bucket
kvm.Delete(BUCKET);
Assert.Throws<NATSJetStreamException>(() => kvm.Delete(BUCKET));
Assert.Throws<NATSJetStreamException>(() => kvm.GetBucketInfo(BUCKET));
Assert.Throws<NATSJetStreamException>(() => kvm.GetStatus(BUCKET));
Assert.Equal(0, kvm.GetBucketNames().Count);
});
Expand Down Expand Up @@ -460,7 +459,7 @@ public void TestWorkFlow()
// get the kv management context
IKeyValueManagement kvm = c.CreateKeyValueManagementContext();
Assert.Throws<NATSJetStreamException>(() => kvm.GetBucketInfo(BUCKET));
Assert.Throws<NATSJetStreamException>(() => kvm.GetStatus(BUCKET));
KeyValueStatus kvs = kvm.Create(KeyValueConfiguration.Builder()
.WithName(BUCKET)
Expand Down Expand Up @@ -628,7 +627,7 @@ public void TestWorkFlow()
}

[Fact]
public void TestManageGetBucketNames() {
public void TestManageGetBucketNamesStatuses() {
Context.RunInJsServer(c =>
{
// get the kv management context
Expand All @@ -648,11 +647,21 @@ public void TestWorkFlow()
CreateMemoryStream(c, Stream(1));
CreateMemoryStream(c, Stream(2));
IList<KeyValueStatus> infos = kvm.GetStatuses();
Assert.Equal(2, infos.Count);
IList<string> buckets = new List<string>();
foreach (KeyValueStatus status in infos) {
buckets.Add(status.BucketName);
}
Assert.Equal(2, buckets.Count);
Assert.True(buckets.Contains(Bucket(1)));
Assert.True(buckets.Contains(Bucket(2)));
IList<string> buckets = kvm.GetBucketNames();
buckets = kvm.GetBucketNames();
Assert.Equal(2, buckets.Count);
Assert.Contains(Bucket(1), buckets);
Assert.Contains(Bucket(2), buckets);
Assert.True(buckets.Contains(Bucket(1)));
Assert.True(buckets.Contains(Bucket(2)));
});
}

Expand Down Expand Up @@ -960,10 +969,10 @@ public void TestWithAccount()
AssertKvAccountBucketNames(kvmUserA.GetBucketNames());
AssertKvAccountBucketNames(kvmUserIBcktI.GetBucketNames());

Assert.Equal(BucketCreatedByUserA, kvmUserA.GetBucketInfo(BucketCreatedByUserA).BucketName);
Assert.Equal(BucketCreatedByUserA, kvmUserIBcktA.GetBucketInfo(BucketCreatedByUserA).BucketName);
Assert.Equal(BucketCreatedByUserI, kvmUserA.GetBucketInfo(BucketCreatedByUserI).BucketName);
Assert.Equal(BucketCreatedByUserI, kvmUserIBcktI.GetBucketInfo(BucketCreatedByUserI).BucketName);
Assert.Equal(BucketCreatedByUserA, kvmUserA.GetStatus(BucketCreatedByUserA).BucketName);
Assert.Equal(BucketCreatedByUserA, kvmUserIBcktA.GetStatus(BucketCreatedByUserA).BucketName);
Assert.Equal(BucketCreatedByUserI, kvmUserA.GetStatus(BucketCreatedByUserI).BucketName);
Assert.Equal(BucketCreatedByUserI, kvmUserIBcktI.GetStatus(BucketCreatedByUserI).BucketName);

// some more prep
IKeyValue kv_connA_bucketA = connUserA.CreateKeyValueContext(BucketCreatedByUserA, jsOpt_UserA_NoPrefix);
Expand Down
14 changes: 12 additions & 2 deletions src/Tests/IntegrationTests/TestObjectStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private static object[] GetInput(int size, string path, long foundLen, FileInfo
}

[Fact]
public void TestManageGetBucketNames() {
public void TestManageGetBucketNamesStatuses() {
Context.RunInJsServer(nc =>
{
IObjectStoreManagement osm = nc.CreateObjectStoreManagementContext();
Expand All @@ -283,7 +283,17 @@ private static object[] GetInput(int size, string path, long foundLen, FileInfo
CreateMemoryStream(nc, Stream(1));
CreateMemoryStream(nc, Stream(2));
IList<string> buckets = osm.GetBucketNames();
IList<ObjectStoreStatus> infos = osm.GetStatuses();
Assert.Equal(2, infos.Count);
IList<string> buckets = new List<string>();
foreach (ObjectStoreStatus status in infos) {
buckets.Add(status.BucketName);
}
Assert.Equal(2, buckets.Count);
Assert.True(buckets.Contains(Bucket(1)));
Assert.True(buckets.Contains(Bucket(2)));
buckets = osm.GetBucketNames();
Assert.Equal(2, buckets.Count);
Assert.True(buckets.Contains(Bucket(1)));
Assert.True(buckets.Contains(Bucket(2)));
Expand Down

0 comments on commit a5d45ca

Please sign in to comment.