diff --git a/README.md b/README.md index 1d464fc..d46bdfe 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,35 @@ The dsqueue package provides a buffered FIFO queue backed by a [Batching Datasto https://pkg.go.dev/github.com/ipfs/go-dsqueue +## Example + +```go +ds = getBatchingDatastore() +dsq := dsqueue.New(ds, "ExampleQueue") +defer dsq.Close() + +c, err := cid.Decode("QmPNHBy5h7f19yJDt7ip9TvmMRbqmYsa6aetkrsc1ghjLB") +if err != nil { + panic(err) +} + +dsq.Put(c.Bytes()) + +out := <-dsq.Out() +c2, err := cid.Parse(out) +if err != nil { + panic(err) +} + +if c2 != c { + fmt.Fprintln(os.Stderr, "cids are not quual") +} +``` + +## Lead Maintainer + +[@gammazero]( + ## Contributing Contributions are welcome! This repository is part of the IPFS project and therefore governed by our [contributing guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md). diff --git a/dsqueue.go b/dsqueue.go index e031e2a..abb8f3b 100644 --- a/dsqueue.go +++ b/dsqueue.go @@ -92,8 +92,8 @@ func (q *DSQueue) Close() error { return err } -// Enqueue puts an item in the queue. -func (q *DSQueue) Enqueue(item []byte) (err error) { +// Put puts an item into the queue. +func (q *DSQueue) Put(item []byte) (err error) { if len(item) == 0 { return } @@ -107,8 +107,8 @@ func (q *DSQueue) Enqueue(item []byte) (err error) { return } -// Dequeue returns a channel that for reading entries from the queue, -func (q *DSQueue) Dequeue() <-chan []byte { +// Out returns a channel that for reading entries from the queue, +func (q *DSQueue) Out() <-chan []byte { return q.dequeue } diff --git a/dsqueue_test.go b/dsqueue_test.go index fd4d492..f768ad8 100644 --- a/dsqueue_test.go +++ b/dsqueue_test.go @@ -24,7 +24,7 @@ func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) { var count int for i, c := range cids { select { - case dequeued, ok := <-q.Dequeue(): + case dequeued, ok := <-q.Out(): if !ok { t.Fatal("queue closed") } @@ -53,29 +53,52 @@ func TestBasicOperation(t *testing.T) { t.Fatal("wrong queue name") } - queue.Enqueue(nil) + queue.Put(nil) select { - case <-queue.Dequeue(): + case <-queue.Out(): t.Fatal("nothing should be in queue") case <-time.After(time.Millisecond): } - cids := random.Cids(10) - for _, c := range cids { - queue.Enqueue(c.Bytes()) - } + out := make(chan []string) + go func() { + var outStrs []string + for { + select { + case dq, open := <-queue.Out(): + if !open { + out <- outStrs + return + } + dqItem := string(dq) + t.Log("got:", dqItem) + outStrs = append(outStrs, dqItem) + } + } + }() - assertOrdered(cids, queue, t) + items := []string{"apple", "banana", "cherry"} + for _, item := range items { + queue.Put([]byte(item)) + } + time.Sleep(time.Second) err := queue.Close() if err != nil { t.Fatal(err) } + + qout := <-out + + if len(qout) != len(items) { + t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(items), len(qout)) + } + if err = queue.Close(); err != nil { t.Fatal(err) } - err = queue.Enqueue(cids[0].Bytes()) + err = queue.Put([]byte(items[0])) if err == nil { t.Fatal("expected error calling Enqueue after Close") } @@ -98,7 +121,7 @@ func TestMangledData(t *testing.T) { cids := random.Cids(10) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } // expect to only see the valid cids we entered @@ -113,7 +136,7 @@ func TestInitialization(t *testing.T) { cids := random.Cids(10) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } assertOrdered(cids[:5], queue, t) @@ -137,7 +160,7 @@ func TestIdleFlush(t *testing.T) { cids := random.Cids(10) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } dsn := namespace.Wrap(ds, datastore.NewKey("/dsq-"+dsqName)) @@ -195,7 +218,7 @@ func TestPersistManyCids(t *testing.T) { cids := random.Cids(25) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } err := queue.Close() @@ -216,7 +239,7 @@ func TestPersistOneCid(t *testing.T) { defer queue.Close() cids := random.Cids(1) - queue.Enqueue(cids[0].Bytes()) + queue.Put(cids[0].Bytes()) err := queue.Close() if err != nil { @@ -236,14 +259,14 @@ func TestDeduplicateCids(t *testing.T) { defer queue.Close() cids := random.Cids(5) - queue.Enqueue(cids[0].Bytes()) - queue.Enqueue(cids[0].Bytes()) - queue.Enqueue(cids[1].Bytes()) - queue.Enqueue(cids[2].Bytes()) - queue.Enqueue(cids[1].Bytes()) - queue.Enqueue(cids[3].Bytes()) - queue.Enqueue(cids[0].Bytes()) - queue.Enqueue(cids[4].Bytes()) + queue.Put(cids[0].Bytes()) + queue.Put(cids[0].Bytes()) + queue.Put(cids[1].Bytes()) + queue.Put(cids[2].Bytes()) + queue.Put(cids[1].Bytes()) + queue.Put(cids[3].Bytes()) + queue.Put(cids[0].Bytes()) + queue.Put(cids[4].Bytes()) assertOrdered(cids, queue, t) @@ -253,7 +276,7 @@ func TestDeduplicateCids(t *testing.T) { cids = append(cids, cids[0], cids[0], cids[1]) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } assertOrdered(cids, queue, t) } @@ -266,7 +289,7 @@ func TestClear(t *testing.T) { defer queue.Close() for _, c := range random.Cids(cidCount) { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } // Cause queued entried to be saved in datastore. @@ -279,7 +302,7 @@ func TestClear(t *testing.T) { defer queue.Close() for _, c := range random.Cids(cidCount) { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } rmCount := queue.Clear() @@ -297,7 +320,7 @@ func TestClear(t *testing.T) { defer queue.Close() select { - case <-queue.Dequeue(): + case <-queue.Out(): t.Fatal("dequeue should not return") case <-time.After(10 * time.Millisecond): } @@ -314,7 +337,7 @@ func TestCloseTimeout(t *testing.T) { cids := random.Cids(5) for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } err := queue.Close() @@ -331,7 +354,7 @@ func TestCloseTimeout(t *testing.T) { defer queue.Close() for _, c := range cids { - queue.Enqueue(c.Bytes()) + queue.Put(c.Bytes()) } if err = queue.Close(); err != nil { t.Fatal(err)