Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Collection offset implementation #10545

Merged
merged 10 commits into from Feb 15, 2022
10 changes: 8 additions & 2 deletions agent/agent.go
Expand Up @@ -298,11 +298,17 @@ func (a *Agent) runInputs(
jitter = input.Config.CollectionJitter
}

// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter)
ticker = NewUnalignedTicker(interval, jitter, offset)
}
defer ticker.Stop()

Expand Down
81 changes: 46 additions & 35 deletions agent/tick.go
Expand Up @@ -31,36 +31,38 @@ type Ticker interface {
type AlignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
minInterval time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}

func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker {
return newAlignedTicker(now, interval, jitter, clock.New())
}

func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewAlignedTicker(now time.Time, interval, jitter, offset time.Duration) *AlignedTicker {
t := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(now, clock.New())
return t
}

func (t *AlignedTicker) start(now time.Time, clk clock.Clock) {
t.ch = make(chan time.Time, 1)

ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

d := t.next(now)
timer := clock.Timer(d)
timer := clk.Timer(d)

t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, timer)
}()

return t
}

func (t *AlignedTicker) next(now time.Time) time.Duration {
Expand All @@ -75,6 +77,7 @@ func (t *AlignedTicker) next(now time.Time) time.Duration {
d = t.interval
}
d += internal.RandomDuration(t.jitter)
d += t.offset
srebhan marked this conversation as resolved.
Show resolved Hide resolved
return d
}

Expand Down Expand Up @@ -118,42 +121,48 @@ func (t *AlignedTicker) Stop() {
type UnalignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}

func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker {
return newUnalignedTicker(interval, jitter, clock.New())
}

func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewUnalignedTicker(interval, jitter, offset time.Duration) *UnalignedTicker {
t := &UnalignedTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
offset: offset,
}
t.start(clock.New())
return t
}

ticker := clock.Ticker(t.interval)
t.ch <- clock.Now()
func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

ticker := clk.Ticker(t.interval)
if t.offset == 0 {
// Perform initial trigger to stay backward compatible
t.ch <- clk.Now()
}

t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, ticker, clock)
t.run(ctx, ticker, clk)
}()

return t
}

func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error {
func sleep(ctx context.Context, duration time.Duration, clk clock.Clock) error {
if duration == 0 {
return nil
}

t := clock.Timer(duration)
t := clk.Timer(duration)
select {
case <-t.C:
return nil
Expand All @@ -163,21 +172,21 @@ func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error
}
}

func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) {
func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clk clock.Clock) {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
jitter := internal.RandomDuration(t.jitter)
err := sleep(ctx, jitter, clock)
err := sleep(ctx, jitter+t.offset, clk)
if err != nil {
ticker.Stop()
return
}
select {
case t.ch <- clock.Now():
case t.ch <- clk.Now():
default:
}
}
Expand Down Expand Up @@ -217,20 +226,22 @@ type RollingTicker struct {
}

func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
return newRollingTicker(interval, jitter, clock.New())
}

func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker {
ctx, cancel := context.WithCancel(context.Background())
t := &RollingTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(clock.New())
return t
}

func (t *RollingTicker) start(clk clock.Clock) *RollingTicker {
t.ch = make(chan time.Time, 1)

ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel

d := t.next()
timer := clock.Timer(d)
timer := clk.Timer(d)

t.wg.Add(1)
go func() {
Expand Down