Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/gammazero/deque"
lru "github.com/hashicorp/golang-lru/v2"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
query "github.com/ipfs/go-datastore/query"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
)

Expand Down Expand Up @@ -44,6 +44,7 @@ type DSQueue struct {
enqueue chan string
clear chan chan<- int
closeTimeout time.Duration
name string
}

// New creates a queue for strings.
Expand All @@ -60,6 +61,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
enqueue: make(chan string),
clear: make(chan chan<- int),
closeTimeout: cfg.closeTimeout,
name: name,
}

go q.worker(ctx, cfg.bufferSize, cfg.dedupCacheSize, cfg.idleWriteTime)
Expand Down Expand Up @@ -97,7 +99,7 @@ func (q *DSQueue) Enqueue(item []byte) (err error) {
}
defer func() {
if r := recover(); r != nil {
err = errors.New("failed to enqueue item: shutting down")
err = fmt.Errorf("failed to enqueue item: %s", r)
}
}()

Expand All @@ -118,6 +120,11 @@ func (q *DSQueue) Clear() int {
return <-rsp
}

// Name returns the name of this DSQueue instance.
func (q *DSQueue) Name() string {
return q.name
}

func makeKey(item string, counter uint64) datastore.Key {
b64Item := base64.RawURLEncoding.EncodeToString([]byte(item))
return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, b64Item))
Expand All @@ -144,16 +151,16 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
defer func() {
if item != "" {
if err := q.ds.Put(ctx, k, nil); err != nil {
log.Errorw("dsqueue: failed to write item to datastore", "err", err)
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
}
counter++
}
if inBuf.Len() != 0 {
err := q.commitInput(ctx, counter, &inBuf)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err)
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
if inBuf.Len() != 0 {
q.closed <- fmt.Errorf("dsqueue: %d items not written to datastore", inBuf.Len())
q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len())
}
}
}
Expand Down Expand Up @@ -183,23 +190,23 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
if !dsEmpty {
head, err := q.getQueueHead(ctx)
if err != nil {
log.Errorw("dsqueue: error querying for head of queue, stopping dsqueue", "err", err)
log.Errorw("error querying for head of queue, stopping dsqueue", "err", err, "qname", q.name)
return
}
if head != nil {
k = datastore.NewKey(head.Key)
if err = q.ds.Delete(ctx, k); err != nil {
log.Errorw("dsqueue: error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key)
log.Errorw("error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key, "qname", q.name)
return
}
parts := strings.SplitN(strings.TrimPrefix(head.Key, "/"), "/", 2)
if len(parts) != 2 {
log.Errorw("dsqueue: malformed queued item, removing it from queue", "err", err, "key", head.Key)
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
continue
}
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
log.Errorw("dsqueue: error decoding queued item, removing it from queue", "err", err, "key", head.Key)
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
continue
}
item = string(itemBin)
Expand Down Expand Up @@ -281,11 +288,11 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
dedupCache.Purge()
rmDSCount, err := q.clearDatastore(ctx)
if err != nil {
log.Errorw("dsqueue: cannot clear datastore", "err", err)
log.Errorw("cannot clear datastore", "err", err, "qname", q.name)
} else {
dsEmpty = true
}
log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount)
log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount, "qname", q.name)
rsp <- rmMemCount + rmDSCount
}

Expand All @@ -295,7 +302,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
err = q.commitInput(ctx, counter, &inBuf)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorw("dsqueue: error writing items to datastore, stopping dsqueue", "err", err)
log.Errorw("error writing items to datastore, stopping dsqueue", "err", err, "qname", q.name)
}
return
}
Expand Down Expand Up @@ -380,7 +387,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.
item := items.At(i)
key := makeKey(item, counter)
if err = b.Put(ctx, key, nil); err != nil {
log.Errorw("dsqueue: failed to add item to batch", "err", err)
log.Errorw("failed to add item to batch", "err", err, "qname", q.name)
continue
}
counter++
Expand All @@ -389,7 +396,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.
items.Clear()

if err = b.Commit(ctx); err != nil {
return fmt.Errorf("dsqueue: failed to commit batch to datastore: %w", err)
return fmt.Errorf("failed to commit batch to datastore: %w", err)
}

return nil
Expand Down
77 changes: 64 additions & 13 deletions dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-dsqueue"
"github.com/ipfs/go-test/random"
Expand Down Expand Up @@ -48,6 +49,17 @@ func TestBasicOperation(t *testing.T) {
queue := dsqueue.New(ds, dsqName)
defer queue.Close()

if queue.Name() != dsqName {
t.Fatal("wrong queue name")
}

queue.Enqueue(nil)
select {
case <-queue.Dequeue():
t.Fatal("nothing should be in queue")
case <-time.After(time.Millisecond):
}

cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
Expand All @@ -62,6 +74,11 @@ func TestBasicOperation(t *testing.T) {
if err = queue.Close(); err != nil {
t.Fatal(err)
}

err = queue.Enqueue(cids[0].Bytes())
if err == nil {
t.Fatal("expected error calling Enqueue after Close")
}
}

func TestMangledData(t *testing.T) {
Expand Down Expand Up @@ -113,28 +130,62 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[5:], queue, t)
}

func TestInitializationWithManyCids(t *testing.T) {
func TestIdleFlush(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(25))
queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond))
defer queue.Close()

cids := random.Cids(100)
cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
}

/*
err := queue.Close()
if err != nil {
t.Fatal(err)
}
dsn := namespace.Wrap(ds, datastore.NewKey("/dsq-"+dsqName))
time.Sleep(10 * time.Millisecond)

ctx := context.Background()
n, err := countItems(ctx, dsn)
if err != nil {
t.Fatal(err)
}
if n != 0 {
t.Fatal("expected nothing in datastore")
}

// make a new queue, same data
queue = dsqueue.New(ds, dsqName)
defer queue.Close()
*/
time.Sleep(2 * time.Second)

assertOrdered(cids, queue, t)
n, err = countItems(ctx, dsn)
if err != nil {
t.Fatal(err)
}
expect := len(cids) - 1
if n != expect {
t.Fatalf("should have flushed %d cids to datastore, got %d", expect, n)
}
}

func countItems(ctx context.Context, ds datastore.Datastore) (int, error) {
qry := query.Query{
KeysOnly: true,
}
results, err := ds.Query(ctx, qry)
if err != nil {
return 0, fmt.Errorf("cannot query datastore: %w", err)
}
defer results.Close()

var count int
for result := range results.Next() {
if ctx.Err() != nil {
return 0, ctx.Err()
}
if result.Error != nil {
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
}
count++
}

return count, nil
}

func TestPersistManyCids(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func WithDedupCacheSize(n int) Option {
// WithIdleWriteTime sets the amout of time that the queue must be idle (no
// input or output) before all buffered input items are written to the
// datastore. A value of zero means that buffered input items are not
// automatically flushed to the datastore. This value must be greater than one
// second.
// automatically flushed to the datastore. A non-zero value must be greater
// than one second.
func WithIdleWriteTime(d time.Duration) Option {
return func(c *config) {
if d < time.Second {
if d != 0 && d < time.Second {
d = time.Second
}
c.idleWriteTime = d
Expand Down