Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
if err := q.ds.Put(ctx, k, nil); err != nil {
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
}
counter++
}
if inBuf.Len() != 0 {
err := q.commitInput(ctx, counter, &inBuf)
Expand Down Expand Up @@ -219,6 +218,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
// the input buffer.
item = inBuf.PopFront()
k = makeKey(item, counter)
counter++
}
}

Expand Down
5 changes: 3 additions & 2 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func getOpts(opts []Option) config {

// WithBufferSize sets the limit on number of items kept in input buffer
// memory, at which they are all written to the datastore. A value of 0 means
// items are only written to the datastore at shutdown, and read from the
// datastore at start.
// the buffer size is unlimited, and items are only written to the datastore
// when the queue has been idle more then the idle write time or when the queue
// is closed.
func WithBufferSize(n int) Option {
return func(c *config) {
if n < 0 {
Expand Down