Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,35 @@ The dsqueue package provides a buffered FIFO queue backed by a [Batching Datasto

https://pkg.go.dev/github.com/ipfs/go-dsqueue

## Example

```go
ds = getBatchingDatastore()
dsq := dsqueue.New(ds, "ExampleQueue")
defer dsq.Close()

c, err := cid.Decode("QmPNHBy5h7f19yJDt7ip9TvmMRbqmYsa6aetkrsc1ghjLB")
if err != nil {
panic(err)
}

dsq.Put(c.Bytes())

out := <-dsq.Out()
c2, err := cid.Parse(out)
if err != nil {
panic(err)
}

if c2 != c {
fmt.Fprintln(os.Stderr, "cids are not quual")
}
```

## Lead Maintainer

[@gammazero](

## Contributing

Contributions are welcome! This repository is part of the IPFS project and therefore governed by our [contributing guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md).
Expand Down
8 changes: 4 additions & 4 deletions dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (q *DSQueue) Close() error {
return err
}

// Enqueue puts an item in the queue.
func (q *DSQueue) Enqueue(item []byte) (err error) {
// Put puts an item into the queue.
func (q *DSQueue) Put(item []byte) (err error) {
if len(item) == 0 {
return
}
Expand All @@ -107,8 +107,8 @@ func (q *DSQueue) Enqueue(item []byte) (err error) {
return
}

// Dequeue returns a channel that for reading entries from the queue,
func (q *DSQueue) Dequeue() <-chan []byte {
// Out returns a channel that for reading entries from the queue,
func (q *DSQueue) Out() <-chan []byte {
return q.dequeue
}

Expand Down
79 changes: 51 additions & 28 deletions dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) {
var count int
for i, c := range cids {
select {
case dequeued, ok := <-q.Dequeue():
case dequeued, ok := <-q.Out():
if !ok {
t.Fatal("queue closed")
}
Expand Down Expand Up @@ -53,29 +53,52 @@ func TestBasicOperation(t *testing.T) {
t.Fatal("wrong queue name")
}

queue.Enqueue(nil)
queue.Put(nil)
select {
case <-queue.Dequeue():
case <-queue.Out():
t.Fatal("nothing should be in queue")
case <-time.After(time.Millisecond):
}

cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
}
out := make(chan []string)
go func() {
var outStrs []string
for {
select {
case dq, open := <-queue.Out():
if !open {
out <- outStrs
return
}
dqItem := string(dq)
t.Log("got:", dqItem)
outStrs = append(outStrs, dqItem)
}
}
}()

assertOrdered(cids, queue, t)
items := []string{"apple", "banana", "cherry"}
for _, item := range items {
queue.Put([]byte(item))
}

time.Sleep(time.Second)
err := queue.Close()
if err != nil {
t.Fatal(err)
}

qout := <-out

if len(qout) != len(items) {
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(items), len(qout))
}

if err = queue.Close(); err != nil {
t.Fatal(err)
}

err = queue.Enqueue(cids[0].Bytes())
err = queue.Put([]byte(items[0]))
if err == nil {
t.Fatal("expected error calling Enqueue after Close")
}
Expand All @@ -98,7 +121,7 @@ func TestMangledData(t *testing.T) {

cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

// expect to only see the valid cids we entered
Expand All @@ -113,7 +136,7 @@ func TestInitialization(t *testing.T) {

cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

assertOrdered(cids[:5], queue, t)
Expand All @@ -137,7 +160,7 @@ func TestIdleFlush(t *testing.T) {

cids := random.Cids(10)
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

dsn := namespace.Wrap(ds, datastore.NewKey("/dsq-"+dsqName))
Expand Down Expand Up @@ -195,7 +218,7 @@ func TestPersistManyCids(t *testing.T) {

cids := random.Cids(25)
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

err := queue.Close()
Expand All @@ -216,7 +239,7 @@ func TestPersistOneCid(t *testing.T) {
defer queue.Close()

cids := random.Cids(1)
queue.Enqueue(cids[0].Bytes())
queue.Put(cids[0].Bytes())

err := queue.Close()
if err != nil {
Expand All @@ -236,14 +259,14 @@ func TestDeduplicateCids(t *testing.T) {
defer queue.Close()

cids := random.Cids(5)
queue.Enqueue(cids[0].Bytes())
queue.Enqueue(cids[0].Bytes())
queue.Enqueue(cids[1].Bytes())
queue.Enqueue(cids[2].Bytes())
queue.Enqueue(cids[1].Bytes())
queue.Enqueue(cids[3].Bytes())
queue.Enqueue(cids[0].Bytes())
queue.Enqueue(cids[4].Bytes())
queue.Put(cids[0].Bytes())
queue.Put(cids[0].Bytes())
queue.Put(cids[1].Bytes())
queue.Put(cids[2].Bytes())
queue.Put(cids[1].Bytes())
queue.Put(cids[3].Bytes())
queue.Put(cids[0].Bytes())
queue.Put(cids[4].Bytes())

assertOrdered(cids, queue, t)

Expand All @@ -253,7 +276,7 @@ func TestDeduplicateCids(t *testing.T) {

cids = append(cids, cids[0], cids[0], cids[1])
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}
assertOrdered(cids, queue, t)
}
Expand All @@ -266,7 +289,7 @@ func TestClear(t *testing.T) {
defer queue.Close()

for _, c := range random.Cids(cidCount) {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

// Cause queued entried to be saved in datastore.
Expand All @@ -279,7 +302,7 @@ func TestClear(t *testing.T) {
defer queue.Close()

for _, c := range random.Cids(cidCount) {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

rmCount := queue.Clear()
Expand All @@ -297,7 +320,7 @@ func TestClear(t *testing.T) {
defer queue.Close()

select {
case <-queue.Dequeue():
case <-queue.Out():
t.Fatal("dequeue should not return")
case <-time.After(10 * time.Millisecond):
}
Expand All @@ -314,7 +337,7 @@ func TestCloseTimeout(t *testing.T) {

cids := random.Cids(5)
for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}

err := queue.Close()
Expand All @@ -331,7 +354,7 @@ func TestCloseTimeout(t *testing.T) {
defer queue.Close()

for _, c := range cids {
queue.Enqueue(c.Bytes())
queue.Put(c.Bytes())
}
if err = queue.Close(); err != nil {
t.Fatal(err)
Expand Down