Skip to content

Commit

Permalink
adding batcher mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Mzack9999 committed May 25, 2023
1 parent 6b25880 commit 517775e
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
116 changes: 116 additions & 0 deletions batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package batcher

import (
"time"
)

// FlushCallback is the function a Batcher invokes with each batch of
// accumulated items: when the queue is full, when the flush interval is
// reached, and once more with any remaining items when the batcher exits.
type FlushCallback[T any] func([]T)

// Batcher collects items of type T in a buffered queue and hands them to a
// FlushCallback in batches, either when the queue fills up or when the flush
// interval elapses.
type Batcher[T any] struct {
	// Configuration supplied to New.
	maxCapacity   int
	flushInterval time.Duration
	flushCallback FlushCallback[T]

	// incomingData is the queue holding items that have not been flushed yet.
	incomingData chan T

	// full is signalled by Append when the queue has no room left.
	full chan bool
	// mustExit is signalled by Stop to terminate the processing loop.
	mustExit chan bool
	// done is closed by the processing loop once it has exited.
	done chan bool
}

// New creates a new batcher for items of type T.
//
// maxCapacity is the size of the internal queue and therefore the largest
// batch a single flush can deliver. Values below 1 are raised to 1: a negative
// size would make the channel allocation panic, and a zero-sized (unbuffered)
// queue never reports any pending items, so nothing would ever be drained and
// Append would block forever.
//
// flushInterval is the period after which pending items are flushed even if
// the queue is not full. fn is the callback that receives every batch.
//
// The returned batcher does no work until Run is called.
func New[T any](maxCapacity int, flushInterval time.Duration, fn FlushCallback[T]) *Batcher[T] {
	if maxCapacity < 1 {
		maxCapacity = 1
	}
	return &Batcher[T]{
		maxCapacity:   maxCapacity,
		flushInterval: flushInterval,
		flushCallback: fn,
		incomingData:  make(chan T, maxCapacity),
		full:          make(chan bool),
		// Buffered so that Stop does not have to wait for the loop to be
		// ready to receive the signal.
		mustExit: make(chan bool, 1),
		done:     make(chan bool, 1),
	}
}

// Append queues every given item, in order. When the queue has no free slot,
// it notifies the processing loop that the queue is full and then blocks until
// the item can be enqueued.
func (b *Batcher[T]) Append(d ...T) {
	for idx := range d {
		if b.put(d[idx]) {
			continue
		}
		// Queue is full: request a flush, then wait for a free slot.
		b.full <- true
		b.incomingData <- d[idx]
	}
}

// put attempts a non-blocking enqueue of d and reports whether the item was
// accepted; it returns false when the queue has no free slot.
func (b *Batcher[T]) put(d T) (queued bool) {
	select {
	case b.incomingData <- d:
		queued = true
	default:
		// No room left in the queue; leave queued as false.
	}
	return queued
}

// run is the processing loop. It flushes pending items whenever the flush
// timer fires or Append reports a full queue, and exits when Stop is
// signalled. On exit it flushes whatever is still queued and closes done.
func (b *Batcher[T]) run() {
	defer func() {
		// Final flush of anything still queued, then announce completion.
		b.doCallback()
		close(b.done)
	}()

	flushTimer := time.NewTimer(b.flushInterval)

	// haltTimer stops the timer and, if it had already expired, discards the
	// pending tick so that a later Reset starts from a clean channel.
	haltTimer := func() {
		if flushTimer.Stop() {
			return
		}
		<-flushTimer.C
	}

	for {
		select {
		case <-b.mustExit:
			haltTimer()
			return
		case <-b.full:
			haltTimer()
		case <-flushTimer.C:
			// Interval elapsed; nothing to stop.
		}
		b.doCallback()
		flushTimer.Reset(b.flushInterval)
	}
}

// doCallback takes a snapshot of how many items are currently queued, removes
// exactly that many from the queue and passes them to the flush callback as a
// single batch. It does nothing when the queue is empty.
func (b *Batcher[T]) doCallback() {
	pending := len(b.incomingData)
	if pending == 0 {
		return
	}

	batch := make([]T, 0, pending)
	for len(batch) < pending {
		batch = append(batch, <-b.incomingData)
	}
	b.flushCallback(batch)
}

// Run starts the batcher by launching its processing loop in a new goroutine
// and returns immediately. Every call launches another goroutine.
func (b *Batcher[T]) Run() {
	go b.run()
}

// Stop asks the processing loop to flush the remaining items and exit.
//
// It never blocks and is safe to call more than once: if a stop request is
// already pending, the extra call is a no-op. Use WaitDone to wait for the
// loop to actually finish.
func (b *Batcher[T]) Stop() {
	select {
	case b.mustExit <- true:
	default:
		// A stop request is already queued. A blocking send here would hang
		// forever once the loop has exited and nobody receives anymore.
	}
}

// WaitDone blocks until the done channel is closed, which the processing loop
// does after its final flush when it exits.
func (b *Batcher[T]) WaitDone() {
	<-b.done
}
37 changes: 37 additions & 0 deletions batcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package batcher

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestBatcher appends a known number of items and verifies that every item
// reaches the flush callback and that no batch exceeds the configured
// capacity (i.e. at least wanted/batchSize batches are delivered).
func TestBatcher(t *testing.T) {
	var (
		batchSize        = 100
		wanted           = 100000
		minWantedBatches = wanted / batchSize
		got              int
		gotBatches       int
	)
	// The callback runs on the batcher goroutine; got/gotBatches are only read
	// after WaitDone returns, so no extra synchronization is needed.
	bat := New(batchSize, time.Second, func(batch []int) {
		gotBatches++
		got += len(batch)
	})

	bat.Run()

	for i := 0; i < wanted; i++ {
		bat.Append(i)
	}

	bat.Stop()

	bat.WaitDone()

	require.Equal(t, wanted, got)
	// A batch holds at most batchSize items, so exactly minWantedBatches is a
	// valid outcome (every flush full); timer-driven flushes can only add more.
	require.True(t, minWantedBatches <= gotBatches,
		"expected at least %d batches, got %d", minWantedBatches, gotBatches)
}
4 changes: 4 additions & 0 deletions batcher/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package batcher provides a simple batching mechanism.
// The buffer can be configured with a max capacity and a flush interval.
// It invokes a callback function when the buffer is full or the flush interval is reached.
package batcher

0 comments on commit 517775e

Please sign in to comment.