Skip to content

Commit

Permalink
Release/v1.1.4 (#11)
Browse files Browse the repository at this point in the history
* fixes and updated deps

* updated benchmarks

* fixes and updated deps

---------

Co-authored-by: Nikita <nikita@novastar.su>
  • Loading branch information
nar10z and Nikita committed May 11, 2024
1 parent 833c59d commit 3d6b9a4
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 91 deletions.
88 changes: 44 additions & 44 deletions _benchmark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,30 @@ go test -bench=. -test.benchmem -test.benchtime=1000000x -test.count=5
goos: darwin
goarch: arm64
Benchmark_accum/go-accumulator,_async
Benchmark_accum/go-accumulator,_async-8 1000000 159.7 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-8 1000000 152.2 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-8 1000000 153.4 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-8 1000000 153.5 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-8 1000000 85.99 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async
Benchmark_accum/go-accumulator_async-8 1000000 110.1 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-8 1000000 176.3 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-8 1000000 161.3 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-8 1000000 130.6 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-8 1000000 123.9 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator
Benchmark_accum/lrweck/accumulator-8 1000000 141.6 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 142.4 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 144.2 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 130.5 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 154.5 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_sync
Benchmark_accum/go-accumulator,_sync-8 1000000 982.0 ns/op 195 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-8 1000000 896.2 ns/op 186 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-8 1000000 899.6 ns/op 188 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-8 1000000 848.5 ns/op 188 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-8 1000000 838.5 ns/op 185 B/op 3 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 183.8 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 188.3 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 185.2 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 187.2 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-8 1000000 187.4 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_sync
Benchmark_accum/go-accumulator_sync-8 1000000 883.1 ns/op 188 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-8 1000000 829.5 ns/op 186 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-8 1000000 905.8 ns/op 187 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-8 1000000 857.7 ns/op 182 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-8 1000000 922.9 ns/op 186 B/op 3 allocs/op
Benchmark_accum/go-accumulator
Benchmark_accum/go-accumulator-8 1000000 426.2 ns/op 64 B/op 1 allocs/op
Benchmark_accum/go-accumulator-8 1000000 617.4 ns/op 116 B/op 1 allocs/op
Benchmark_accum/go-accumulator-8 1000000 611.4 ns/op 120 B/op 2 allocs/op
Benchmark_accum/go-accumulator-8 1000000 310.1 ns/op 36 B/op 0 allocs/op
Benchmark_accum/go-accumulator-8 1000000 195.0 ns/op 16 B/op 0 allocs/op
Benchmark_accum/go-accumulator-8 1000000 183.1 ns/op 1 B/op 0 allocs/op
Benchmark_accum/go-accumulator-8 1000000 673.8 ns/op 136 B/op 2 allocs/op
Benchmark_accum/go-accumulator-8 1000000 569.5 ns/op 107 B/op 1 allocs/op
Benchmark_accum/go-accumulator-8 1000000 269.8 ns/op 30 B/op 0 allocs/op
Benchmark_accum/go-accumulator-8 1000000 715.6 ns/op 105 B/op 1 allocs/op
PASS
```

Expand All @@ -43,29 +43,29 @@ goos: windows
goarch: amd64
Benchmark_accum
Benchmark_accum/go-accumulator,_async
Benchmark_accum/go-accumulator,_async-12 1000000 152.1 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-12 1000000 197.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-12 1000000 133.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-12 1000000 151.5 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_async-12 1000000 177.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async
Benchmark_accum/go-accumulator_async-12 1000000 153.1 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-12 1000000 132.5 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-12 1000000 197.6 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-12 1000000 128.6 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_async-12 1000000 177.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator
Benchmark_accum/lrweck/accumulator-12 1000000 172.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 163.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 163.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 162.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 170.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator,_sync
Benchmark_accum/go-accumulator,_sync-12 1000000 1819 ns/op 192 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-12 1000000 1807 ns/op 190 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-12 1000000 1661 ns/op 184 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-12 1000000 1662 ns/op 185 B/op 3 allocs/op
Benchmark_accum/go-accumulator,_sync-12 1000000 1734 ns/op 186 B/op 3 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 179.4 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 174.0 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 173.9 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 175.1 ns/op 0 B/op 0 allocs/op
Benchmark_accum/lrweck/accumulator-12 1000000 169.2 ns/op 0 B/op 0 allocs/op
Benchmark_accum/go-accumulator_sync
Benchmark_accum/go-accumulator_sync-12 1000000 1760 ns/op 193 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-12 1000000 1736 ns/op 187 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-12 1000000 1705 ns/op 185 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-12 1000000 1766 ns/op 188 B/op 3 allocs/op
Benchmark_accum/go-accumulator_sync-12 1000000 1790 ns/op 186 B/op 3 allocs/op
Benchmark_accum/go-accumulator
Benchmark_accum/go-accumulator-12 1000000 1175 ns/op 112 B/op 1 allocs/op
Benchmark_accum/go-accumulator-12 1000000 718.0 ns/op 63 B/op 1 allocs/op
Benchmark_accum/go-accumulator-12 1000000 313.0 ns/op 19 B/op 0 allocs/op
Benchmark_accum/go-accumulator-12 1000000 914.1 ns/op 82 B/op 1 allocs/op
Benchmark_accum/go-accumulator-12 1000000 1358 ns/op 125 B/op 2 allocs/op
Benchmark_accum/go-accumulator-12 1000000 1522 ns/op 153 B/op 2 allocs/op
Benchmark_accum/go-accumulator-12 1000000 678.2 ns/op 56 B/op 0 allocs/op
Benchmark_accum/go-accumulator-12 1000000 547.7 ns/op 44 B/op 0 allocs/op
Benchmark_accum/go-accumulator-12 1000000 659.2 ns/op 53 B/op 0 allocs/op
Benchmark_accum/go-accumulator-12 1000000 694.2 ns/op 62 B/op 1 allocs/op
PASS
```
4 changes: 2 additions & 2 deletions _benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Benchmark_accum(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

b.Run("go-accumulator, async", func(b *testing.B) {
b.Run("go-accumulator async", func(b *testing.B) {
summary := 0

b.ResetTimer()
Expand Down Expand Up @@ -80,7 +80,7 @@ func Benchmark_accum(b *testing.B) {
}
})

b.Run("go-accumulator, sync", func(b *testing.B) {
b.Run("go-accumulator sync", func(b *testing.B) {
summary := 0
errGr := errgroup.Group{}

Expand Down
4 changes: 3 additions & 1 deletion _benchmark/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.19
require (
github.com/lrweck/accumulator v0.0.0-20230204043344-6f6538ed8d35
github.com/nar10z/go-accumulator v1.1.1
golang.org/x/sync v0.6.0
golang.org/x/sync v0.7.0
)

require github.com/stretchr/testify v1.9.0 // indirect

replace github.com/nar10z/go-accumulator => ../
7 changes: 4 additions & 3 deletions _benchmark/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/lrweck/accumulator v0.0.0-20230204043344-6f6538ed8d35 h1:UOy0xlaFCC7AtcL+V75Y966WFPnUzQijYfC4Zkqvcx4=
github.com/lrweck/accumulator v0.0.0-20230204043344-6f6538ed8d35/go.mod h1:aEHlgNG5u4yPx0TqYv6yeuvefUCthLFnu7x+p3alMmg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
53 changes: 25 additions & 28 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,29 @@ func New[T any](
return make([]T, 0, flushSize)
},
},

chStop: make(chan struct{}),
}

a.wgStop.Add(1)
go a.startFlusher()

return a
}

type Accumulator[T any] struct {
flushFunc FlushExec[T]

size int
interval time.Duration

chEvents chan eventExtended[T]
batchEvents sync.Pool
batchOrigEvents sync.Pool

isClose atomic.Bool
wgStop sync.WaitGroup
flushFunc FlushExec[T]
chEvents chan eventExtended[T]
chStop chan struct{}
size int
interval time.Duration
isClose atomic.Bool
isCloseB bool
}

func (a *Accumulator[T]) AddAsync(ctx context.Context, event T) error {
if a.isClose.Load() {
if a.isCloseB {
return ErrSendToClose
}

Expand All @@ -92,7 +91,7 @@ func (a *Accumulator[T]) AddAsync(ctx context.Context, event T) error {
}

func (a *Accumulator[T]) AddSync(ctx context.Context, event T) error {
if a.isClose.Load() {
if a.isCloseB {
return ErrSendToClose
}

Expand Down Expand Up @@ -122,31 +121,29 @@ func (a *Accumulator[T]) Stop() {
return
}

a.isCloseB = true
close(a.chEvents)
a.wgStop.Wait()
}

func (a *Accumulator[T]) newBatch() []eventExtended[T] {
ss, _ := a.batchEvents.Get().([]eventExtended[T])
return ss
<-a.chStop
}

func (a *Accumulator[T]) clearBatch(s []eventExtended[T]) {
a.batchEvents.Put(s[:0])
func (a *Accumulator[T]) IsClosed() bool {
return a.isCloseB
}

func (a *Accumulator[T]) startFlusher() {
defer a.wgStop.Done()
defer func() {
a.chStop <- struct{}{}
}()

ticker := time.NewTicker(a.interval)
defer ticker.Stop()

batch := a.newBatch()
batch, _ := a.batchEvents.Get().([]eventExtended[T])

flush := func() {
a.flush(batch)
a.clearBatch(batch)
batch = a.newBatch()
a.batchEvents.Put(batch[:0])
batch, _ = a.batchEvents.Get().([]eventExtended[T])
}

for {
Expand Down Expand Up @@ -178,17 +175,17 @@ func (a *Accumulator[T]) flush(events []eventExtended[T]) {
}

originalEvents, _ := a.batchOrigEvents.Get().([]T)
for i := range events {
for i := 0; i < len(events); i++ {
originalEvents = append(originalEvents, events[i].e)
}

err := a.flushFunc(originalEvents)
for _, e := range events {
if e.fallback == nil {
for i := 0; i < len(events); i++ {
if events[i].fallback == nil {
continue
}

e.fallback <- err
events[i].fallback <- err
}

a.batchOrigEvents.Put(originalEvents[:0])
Expand Down
22 changes: 17 additions & 5 deletions accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_New(t *testing.T) {
require.NotNil(t, coll)

coll.Stop()
assert.True(t, coll.isClose.Load())
assert.True(t, coll.IsClosed())
})

t.Run("Empty params", func(t *testing.T) {
Expand All @@ -44,11 +44,21 @@ func Test_New(t *testing.T) {
assert.NotEmpty(t, coll.interval)
assert.NotEmpty(t, coll.flushFunc)

coll.Stop()
assert.True(t, coll.isClose.Load())
var (
wg sync.WaitGroup
countStops = 100
)

coll.Stop()
assert.True(t, coll.isClose.Load())
wg.Add(countStops)
for i := 0; i < countStops; i++ {
go func() {
coll.Stop()
assert.True(t, coll.IsClosed())
wg.Done()
}()
}

wg.Wait()
})
}

Expand Down Expand Up @@ -87,10 +97,12 @@ func Test_accumulator(t *testing.T) {
}

wgEvents.Wait()
require.False(t, coll.IsClosed())

coll.Stop()

require.Equal(t, countAsyncEvent*countWriters, summary)
require.True(t, coll.IsClosed())
})
t.Run("#1.2. Only sync", func(t *testing.T) {

Expand Down
4 changes: 2 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package goaccum

type eventExtended[T any] struct {
// return error of flush operation
fallback chan error
// original data
e T
// return error of flush operation
fallback chan error
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/nar10z/go-accumulator
go 1.19

require (
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.6.0
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down

0 comments on commit 3d6b9a4

Please sign in to comment.