Skip to content

Commit

Permalink
[FIXED] Object store context put and get issues (#1260)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 4, 2023
1 parent d963c7d commit c3cae07
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
12 changes: 9 additions & 3 deletions object.go
Expand Up @@ -368,13 +368,19 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return perr
}

purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) }

// Create our own JS context to handle errors etc.
js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
if err != nil {
return nil, err
}
purgePartial := func() {
// wait until all pubs are complete or up to default timeout before attempting purge
select {
case <-js.PublishAsyncComplete():
case <-time.After(obs.js.opts.wait):
}
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)
Expand Down Expand Up @@ -1207,7 +1213,7 @@ func (o *objResult) Read(p []byte) (n int, err error) {
}
}
if o.err != nil {
return 0, err
return 0, o.err
}
if o.r == nil {
return 0, io.EOF
Expand Down
3 changes: 1 addition & 2 deletions test/norace_test.go
Expand Up @@ -59,9 +59,8 @@ func TestNoRaceObjectContextOpt(t *testing.T) {
expectOk(t, err)

ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
time.AfterFunc(100*time.Millisecond, cancel)
time.AfterFunc(10*time.Millisecond, cancel)

time.AfterFunc(20*time.Millisecond, func() { shutdownJSServerAndRemoveStorage(t, s) })
start = time.Now()
_, err = obs.GetBytes("BLOB", nats.Context(ctx))
expectErr(t, err)
Expand Down

0 comments on commit c3cae07

Please sign in to comment.