27 changes: 20 additions & 7 deletions dsqueue.go
@@ -36,7 +36,7 @@ var log = logging.Logger("dsqueue")
// then queued items can remain in memory. When the queue is closed, any
// remaining items in memory are written to the datastore.
type DSQueue struct {
close context.CancelFunc
cancel context.CancelFunc
closed chan error
closeOnce sync.Once
dequeue chan []byte
@@ -54,7 +54,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
ctx, cancel := context.WithCancel(context.Background())

q := &DSQueue{
close: cancel,
cancel: cancel,
closed: make(chan error, 1),
dequeue: make(chan []byte),
ds: namespace.Wrap(ds, datastore.NewKey("/dsq-"+name)),
@@ -84,7 +84,7 @@ func (q *DSQueue) Close() error {
select {
case <-q.closed:
case <-timeoutCh:
q.close() // force immediate shutdown
q.cancel() // force immediate shutdown
err = <-q.closed
}
close(q.dequeue) // no more output from this queue
@@ -150,21 +150,30 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id

defer func() {
if item != "" {
// Write the item directly, instead of pushing it to the front of
// inBuf, in order to retain its original key, and therefore the
// order in the datastore, which may not be empty.
if err := q.ds.Put(ctx, k, nil); err != nil {
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
if !errors.Is(err, context.Canceled) {
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
}
q.closed <- fmt.Errorf("%d items not written to datastore", 1+inBuf.Len())
return
}
}
if inBuf.Len() != 0 {
err := q.commitInput(ctx, counter, &inBuf)
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
}
if inBuf.Len() != 0 {
q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len())
}
}
}
if err := q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
q.closed <- fmt.Errorf("cannot sync datastore: %w", err)
log.Errorw("failed to sync datastore", "err", err, "qname", q.name)
}
}()

@@ -378,6 +387,10 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) {
}

func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.Deque[string]) error {
if ctx.Err() != nil {
return ctx.Err()
}

b, err := q.ds.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
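The reworked defer block in worker only logs write failures when they are not the expected context.Canceled error, but it always reports the number of unpersisted items on the closed channel. Below is a minimal, standalone sketch of that pattern; the names are hypothetical and this is not the package's actual code.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// reportShutdownErr mirrors the shutdown error handling sketched above:
// logging is suppressed for the expected context.Canceled error, but the
// count of items that never reached the datastore is always reported.
func reportShutdownErr(closed chan<- error, pending int, writeErr error) {
	if writeErr == nil || pending == 0 {
		return
	}
	if !errors.Is(writeErr, context.Canceled) {
		log.Printf("error writing items to datastore: %v", writeErr)
	}
	closed <- fmt.Errorf("%d items not written to datastore", pending)
}

func main() {
	closed := make(chan error, 1)
	reportShutdownErr(closed, 3, context.Canceled)
	fmt.Println(<-closed) // prints: 3 items not written to datastore
}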
60 changes: 59 additions & 1 deletion dsqueue_test.go
@@ -132,7 +132,7 @@ func TestInitialization(t *testing.T) {

func TestIdleFlush(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond))
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(-1), dsqueue.WithIdleWriteTime(time.Millisecond))
defer queue.Close()

cids := random.Cids(10)
@@ -246,6 +246,16 @@ func TestDeduplicateCids(t *testing.T) {
queue.Enqueue(cids[4].Bytes())

assertOrdered(cids, queue, t)

// Test with dedup cache disabled.
queue = dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(-1))
defer queue.Close()

cids = append(cids, cids[0], cids[0], cids[1])
for _, c := range cids {
queue.Enqueue(c.Bytes())
}
assertOrdered(cids, queue, t)
}

func TestClear(t *testing.T) {
@@ -292,3 +302,51 @@ func TestClear(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}
}

func TestCloseTimeout(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
sds := &slowds{
Batching: ds,
delay: time.Second,
}
queue := dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(time.Microsecond))
defer queue.Close()

cids := random.Cids(5)
for _, c := range cids {
queue.Enqueue(c.Bytes())
}

err := queue.Close()
if err == nil {
t.Fatal("expected error")
}
const expectErr = "5 items not written to datastore"
if err.Error() != expectErr {
t.Fatalf("did not get expected err %q, got %q", expectErr, err.Error())
}

// Test with no close timeout.
queue = dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(-1))
defer queue.Close()

for _, c := range cids {
queue.Enqueue(c.Bytes())
}
if err = queue.Close(); err != nil {
t.Fatal(err)
}
}

type slowds struct {
datastore.Batching
delay time.Duration
}

func (sds *slowds) Put(ctx context.Context, key datastore.Key, value []byte) error {
time.Sleep(sds.delay)
if ctx.Err() != nil {
return ctx.Err()
}
return sds.Batching.Put(ctx, key, value)
}