From 08dd7d3cb96cd989b577c45b44f3be85d844290a Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 20 Sep 2022 15:45:45 +0200 Subject: [PATCH] Do not show deleted objects by default --- object.go | 201 +++++++++++++++++++++++++++++++++++--------- test/object_test.go | 111 ++++++++++++++++++------ 2 files changed, 250 insertions(+), 62 deletions(-) diff --git a/object.go b/object.go index 6a9d90192..0cbb0bcd0 100644 --- a/object.go +++ b/object.go @@ -60,25 +60,25 @@ type ObjectStore interface { // Put will place the contents from the reader into a new object. Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) // Get will pull the named object from the object store. - Get(name string, opts ...ObjectOpt) (ObjectResult, error) + Get(name string, opts ...GetObjectOpt) (ObjectResult, error) // PutBytes is convenience function to put a byte slice into this object store. PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. - GetBytes(name string, opts ...ObjectOpt) ([]byte, error) + GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) // PutString is convenience function to put a string into this object store. PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) // GetString is a convenience function to pull an object from this object store and return it as a string. - GetString(name string, opts ...ObjectOpt) (string, error) + GetString(name string, opts ...GetObjectOpt) (string, error) // PutFile is convenience function to put a file into this object store. PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) // GetFile is a convenience function to pull an object from this object store and place it in a file. - GetFile(name, file string, opts ...ObjectOpt) error + GetFile(name, file string, opts ...GetObjectOpt) error // GetInfo will retrieve the current information for the object. - GetInfo(name string) (*ObjectInfo, error) + GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) // UpdateMeta will update the metadata for the object. UpdateMeta(name string, meta *ObjectMeta) error @@ -98,7 +98,7 @@ type ObjectStore interface { Watch(opts ...WatchOpt) (ObjectWatcher, error) // List will list all the objects in this store. - List(opts ...WatchOpt) ([]*ObjectInfo, error) + List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) // Status retrieves run-time status about the backing store of the bucket. Status() (ObjectStoreStatus, error) @@ -349,7 +349,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn // Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem // Chunks on the old nuid can be cleaned up at the end - einfo, err := obs.GetInfo(meta.Name) // GetInfo will encode the name + einfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name if err != nil && err != ErrObjectNotFound { return nil, err } @@ -517,10 +517,56 @@ func (info *ObjectInfo) isLink() bool { return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil } +type GetObjectOpt interface { + configureGetObject(opts *getObjectOpts) error +} +type getObjectOpts struct { + ctx context.Context + // Include deleted object in the result. + showDeleted bool +} + +type getObjectFn func(opts *getObjectOpts) error + +func (opt getObjectFn) configureGetObject(opts *getObjectOpts) error { + return opt(opts) +} + +// GetObjectShowDeleted makes Get() return object if it was marked as deleted. +func GetObjectShowDeleted() GetObjectOpt { + return getObjectFn(func(opts *getObjectOpts) error { + opts.showDeleted = true + return nil + }) +} + +// For nats.Context() support. +func (ctx ContextOpt) configureGetObject(opts *getObjectOpts) error { + opts.ctx = ctx + return nil +} + // Get will pull the object from the underlying stream. -func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { +func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { + var o getObjectOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureGetObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + infoOpts := make([]GetObjectInfoOpt, 0) + if ctx != nil { + infoOpts = append(infoOpts, Context(ctx)) + } + if o.showDeleted { + infoOpts = append(infoOpts, GetObjectInfoShowDeleted()) + } + // Grab meta info. - info, err := obs.GetInfo(name) + info, err := obs.GetInfo(name, infoOpts...) if err != nil { return nil, err } @@ -548,16 +594,6 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { return lobs.Get(info.ObjectMeta.Opts.Link.Name) } - var o objOpts - for _, opt := range opts { - if opt != nil { - if err := opt.configureObject(&o); err != nil { - return nil, err - } - } - } - ctx := o.ctx - result := &objResult{info: info, ctx: ctx} if info.Size == 0 { return result, nil @@ -629,7 +665,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { // Delete will delete the object. func (obs *obs) Delete(name string) error { // Grab meta info. - info, err := obs.GetInfo(name) + info, err := obs.GetInfo(name, GetObjectInfoShowDeleted()) if err != nil { return err } @@ -694,7 +730,7 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { // If object with link's name is found, error. // If link with link's name is found, that's okay to overwrite. // If there was an error that was not ErrObjectNotFound, error. - einfo, err := obs.GetInfo(name) + einfo, err := obs.GetInfo(name, GetObjectInfoShowDeleted()) if einfo != nil { if !einfo.isLink() { return nil, ErrObjectAlreadyExists @@ -734,7 +770,7 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro // If object with link's name is found, error. // If link with link's name is found, that's okay to overwrite. // If there was an error that was not ErrObjectNotFound, error. - einfo, err := ob.GetInfo(name) + einfo, err := ob.GetInfo(name, GetObjectInfoShowDeleted()) if einfo != nil { if !einfo.isLink() { return nil, ErrObjectAlreadyExists @@ -765,7 +801,7 @@ func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectIn } // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. -func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) { +func (obs *obs) GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) { result, err := obs.Get(name, opts...) if err != nil { return nil, err @@ -785,7 +821,7 @@ func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectI } // GetString is a convenience function to pull an object from this object store and return it as a string. -func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) { +func (obs *obs) GetString(name string, opts ...GetObjectOpt) (string, error) { result, err := obs.Get(name, opts...) if err != nil { return _EMPTY_, err @@ -810,7 +846,7 @@ func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) { } // GetFile is a convenience function to pull and object and place in a file. -func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error { +func (obs *obs) GetFile(name, file string, opts ...GetObjectOpt) error { // Expect file to be new. f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600) if err != nil { @@ -830,12 +866,49 @@ func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error { return err } +type GetObjectInfoOpt interface { + configureGetInfo(opts *getObjectInfoOpts) error +} +type getObjectInfoOpts struct { + ctx context.Context + // Include deleted object in the result. + showDeleted bool +} + +type getObjectInfoFn func(opts *getObjectInfoOpts) error + +func (opt getObjectInfoFn) configureGetInfo(opts *getObjectInfoOpts) error { + return opt(opts) +} + +// GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted. +func GetObjectInfoShowDeleted() GetObjectInfoOpt { + return getObjectInfoFn(func(opts *getObjectInfoOpts) error { + opts.showDeleted = true + return nil + }) +} + +// For nats.Context() support. +func (ctx ContextOpt) configureGetInfo(opts *getObjectInfoOpts) error { + opts.ctx = ctx + return nil +} + // GetInfo will retrieve the current information for the object. -func (obs *obs) GetInfo(name string) (*ObjectInfo, error) { +func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) { // Grab last meta value we have. if name == "" { return nil, ErrNameRequired } + var o getObjectInfoOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureGetInfo(&o); err != nil { + return nil, err + } + } + } metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) // used as data in a JS API call stream := fmt.Sprintf(objNameTmpl, obs.name) @@ -851,6 +924,9 @@ func (obs *obs) GetInfo(name string) (*ObjectInfo, error) { if err := json.Unmarshal(m.Data, &info); err != nil { return nil, ErrBadObjectMeta } + if !o.showDeleted && info.Deleted { + return nil, ErrObjectNotFound + } info.ModTime = m.Time return &info, nil } @@ -864,17 +940,16 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { // Grab the current meta. info, err := obs.GetInfo(name) if err != nil { + if errors.Is(err, ErrObjectNotFound) { + return ErrUpdateMetaDeleted + } return err } - if info.Deleted { - return ErrUpdateMetaDeleted - } - // If the new name is different from the old, and it exists, error // If there was an error that was not ErrObjectNotFound, error. if name != meta.Name { - existingInfo, err := obs.GetInfo(meta.Name) + existingInfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) if err != nil && !errors.Is(err, ErrObjectNotFound) { return err } @@ -996,21 +1071,71 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { return w, nil } +type ListObjectsOpt interface { + configureListObjects(opts *listObjectOpts) error +} +type listObjectOpts struct { + ctx context.Context + // Include deleted objects in the result channel. + showDeleted bool +} + +type listObjectsFn func(opts *listObjectOpts) error + +func (opt listObjectsFn) configureListObjects(opts *listObjectOpts) error { + return opt(opts) +} + +// ListObjectsShowDeleted makes ListObjects() return deleted objects. +func ListObjectsShowDeleted() ListObjectsOpt { + return listObjectsFn(func(opts *listObjectOpts) error { + opts.showDeleted = true + return nil + }) +} + +// For nats.Context() support. +func (ctx ContextOpt) configureListObjects(opts *listObjectOpts) error { + opts.ctx = ctx + return nil +} + // List will list all the objects in this store. -func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { - opts = append(opts, IgnoreDeletes()) - watcher, err := obs.Watch(opts...) +func (obs *obs) List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) { + var o listObjectOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureListObjects(&o); err != nil { + return nil, err + } + } + } + watchOpts := make([]WatchOpt, 0) + if !o.showDeleted { + watchOpts = append(watchOpts, IgnoreDeletes()) + } + watcher, err := obs.Watch(watchOpts...) if err != nil { return nil, err } defer watcher.Stop() + if o.ctx == nil { + o.ctx = context.Background() + } var objs []*ObjectInfo - for entry := range watcher.Updates() { - if entry == nil { - break + updates := watcher.Updates() +Updates: + for { + select { + case entry := <-updates: + if entry == nil { + break Updates + } + objs = append(objs, entry) + case <-o.ctx.Done(): + return nil, o.ctx.Err() } - objs = append(objs, entry) } if len(objs) == 0 { return nil, ErrNoObjectsFound diff --git a/test/object_test.go b/test/object_test.go index bbede2018..9e1324737 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -15,6 +15,7 @@ package test import ( "bytes" + "context" "crypto/rand" "crypto/sha256" "fmt" @@ -302,12 +303,20 @@ func TestObjectDeleteMarkers(t *testing.T) { if si.State.Msgs != 1 { t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs) } - // Make sure we have a delete marker, this will be there to drive Watch functionality. - info, err := obs.GetInfo("A") + // For deleted object return error + _, err = obs.GetInfo("A") + expectErr(t, err, nats.ErrObjectNotFound) + _, err = obs.Get("A") + expectErr(t, err, nats.ErrObjectNotFound) + + info, err := obs.GetInfo("A", nats.GetObjectInfoShowDeleted()) expectOk(t, err) + // Make sure we have a delete marker, this will be there to drive Watch functionality. if !info.Deleted { t.Fatalf("Expected info to be marked as deleted") } + _, err = obs.Get("A", nats.GetObjectShowDeleted()) + expectOk(t, err) } func TestObjectMultiWithDelete(t *testing.T) { @@ -530,7 +539,7 @@ func TestObjectWatch(t *testing.T) { expectOk(t, err) // Update Meta - deletedInfo, err := obs.GetInfo("A") + deletedInfo, err := obs.GetInfo("A", nats.GetObjectInfoShowDeleted()) expectOk(t, err) if !deletedInfo.Deleted { t.Fatalf("Expected object to be deleted.") @@ -661,7 +670,7 @@ func TestObjectLinks(t *testing.T) { _, err = root.AddLink("ToDeletedStale", infoA) expectOk(t, err) // TODO deal with this in the code somehow - infoA, err = root.GetInfo("A") + infoA, err = root.GetInfo("A", nats.GetObjectInfoShowDeleted()) expectOk(t, err) _, err = root.AddLink("ToDeletedFresh", infoA) @@ -748,28 +757,82 @@ func TestObjectList(t *testing.T) { err = root.Delete("D") expectOk(t, err) - lch, err := root.List() - expectOk(t, err) + t.Run("without deleted objects", func(t *testing.T) { + lch, err := root.List() + expectOk(t, err) - omap := make(map[string]struct{}) - for _, info := range lch { - if _, ok := omap[info.Name]; ok { - t.Fatalf("Already saw %q", info.Name) + omap := make(map[string]struct{}) + for _, info := range lch { + if _, ok := omap[info.Name]; ok { + t.Fatalf("Already saw %q", info.Name) + } + omap[info.Name] = struct{}{} } - omap[info.Name] = struct{}{} - } - if len(omap) != 4 { - t.Fatalf("Expected 4 total objects, got %d", len(omap)) - } - expected := map[string]struct{}{ - "A": struct{}{}, - "B": struct{}{}, - "C": struct{}{}, - "b": struct{}{}, - } - if !reflect.DeepEqual(omap, expected) { - t.Fatalf("Expected %+v but got %+v", expected, omap) - } + if len(omap) != 4 { + t.Fatalf("Expected 4 total objects, got %d", len(omap)) + } + expected := map[string]struct{}{ + "A": struct{}{}, + "B": struct{}{}, + "C": struct{}{}, + "b": struct{}{}, + } + if !reflect.DeepEqual(omap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, omap) + } + }) + + t.Run("with deleted objects", func(t *testing.T) { + lch, err := root.List(nats.ListObjectsShowDeleted()) + expectOk(t, err) + + res := make([]string, 0) + for _, info := range lch { + res = append(res, info.Name) + } + if len(res) != 5 { + t.Fatalf("Expected 5 total objects, got %d", len(res)) + } + expected := []string{"A", "C", "B", "b", "D"} + + if !reflect.DeepEqual(res, expected) { + t.Fatalf("Expected %+v but got %+v", expected, res) + } + }) + + t.Run("with context", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + lch, err := root.List(nats.Context(ctx)) + expectOk(t, err) + + omap := make(map[string]struct{}) + for _, info := range lch { + if _, ok := omap[info.Name]; ok { + t.Fatalf("Already saw %q", info.Name) + } + omap[info.Name] = struct{}{} + } + if len(omap) != 4 { + t.Fatalf("Expected 4 total objects, got %d", len(omap)) + } + expected := map[string]struct{}{ + "A": struct{}{}, + "B": struct{}{}, + "C": struct{}{}, + "b": struct{}{}, + } + if !reflect.DeepEqual(omap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, omap) + } + }) + + t.Run("context timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + defer cancel() + _, err := root.List(nats.Context(ctx)) + expectErr(t, err, context.DeadlineExceeded) + }) } func TestObjectMaxBytes(t *testing.T) {