From d433f9b5f39aab73a47cdc2236b46cfbe462e382 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 10 Sep 2025 18:53:17 -1000 Subject: [PATCH 1/2] put queue name into log messages --- dsqueue.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/dsqueue.go b/dsqueue.go index 1b0c9a6..e14d195 100644 --- a/dsqueue.go +++ b/dsqueue.go @@ -44,6 +44,7 @@ type DSQueue struct { enqueue chan string clear chan chan<- int closeTimeout time.Duration + name string } // New creates a queue for strings. @@ -60,6 +61,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue { enqueue: make(chan string), clear: make(chan chan<- int), closeTimeout: cfg.closeTimeout, + name: name, } go q.worker(ctx, cfg.bufferSize, cfg.dedupCacheSize, cfg.idleWriteTime) @@ -118,6 +120,11 @@ func (q *DSQueue) Clear() int { return <-rsp } +// Name returns the name of this DSQueue instance. +func (q *DSQueue) Name() string { + return q.name +} + func makeKey(item string, counter uint64) datastore.Key { b64Item := base64.RawURLEncoding.EncodeToString([]byte(item)) return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, b64Item)) @@ -144,16 +151,16 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id defer func() { if item != "" { if err := q.ds.Put(ctx, k, nil); err != nil { - log.Errorw("dsqueue: failed to write item to datastore", "err", err) + log.Errorw("failed to write item to datastore", "err", err, "qname", q.name) } counter++ } if inBuf.Len() != 0 { err := q.commitInput(ctx, counter, &inBuf) if err != nil && !errors.Is(err, context.Canceled) { - log.Error(err) + log.Errorw("error writing items to datastore", "err", err, "qname", q.name) if inBuf.Len() != 0 { - q.closed <- fmt.Errorf("dsqueue: %d items not written to datastore", inBuf.Len()) + q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len()) } } } @@ -183,23 +190,23 @@ func (q 
*DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id if !dsEmpty { head, err := q.getQueueHead(ctx) if err != nil { - log.Errorw("dsqueue: error querying for head of queue, stopping dsqueue", "err", err) + log.Errorw("error querying for head of queue, stopping dsqueue", "err", err, "qname", q.name) return } if head != nil { k = datastore.NewKey(head.Key) if err = q.ds.Delete(ctx, k); err != nil { - log.Errorw("dsqueue: error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key) + log.Errorw("error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key, "qname", q.name) return } parts := strings.SplitN(strings.TrimPrefix(head.Key, "/"), "/", 2) if len(parts) != 2 { - log.Errorw("dsqueue: malformed queued item, removing it from queue", "err", err, "key", head.Key) + log.Errorw("malformed queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name) continue } itemBin, err := base64.RawURLEncoding.DecodeString(parts[1]) if err != nil { - log.Errorw("dsqueue: error decoding queued item, removing it from queue", "err", err, "key", head.Key) + log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name) continue } item = string(itemBin) @@ -281,11 +288,11 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id dedupCache.Purge() rmDSCount, err := q.clearDatastore(ctx) if err != nil { - log.Errorw("dsqueue: cannot clear datastore", "err", err) + log.Errorw("cannot clear datastore", "err", err, "qname", q.name) } else { dsEmpty = true } - log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount) + log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount, "qname", q.name) rsp <- rmMemCount + rmDSCount } @@ -295,7 +302,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id err = q.commitInput(ctx, counter, &inBuf) if err != nil { if !errors.Is(err, 
context.Canceled) { - log.Errorw("dsqueue: error writing items to datastore, stopping dsqueue", "err", err) + log.Errorw("error writing items to datastore, stopping dsqueue", "err", err, "qname", q.name) } return } @@ -380,7 +387,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque. item := items.At(i) key := makeKey(item, counter) if err = b.Put(ctx, key, nil); err != nil { - log.Errorw("dsqueue: failed to add item to batch", "err", err) + log.Errorw("failed to add item to batch", "err", err, "qname", q.name) continue } counter++ @@ -389,7 +396,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque. items.Clear() if err = b.Commit(ctx); err != nil { - return fmt.Errorf("dsqueue: failed to commit batch to datastore: %w", err) + return fmt.Errorf("failed to commit batch to datastore: %w", err) } return nil From 25cd7811ff5ffa2d33ef181af2bdb82d0c4d74c3 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 10 Sep 2025 19:35:28 -1000 Subject: [PATCH 2/2] add tests --- dsqueue.go | 8 ++--- dsqueue_test.go | 77 ++++++++++++++++++++++++++++++++++++++++--------- option.go | 6 ++-- 3 files changed, 71 insertions(+), 20 deletions(-) diff --git a/dsqueue.go b/dsqueue.go index e14d195..175436b 100644 --- a/dsqueue.go +++ b/dsqueue.go @@ -11,9 +11,9 @@ import ( "github.com/gammazero/deque" lru "github.com/hashicorp/golang-lru/v2" - datastore "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" - query "github.com/ipfs/go-datastore/query" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log/v2" ) @@ -99,7 +99,7 @@ func (q *DSQueue) Enqueue(item []byte) (err error) { } defer func() { if r := recover(); r != nil { - err = errors.New("failed to enqueue item: shutting down") + err = fmt.Errorf("failed to enqueue item: %s", r) } }() diff --git 
a/dsqueue_test.go b/dsqueue_test.go index ff79de9..ba273ac 100644 --- a/dsqueue_test.go +++ b/dsqueue_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-dsqueue" "github.com/ipfs/go-test/random" @@ -48,6 +49,17 @@ func TestBasicOperation(t *testing.T) { queue := dsqueue.New(ds, dsqName) defer queue.Close() + if queue.Name() != dsqName { + t.Fatal("wrong queue name") + } + + queue.Enqueue(nil) + select { + case <-queue.Dequeue(): + t.Fatal("nothing should be in queue") + case <-time.After(time.Millisecond): + } + cids := random.Cids(10) for _, c := range cids { queue.Enqueue(c.Bytes()) @@ -62,6 +74,11 @@ func TestBasicOperation(t *testing.T) { if err = queue.Close(); err != nil { t.Fatal(err) } + + err = queue.Enqueue(cids[0].Bytes()) + if err == nil { + t.Fatal("expected error calling Enqueue after Close") + } } func TestMangledData(t *testing.T) { @@ -113,28 +130,62 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[5:], queue, t) } -func TestInitializationWithManyCids(t *testing.T) { +func TestIdleFlush(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(25)) + queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond)) defer queue.Close() - cids := random.Cids(100) + cids := random.Cids(10) for _, c := range cids { queue.Enqueue(c.Bytes()) } - /* - err := queue.Close() - if err != nil { - t.Fatal(err) - } + dsn := namespace.Wrap(ds, datastore.NewKey("/dsq-"+dsqName)) + time.Sleep(10 * time.Millisecond) + + ctx := context.Background() + n, err := countItems(ctx, dsn) + if err != nil { + t.Fatal(err) + } + if n != 0 { + t.Fatal("expected nothing in datastore") + } - // make a new queue, same data - queue = dsqueue.New(ds, dsqName) - defer queue.Close() - */ + time.Sleep(2 * time.Second) - 
assertOrdered(cids, queue, t) + n, err = countItems(ctx, dsn) + if err != nil { + t.Fatal(err) + } + expect := len(cids) - 1 + if n != expect { + t.Fatalf("should have flushed %d cids to datastore, got %d", expect, n) + } +} + +func countItems(ctx context.Context, ds datastore.Datastore) (int, error) { + qry := query.Query{ + KeysOnly: true, + } + results, err := ds.Query(ctx, qry) + if err != nil { + return 0, fmt.Errorf("cannot query datastore: %w", err) + } + defer results.Close() + + var count int + for result := range results.Next() { + if ctx.Err() != nil { + return 0, ctx.Err() + } + if result.Error != nil { + return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error) + } + count++ + } + + return count, nil } func TestPersistManyCids(t *testing.T) { diff --git a/option.go b/option.go index 76e6ee2..896bb39 100644 --- a/option.go +++ b/option.go @@ -64,11 +64,11 @@ func WithDedupCacheSize(n int) Option { // WithIdleWriteTime sets the amout of time that the queue must be idle (no // input or output) before all buffered input items are written to the // datastore. A value of zero means that buffered input items are not -// automatically flushed to the datastore. This value must be greater than one -// second. +// automatically flushed to the datastore. A non-zero value less than one +// second is raised to one second. func WithIdleWriteTime(d time.Duration) Option { return func(c *config) { - if d < time.Second { + if d != 0 && d < time.Second { d = time.Second } c.idleWriteTime = d