Skip to content

Commit

Permalink
feat: collection offset implementation (#10545)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Feb 15, 2022
1 parent f75f437 commit 5479df2
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 80 deletions.
10 changes: 8 additions & 2 deletions agent/agent.go
Expand Up @@ -298,11 +298,17 @@ func (a *Agent) runInputs(
jitter = input.Config.CollectionJitter
}

// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter)
ticker = NewUnalignedTicker(interval, jitter, offset)
}
defer ticker.Stop()

Expand Down
81 changes: 46 additions & 35 deletions agent/tick.go
Expand Up @@ -31,36 +31,38 @@ type Ticker interface {
type AlignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
minInterval time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}

func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker {
return newAlignedTicker(now, interval, jitter, clock.New())
}

func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewAlignedTicker(now time.Time, interval, jitter, offset time.Duration) *AlignedTicker {
t := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(now, clock.New())
return t
}

func (t *AlignedTicker) start(now time.Time, clk clock.Clock) {
t.ch = make(chan time.Time, 1)

ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

d := t.next(now)
timer := clock.Timer(d)
timer := clk.Timer(d)

t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, timer)
}()

return t
}

func (t *AlignedTicker) next(now time.Time) time.Duration {
Expand All @@ -74,6 +76,7 @@ func (t *AlignedTicker) next(now time.Time) time.Duration {
if d == 0 {
d = t.interval
}
d += t.offset
d += internal.RandomDuration(t.jitter)
return d
}
Expand Down Expand Up @@ -118,42 +121,48 @@ func (t *AlignedTicker) Stop() {
type UnalignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}

func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker {
return newUnalignedTicker(interval, jitter, clock.New())
}

func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewUnalignedTicker(interval, jitter, offset time.Duration) *UnalignedTicker {
t := &UnalignedTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
offset: offset,
}
t.start(clock.New())
return t
}

ticker := clock.Ticker(t.interval)
t.ch <- clock.Now()
func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

ticker := clk.Ticker(t.interval)
if t.offset == 0 {
// Perform initial trigger to stay backward compatible
t.ch <- clk.Now()
}

t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, ticker, clock)
t.run(ctx, ticker, clk)
}()

return t
}

func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error {
func sleep(ctx context.Context, duration time.Duration, clk clock.Clock) error {
if duration == 0 {
return nil
}

t := clock.Timer(duration)
t := clk.Timer(duration)
select {
case <-t.C:
return nil
Expand All @@ -163,21 +172,21 @@ func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error
}
}

func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) {
func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clk clock.Clock) {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
jitter := internal.RandomDuration(t.jitter)
err := sleep(ctx, jitter, clock)
err := sleep(ctx, t.offset+jitter, clk)
if err != nil {
ticker.Stop()
return
}
select {
case t.ch <- clock.Now():
case t.ch <- clk.Now():
default:
}
}
Expand Down Expand Up @@ -217,20 +226,22 @@ type RollingTicker struct {
}

func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
return newRollingTicker(interval, jitter, clock.New())
}

func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker {
ctx, cancel := context.WithCancel(context.Background())
t := &RollingTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(clock.New())
return t
}

func (t *RollingTicker) start(clk clock.Clock) *RollingTicker {
t.ch = make(chan time.Time, 1)

ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

d := t.next()
timer := clock.Timer(d)
timer := clk.Timer(d)

t.wg.Add(1)
go func() {
Expand Down

0 comments on commit 5479df2

Please sign in to comment.