Skip to content

Commit

Permalink
Merge pull request #846 from ripienaar/obj_status
Browse files Browse the repository at this point in the history
reimplement bucket status to avoid unspecified maps
  • Loading branch information
ripienaar committed Oct 13, 2021
2 parents 15a7702 + a512063 commit 0f47b1c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 74 deletions.
41 changes: 14 additions & 27 deletions kv.go
Expand Up @@ -84,8 +84,8 @@ type KeyValueStatus interface {
// TTL is how long the bucket keeps values for
TTL() time.Duration

// BackingStore is information about the backend hosting the data
BackingStore() BackingStore
// BackingStore indicates what technology is used for storage of the bucket
BackingStore() string
}

// KeyWatcher is what is returned when doing a watch.
Expand Down Expand Up @@ -678,33 +678,29 @@ func (kv *kvs) Bucket() string {
return kv.name
}

type kvBackingStore struct {
info map[string]string
}

func (b *kvBackingStore) Kind() string { return "JetStream" }
func (b *kvBackingStore) Info() map[string]string { return b.info }

type kvStatus struct {
// KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
type KeyValueBucketStatus struct {
nfo *StreamInfo
bucket string
bs *kvBackingStore
}

// Bucket the name of the bucket
func (s *kvStatus) Bucket() string { return s.bucket }
func (s *KeyValueBucketStatus) Bucket() string { return s.bucket }

// Values is how many messages are in the bucket, including historical values
func (s *kvStatus) Values() uint64 { return s.nfo.State.Msgs }
func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs }

// History returns the configured history kept per key
func (s *kvStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }
func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func (s *kvStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// BackingStore indicates what technology is used for storage of the bucket
func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" }

// BackingStore is information about the backend and storage used for the KV store
func (s *kvStatus) BackingStore() BackingStore { return s.bs }
// StreamInfo is the stream info retrieved to create the status
func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
Expand All @@ -713,14 +709,5 @@ func (kv *kvs) Status() (KeyValueStatus, error) {
return nil, err
}

bs := &kvBackingStore{info: map[string]string{
"stream": kv.stream,
"domain": kv.js.opts.domain,
}}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

return &kvStatus{nfo: nfo, bucket: kv.name, bs: bs}, nil
return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
}
67 changes: 30 additions & 37 deletions object.go
Expand Up @@ -140,12 +140,6 @@ type ObjectStoreConfig struct {
Replicas int
}

// BackingStore describes the implementation and storage backend of KV or Object stores
type BackingStore interface {
Kind() string
Info() map[string]string
}

type ObjectStoreStatus interface {
// Bucket is the name of the bucket
Bucket() string
Expand All @@ -162,7 +156,7 @@ type ObjectStoreStatus interface {
// Size is the combined size of all data in the bucket including metadata, in bytes
Size() uint64
// BackingStore provides details about the underlying storage
BackingStore() BackingStore
BackingStore() string
}

// ObjectMetaOptions
Expand Down Expand Up @@ -890,27 +884,38 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
return objs, nil
}

type objBackingStore struct {
info map[string]string
}

func (b *objBackingStore) Kind() string { return "JetStream" }
func (b *objBackingStore) Info() map[string]string { return b.info }

type objStatus struct {
// ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus
type ObjectBucketStatus struct {
nfo *StreamInfo
bucket string
bs BackingStore
}

func (s *objStatus) Bucket() string { return s.bucket }
func (s *objStatus) Description() string { return s.nfo.Config.Description }
func (s *objStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }
func (s *objStatus) Storage() StorageType { return s.nfo.Config.Storage }
func (s *objStatus) Replicas() int { return s.nfo.Config.Replicas }
func (s *objStatus) Sealed() bool { return s.nfo.Config.Sealed }
func (s *objStatus) Size() uint64 { return s.nfo.State.Bytes }
func (s *objStatus) BackingStore() BackingStore { return s.bs }
// Bucket is the name of the bucket
func (s *ObjectBucketStatus) Bucket() string { return s.bucket }

// Description is the description supplied when creating the bucket
func (s *ObjectBucketStatus) Description() string { return s.nfo.Config.Description }

// TTL indicates how long objects are kept in the bucket
func (s *ObjectBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge }

// Storage indicates the underlying JetStream storage technology used to store data
func (s *ObjectBucketStatus) Storage() StorageType { return s.nfo.Config.Storage }

// Replicas indicates how many storage replicas are kept for the data in the bucket
func (s *ObjectBucketStatus) Replicas() int { return s.nfo.Config.Replicas }

// Sealed indicates the stream is sealed and cannot be modified in any way
func (s *ObjectBucketStatus) Sealed() bool { return s.nfo.Config.Sealed }

// Size is the combined size of all data in the bucket including metadata, in bytes
func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes }

// BackingStore indicates what technology is used for storage of the bucket
func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" }

// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }

// Status retrieves run-time status about a bucket
func (obs *obs) Status() (ObjectStoreStatus, error) {
Expand All @@ -919,21 +924,9 @@ func (obs *obs) Status() (ObjectStoreStatus, error) {
return nil, err
}

bs := &objBackingStore{
info: map[string]string{
"stream": obs.stream,
"domain": obs.js.opts.domain,
},
}

if nfo.Cluster != nil {
bs.info["placement_cluster"] = nfo.Cluster.Name
}

status := &objStatus{
status := &ObjectBucketStatus{
nfo: nfo,
bucket: obs.name,
bs: bs,
}

return status, nil
Expand Down
13 changes: 7 additions & 6 deletions test/kv_test.go
Expand Up @@ -94,13 +94,14 @@ func TestKeyValueBasics(t *testing.T) {
if status.Values() != 7 {
t.Fatalf("expected 7 values got %d", status.Values())
}
bs := status.BackingStore()
if bs.Kind() != "JetStream" {
t.Fatalf("invalid backing store kind %s", bs.Kind())
if status.BackingStore() != "JetStream" {
t.Fatalf("invalid backing store kind %s", status.BackingStore())
}
info := bs.Info()
if info["stream"] != "KV_TEST" {
t.Fatalf("invalid stream name %+v", info)

kvs := status.(*nats.KeyValueBucketStatus)
si := kvs.StreamInfo()
if si == nil {
t.Fatalf("StreamInfo not received")
}
}

Expand Down
9 changes: 5 additions & 4 deletions test/object_test.go
Expand Up @@ -130,11 +130,12 @@ func TestDefaultObjectStatus(t *testing.T) {

status, err := obs.Status()
expectOk(t, err)
if status.BackingStore().Kind() != "JetStream" {
t.Fatalf("invalid backing store kind: %s", status.BackingStore().Kind())
if status.BackingStore() != "JetStream" {
t.Fatalf("invalid backing store kind: %s", status.BackingStore())
}
info := status.BackingStore().Info()
if info["stream"] != "OBJ_OBJS" {
bs := status.(*nats.ObjectBucketStatus)
info := bs.StreamInfo()
if info.Config.Name != "OBJ_OBJS" {
t.Fatalf("invalid stream name %+v", info)
}
}
Expand Down

0 comments on commit 0f47b1c

Please sign in to comment.