From 21e39532ffb8b72c15fe06a1954a157f923253e3 Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Fri, 11 Jan 2019 12:05:24 +0200 Subject: [PATCH] Add option to never commit --- channel/sink.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/channel/sink.go b/channel/sink.go index 0066e5d..7472aab 100644 --- a/channel/sink.go +++ b/channel/sink.go @@ -13,6 +13,8 @@ type Sink struct { } // NewSink creates a new channel Sink. +// +// A batch size of 0 will never commit. func NewSink(ch chan *streams.Message, batch int) *Sink { return &Sink{ ch: ch, @@ -30,7 +32,7 @@ func (s *Sink) Process(msg *streams.Message) error { s.ch <- msg s.count++ - if s.count >= s.batch { + if s.batch > 0 && s.count >= s.batch { s.count = 0 return s.pipe.Commit(msg) }