From b448beb0763e4a40628b9b4a6a862b3a331709b1 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 10 Sep 2025 22:05:31 -1000 Subject: [PATCH] improve close errors and add tests --- dsqueue.go | 27 ++++++++++++++++------ dsqueue_test.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 8 deletions(-) diff --git a/dsqueue.go b/dsqueue.go index c453092..e031e2a 100644 --- a/dsqueue.go +++ b/dsqueue.go @@ -36,7 +36,7 @@ var log = logging.Logger("dsqueue") // then queued items can remain in memory. When the queue is closed, any // remaining items in memory are written to the datastore. type DSQueue struct { - close context.CancelFunc + cancel context.CancelFunc closed chan error closeOnce sync.Once dequeue chan []byte @@ -54,7 +54,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue { ctx, cancel := context.WithCancel(context.Background()) q := &DSQueue{ - close: cancel, + cancel: cancel, closed: make(chan error, 1), dequeue: make(chan []byte), ds: namespace.Wrap(ds, datastore.NewKey("/dsq-"+name)), @@ -84,7 +84,7 @@ func (q *DSQueue) Close() error { select { case <-q.closed: case <-timeoutCh: - q.close() // force immediate shutdown + q.cancel() // force immediate shutdown err = <-q.closed } close(q.dequeue) // no more output from this queue @@ -150,21 +150,30 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id defer func() { if item != "" { + // Write the item directly, instead of pushing it to the front of + // inbuf, in order to retain it's original kay, and therefore the + // order in the datastore, which may not be empty. if err := q.ds.Put(ctx, k, nil); err != nil { - log.Errorw("failed to write item to datastore", "err", err, "qname", q.name) + if !errors.Is(err, context.Canceled) { + log.Errorw("failed to write item to datastore", "err", err, "qname", q.name) + } + q.closed <- fmt.Errorf("%d items not written to datastore", 1+inBuf.Len()) + return } } if inBuf.Len() != 0 { err := q.commitInput(ctx, counter, &inBuf) - if err != nil && !errors.Is(err, context.Canceled) { - log.Errorw("error writing items to datastore", "err", err, "qname", q.name) + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorw("error writing items to datastore", "err", err, "qname", q.name) + } if inBuf.Len() != 0 { q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len()) } } } if err := q.ds.Sync(ctx, datastore.NewKey("")); err != nil { - q.closed <- fmt.Errorf("cannot sync datastore: %w", err) + log.Errorw("failed to sync datastore", "err", err, "qname", q.name) } }() @@ -378,6 +387,10 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) { } func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.Deque[string]) error { + if ctx.Err() != nil { + return ctx.Err() + } + b, err := q.ds.Batch(ctx) if err != nil { return fmt.Errorf("failed to create batch: %w", err) diff --git a/dsqueue_test.go b/dsqueue_test.go index ba273ac..fd4d492 100644 --- a/dsqueue_test.go +++ b/dsqueue_test.go @@ -132,7 +132,7 @@ func TestInitialization(t *testing.T) { func TestIdleFlush(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond)) + queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(-1), dsqueue.WithIdleWriteTime(time.Millisecond)) defer queue.Close() cids := random.Cids(10) @@ -246,6 +246,16 @@ func TestDeduplicateCids(t *testing.T) { queue.Enqueue(cids[4].Bytes()) assertOrdered(cids, queue, t) + + // Test with dedup cache disabled. + queue = dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(-1)) + defer queue.Close() + + cids = append(cids, cids[0], cids[0], cids[1]) + for _, c := range cids { + queue.Enqueue(c.Bytes()) + } + assertOrdered(cids, queue, t) } func TestClear(t *testing.T) { @@ -292,3 +302,51 @@ func TestClear(t *testing.T) { case <-time.After(10 * time.Millisecond): } } + +func TestCloseTimeout(t *testing.T) { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + sds := &slowds{ + Batching: ds, + delay: time.Second, + } + queue := dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(time.Microsecond)) + defer queue.Close() + + cids := random.Cids(5) + for _, c := range cids { + queue.Enqueue(c.Bytes()) + } + + err := queue.Close() + if err == nil { + t.Fatal("expected error") + } + const expectErr = "5 items not written to datastore" + if err.Error() != expectErr { + t.Fatalf("did not get expected err %q, got %q", expectErr, err.Error()) + } + + // Test with no close timeout. + queue = dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(-1)) + defer queue.Close() + + for _, c := range cids { + queue.Enqueue(c.Bytes()) + } + if err = queue.Close(); err != nil { + t.Fatal(err) + } +} + +type slowds struct { + datastore.Batching + delay time.Duration +} + +func (sds *slowds) Put(ctx context.Context, key datastore.Key, value []byte) error { + time.Sleep(sds.delay) + if ctx.Err() != nil { + return ctx.Err() + } + return sds.Batching.Put(ctx, key, value) +}