From c11a4d51792f726084248e73bd64e6d862a5c8bb Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Wed, 22 May 2024 14:13:44 +0100 Subject: [PATCH] Add batcher method for owned output --- public/service/output.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/public/service/output.go b/public/service/output.go index 72fa7d989c..d6c259dbe2 100644 --- a/public/service/output.go +++ b/public/service/output.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/benthosdev/benthos/v4/internal/component/output" + "github.com/benthosdev/benthos/v4/internal/component/output/batcher" "github.com/benthosdev/benthos/v4/internal/message" ) @@ -184,6 +185,14 @@ func newOwnedOutput(o output.Streamed) (*OwnedOutput, error) { }, nil } +// BatchedWith returns a copy of the OwnedOutput where messages will be batched +// according to the provided batcher. +func (o *OwnedOutput) BatchedWith(b *Batcher) *OwnedOutput { + return &OwnedOutput{ + o: batcher.New(b.p, o.o, b.mgr), + } +} + // Prime attempts to establish the output connection ready for consuming data. // This is done automatically once data is written. However, pre-emptively // priming the connection before data is received is generally a better idea for