Skip to content

Commit

Permalink
Add option to never commit
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma committed Jan 11, 2019
1 parent d288fa0 commit 21e3953
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion channel/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type Sink struct {
}

// NewSink creates a new channel Sink.
//
// A batch size of 0 will never commit.
func NewSink(ch chan *streams.Message, batch int) *Sink {
return &Sink{
ch: ch,
Expand All @@ -30,7 +32,7 @@ func (s *Sink) Process(msg *streams.Message) error {
s.ch <- msg

s.count++
if s.count >= s.batch {
if s.batch > 0 && s.count >= s.batch {
s.count = 0
return s.pipe.Commit(msg)
}
Expand Down

0 comments on commit 21e3953

Please sign in to comment.