From 96fca6731a409721a6e9026bf73b14ab142c389b Mon Sep 17 00:00:00 2001 From: Yegor Myskin Date: Mon, 3 Jun 2019 22:46:38 +0300 Subject: [PATCH 1/4] Rate limiter and leaky bucket --- README.md | 2 +- options.go | 17 +++++++++++++++++ rate_limiter.go | 14 ++++++++++++++ rate_limiter_channel.go | 26 ++++++++++++++++++++++++++ tailor.go | 24 +++++++++++++++++++++--- 5 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 rate_limiter.go create mode 100644 rate_limiter_channel.go diff --git a/README.md b/README.md index 687dd85..b7a8325 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ go get github.com/un000/tailor ## TODO - [x] Better Test Code Coverage - [ ] Benchmarks -- [ ] Rate limiter + Leaky Bucket +- [x] Rate limiter + Leaky Bucket ## Example ``` diff --git a/options.go b/options.go index 818899e..933677a 100644 --- a/options.go +++ b/options.go @@ -21,6 +21,9 @@ type options struct { bufioReaderPoolSize int pollerTimeout time.Duration updateLagInterval time.Duration + + rateLimiter RateLimiter + leakyBucket bool } // withDefaultOptions sets the initial options. @@ -75,3 +78,17 @@ func WithUpdateLagInterval(duration time.Duration) Option { options.updateLagInterval = duration } } + +// WithRateLimiter is used to rate limit output lines. Watch RateLimiter interface. +func WithRateLimiter(rl RateLimiter) Option { + return func(options *options) { + options.rateLimiter = rl + } +} + +// WithLeakyBucket is used skip read lines, when listener is full. +func WithLeakyBucket() Option { + return func(options *options) { + options.leakyBucket = true + } +} diff --git a/rate_limiter.go b/rate_limiter.go new file mode 100644 index 0000000..588208f --- /dev/null +++ b/rate_limiter.go @@ -0,0 +1,14 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tailor + +// RateLimiter provides methods to create a custom rate limiter. +type RateLimiter interface { + // Allow says that a line should be sent to a receiver of a lines. + Allow() bool + + // Close finalizes rate limiter. + Close() +} diff --git a/rate_limiter_channel.go b/rate_limiter_channel.go new file mode 100644 index 0000000..260d429 --- /dev/null +++ b/rate_limiter_channel.go @@ -0,0 +1,26 @@ +package tailor + +import ( + "time" +) + +type ChannelBasedRateLimiter struct { + t *time.Ticker +} + +// NewChannelBasedRateLimiter creates an instance of rate limiter, which ticker ticks every period to limit the lps. +func NewChannelBasedRateLimiter(lps int) *ChannelBasedRateLimiter { + return &ChannelBasedRateLimiter{ + t: time.NewTicker(1 / (time.Duration(lps) * time.Second)), + } +} + +// Allow will block until the ticker ticks. +func (rl *ChannelBasedRateLimiter) Allow() bool { + _, ok := <-rl.t.C + return ok +} + +func (rl *ChannelBasedRateLimiter) Close() { + rl.t.Stop() +} diff --git a/tailor.go b/tailor.go index ee1db5c..06b72f0 100644 --- a/tailor.go +++ b/tailor.go @@ -21,6 +21,7 @@ type Tailor struct { opts options + // stats lastPos int64 lastSize int64 lag int64 @@ -90,6 +91,11 @@ func (t *Tailor) readLoop(ctx context.Context) { } close(t.lines) close(t.errs) + + if t.opts.rateLimiter != nil { + t.opts.rateLimiter.Close() + } + atomic.StoreInt32(&t.working, 0) }() @@ -183,9 +189,21 @@ func (t *Tailor) readLoop(ctx context.Context) { pollerTimeout = t.opts.pollerTimeout - t.lines <- Line{ - line: line, - fileName: t.fileName, + if t.opts.rateLimiter == nil || t.opts.rateLimiter.Allow() { + line := Line{ + line: line, + fileName: t.fileName, + } + + if t.opts.leakyBucket { + select { + case t.lines <- line: + default: + } + continue + } + + t.lines <- line } } }() From 6c9edf835eb339f30a52bb8d89d574a4bc9095c3 Mon Sep 17 00:00:00 2001 From: Yegor Myskin Date: Tue, 4 Jun 2019 00:32:55 +0300 Subject: [PATCH 2/4] fix doc --- options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/options.go b/options.go index 933677a..a9312f1 100644 --- a/options.go +++ b/options.go @@ -86,7 +86,7 @@ func WithRateLimiter(rl RateLimiter) Option { } } -// WithLeakyBucket is used skip read lines, when listener is full. +// WithLeakyBucket is used to skip a read lines, when a listener is full. func WithLeakyBucket() Option { return func(options *options) { options.leakyBucket = true From f1dd5aeb6d2185ad2708ef323119e6e49602592c Mon Sep 17 00:00:00 2001 From: Yegor Myskin Date: Tue, 4 Jun 2019 01:37:43 +0300 Subject: [PATCH 3/4] Fix formula --- rate_limiter_channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rate_limiter_channel.go b/rate_limiter_channel.go index 260d429..e2dfb5f 100644 --- a/rate_limiter_channel.go +++ b/rate_limiter_channel.go @@ -11,7 +11,7 @@ type ChannelBasedRateLimiter struct { // NewChannelBasedRateLimiter creates an instance of rate limiter, which ticker ticks every period to limit the lps. func NewChannelBasedRateLimiter(lps int) *ChannelBasedRateLimiter { return &ChannelBasedRateLimiter{ - t: time.NewTicker(1 / (time.Duration(lps) * time.Second)), + t: time.NewTicker(time.Second / time.Duration(lps)), } } From 236a4d20c7cc4871011e8972961c9c2d13a3b1ca Mon Sep 17 00:00:00 2001 From: Yegor Myskin Date: Tue, 4 Jun 2019 01:48:24 +0300 Subject: [PATCH 4/4] Test --- rate_limiter_channel_test.go | 27 +++++++++++++++++++++++++++ seeker_test.go | 4 ++++ 2 files changed, 31 insertions(+) create mode 100644 rate_limiter_channel_test.go diff --git a/rate_limiter_channel_test.go b/rate_limiter_channel_test.go new file mode 100644 index 0000000..f62586c --- /dev/null +++ b/rate_limiter_channel_test.go @@ -0,0 +1,27 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tailor + +import ( + "testing" + "time" +) + +func TestChannelBasedRateLimiterDisallow(t *testing.T) { + l := NewChannelBasedRateLimiter(30) + defer l.Close() + + start := time.Now() + for i := 0; i < 100; i++ { + if !l.Allow() { + t.FailNow() + } + } + dur := time.Since(start) + + if dur < 3333*time.Millisecond || dur > 3500*time.Millisecond { + t.Errorf("expected duration: ~3.33s, actual: %s", dur) + } +} diff --git a/seeker_test.go b/seeker_test.go index aeb31b4..27a7048 100644 --- a/seeker_test.go +++ b/seeker_test.go @@ -1,3 +1,7 @@ +// Copyright 2019 Yegor Myskin. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package tailor import (