Skip to content

Commit

Permalink
add link / add bucket link don't need to call Put since there is no d…
Browse files Browse the repository at this point in the history
…ata, they can just publish the object info (meta)
  • Loading branch information
scottf committed Aug 31, 2022
1 parent c9136ca commit 5309e5d
Showing 1 changed file with 39 additions and 3 deletions.
42 changes: 39 additions & 3 deletions object.go
Expand Up @@ -133,7 +133,7 @@ var (
ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
ErrNameRequired = errors.New("nats: name is required")
ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
ErrLinkNotAllowed = errors.New("nats: a link is not allowed with an object")
ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
)

// ObjectStoreConfig is the config for the object store.
Expand Down Expand Up @@ -634,6 +634,30 @@ func (obs *obs) Delete(name string) error {
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

func publishMeta(info *ObjectInfo, js JetStreamContext) error {
// Prepare the meta message
metaSubj := fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name))
mm := NewMsg(metaSubj)
mm.Header.Set(MsgRollup, MsgRollupSubject)

info.ModTime = time.Time{} // We don't store this, just do it after publish
data, err := json.Marshal(info)
if err != nil {
return err
}

// Publish the meta message.
mm.Data = data
_, err = js.PublishMsg(mm)
if err != nil {
return err
}

// set the ModTime in case it's returned to the user, even though it's not the correct time.
info.ModTime = time.Now().UTC()
return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
// name is the name of this link object
// obj is what is being linked too
Expand Down Expand Up @@ -668,9 +692,15 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
}
info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}

// put the link object
return obs.Put(meta, nil)
err = publishMeta(info, obs.js)
if err != nil {
return nil, err
}

return info, nil
}

// AddBucketLink will add a link to another object store.
Expand Down Expand Up @@ -703,9 +733,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
}
info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}

// put the link object
return ob.Put(meta, nil)
err = publishMeta(info, ob.js)
if err != nil {
return nil, err
}

return info, nil
}

// PutBytes is convenience function to put a byte slice into this object store.
Expand Down

0 comments on commit 5309e5d

Please sign in to comment.