diff --git a/README.md b/README.md index 1286d79..60d6657 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ go get github.com/un000/tailor ## TODO - [x] Better Test Code Coverage - [ ] Benchmarks -- [ ] Rate limiter + Leaky Bucket +- [x] Rate limiter + Leaky Bucket ## Example ``` diff --git a/options.go b/options.go index 818899e..a9312f1 100644 --- a/options.go +++ b/options.go @@ -21,6 +21,9 @@ type options struct { bufioReaderPoolSize int pollerTimeout time.Duration updateLagInterval time.Duration + + rateLimiter RateLimiter + leakyBucket bool } // withDefaultOptions sets the initial options. @@ -75,3 +78,17 @@ func WithUpdateLagInterval(duration time.Duration) Option { options.updateLagInterval = duration } } + +// WithRateLimiter is used to rate limit output lines. Watch RateLimiter interface. +func WithRateLimiter(rl RateLimiter) Option { + return func(options *options) { + options.rateLimiter = rl + } +} + +// WithLeakyBucket is used to skip a read lines, when a listener is full. +func WithLeakyBucket() Option { + return func(options *options) { + options.leakyBucket = true + } +} diff --git a/rate_limiter.go b/rate_limiter.go new file mode 100644 index 0000000..588208f --- /dev/null +++ b/rate_limiter.go @@ -0,0 +1,14 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tailor + +// RateLimiter provides methods to create a custom rate limiter. +type RateLimiter interface { + // Allow says that a line should be sent to a receiver of a lines. + Allow() bool + + // Close finalizes rate limiter. + Close() +} diff --git a/rate_limiter_channel.go b/rate_limiter_channel.go new file mode 100644 index 0000000..e2dfb5f --- /dev/null +++ b/rate_limiter_channel.go @@ -0,0 +1,26 @@ +package tailor + +import ( + "time" +) + +type ChannelBasedRateLimiter struct { + t *time.Ticker +} + +// NewChannelBasedRateLimiter creates an instance of rate limiter, which ticker ticks every period to limit the lps. +func NewChannelBasedRateLimiter(lps int) *ChannelBasedRateLimiter { + return &ChannelBasedRateLimiter{ + t: time.NewTicker(time.Second / time.Duration(lps)), + } +} + +// Allow will block until the ticker ticks. +func (rl *ChannelBasedRateLimiter) Allow() bool { + _, ok := <-rl.t.C + return ok +} + +func (rl *ChannelBasedRateLimiter) Close() { + rl.t.Stop() +} diff --git a/rate_limiter_channel_test.go b/rate_limiter_channel_test.go new file mode 100644 index 0000000..f62586c --- /dev/null +++ b/rate_limiter_channel_test.go @@ -0,0 +1,27 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tailor + +import ( + "testing" + "time" +) + +func TestChannelBasedRateLimiterDisallow(t *testing.T) { + l := NewChannelBasedRateLimiter(30) + defer l.Close() + + start := time.Now() + for i := 0; i < 100; i++ { + if !l.Allow() { + t.FailNow() + } + } + dur := time.Since(start) + + if dur < 3333*time.Millisecond || dur > 3500*time.Millisecond { + t.Errorf("expected duration: ~3.33s, actual: %s", dur) + } +} diff --git a/seeker_test.go b/seeker_test.go index aeb31b4..27a7048 100644 --- a/seeker_test.go +++ b/seeker_test.go @@ -1,3 +1,7 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package tailor import ( diff --git a/tailor.go b/tailor.go index ee1db5c..06b72f0 100644 --- a/tailor.go +++ b/tailor.go @@ -21,6 +21,7 @@ type Tailor struct { opts options + // stats lastPos int64 lastSize int64 lag int64 @@ -90,6 +91,11 @@ func (t *Tailor) readLoop(ctx context.Context) { } close(t.lines) close(t.errs) + + if t.opts.rateLimiter != nil { + t.opts.rateLimiter.Close() + } + atomic.StoreInt32(&t.working, 0) }() @@ -183,9 +189,21 @@ func (t *Tailor) readLoop(ctx context.Context) { pollerTimeout = t.opts.pollerTimeout - t.lines <- Line{ - line: line, - fileName: t.fileName, + if t.opts.rateLimiter == nil || t.opts.rateLimiter.Allow() { + line := Line{ + line: line, + fileName: t.fileName, + } + + if t.opts.leakyBucket { + select { + case t.lines <- line: + default: + } + continue + } + + t.lines <- line } } }()