Skip to content

Commit

Permalink
Merge 01f561f into c3a557a
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Aug 31, 2022
2 parents c3a557a + 01f561f commit d93d6c2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
58 changes: 49 additions & 9 deletions object.go
Expand Up @@ -75,7 +75,7 @@ type ObjectStore interface {

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
// UpdateMeta will update the meta data for the object.
// UpdateMeta will update the metadata for the object.
UpdateMeta(name string, meta *ObjectMeta) error

// Delete will delete the named object.
Expand Down Expand Up @@ -133,6 +133,7 @@ var (
ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
ErrNameRequired = errors.New("nats: name is required")
ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
)

// ObjectStoreConfig is the config for the object store.
Expand Down Expand Up @@ -311,6 +312,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, ErrBadObjectMeta
}

if meta.Opts == nil {
meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
} else if meta.Opts.Link != nil {
return nil, ErrLinkNotAllowed
} else if meta.Opts.ChunkSize == 0 {
meta.Opts.ChunkSize = objDefaultChunkSize
}

var o objOpts
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -356,13 +365,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, err
}

chunkSize := objDefaultChunkSize
if meta.Opts != nil && meta.Opts.ChunkSize > 0 {
chunkSize = meta.Opts.ChunkSize
}

m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, chunkSize), 0, uint64(0)
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)

// set up the info object. The chunk upload sets the size and digest
info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
Expand Down Expand Up @@ -630,6 +634,30 @@ func (obs *obs) Delete(name string) error {
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

func publishMeta(info *ObjectInfo, js JetStreamContext) error {
// Prepare the meta message
metaSubj := fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name))
mm := NewMsg(metaSubj)
mm.Header.Set(MsgRollup, MsgRollupSubject)

info.ModTime = time.Time{} // We don't store this, just do it after publish
data, err := json.Marshal(info)
if err != nil {
return err
}

// Publish the meta message.
mm.Data = data
_, err = js.PublishMsg(mm)
if err != nil {
return err
}

// set the ModTime in case it's returned to the user, even though it's not the correct time.
info.ModTime = time.Now().UTC()
return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
// name is the name of this link object
// obj is what is being linked too
Expand Down Expand Up @@ -664,9 +692,15 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
}
info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}

// put the link object
return obs.Put(meta, nil)
err = publishMeta(info, obs.js)
if err != nil {
return nil, err
}

return info, nil
}

// AddBucketLink will add a link to another object store.
Expand Down Expand Up @@ -699,9 +733,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
}
info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}

// put the link object
return ob.Put(meta, nil)
err = publishMeta(info, ob.js)
if err != nil {
return nil, err
}

return info, nil
}

// PutBytes is convenience function to put a byte slice into this object store.
Expand Down
5 changes: 5 additions & 0 deletions test/object_test.go
Expand Up @@ -431,6 +431,11 @@ func TestObjectMetadata(t *testing.T) {
if err == nil {
t.Fatal("Expected an error when trying to update an object that does not exist.")
}

// can't have a link when putting an object
meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}}
_, err = obs.Put(meta, nil)
expectErr(t, err, nats.ErrLinkNotAllowed)
}

func TestObjectWatch(t *testing.T) {
Expand Down

0 comments on commit d93d6c2

Please sign in to comment.