From ec2fff3524b6951c7c2c9a94098ef374520dbba1 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 8 Jun 2017 11:24:03 +0800 Subject: [PATCH] apiserver: rate-limit logsink receives Update the logsink handler so that it will rate-limit receipt of log messages. Rate-limiting defaults to a burst of 1000 messages, after which we allow 1 message per millisecond. This is configurable by modifying the controller agent's agent.conf. If we find that users need to configure it regularly, we can promote the configuration to controller config. --- agent/agent.go | 2 + apiserver/apiserver.go | 103 +++++++++++++++++++----------- apiserver/logsink/logsink.go | 58 ++++++++++++++++- apiserver/logsink/logsink_test.go | 65 +++++++++++++++++++ apiserver/logsink_test.go | 8 +++ cmd/jujud/agent/machine.go | 54 +++++++++++----- dependencies.tsv | 1 + 7 files changed, 235 insertions(+), 56 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 9f9356d62d34..ea3c289bbfa4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -183,6 +183,8 @@ const ( LogSinkDBLoggerBufferSize = "LOGSINK_DBLOGGER_BUFFER_SIZE" LogSinkDBLoggerFlushInterval = "LOGSINK_DBLOGGER_FLUSH_INTERVAL" + LogSinkRateLimitBurst = "LOGSINK_RATELIMIT_BURST" + LogSinkRateLimitRefill = "LOGSINK_RATELIMIT_REFILL" ) // The Config interface is the sole way that the agent gets access to the diff --git a/apiserver/apiserver.go b/apiserver/apiserver.go index 5d2625d7dc20..194b88ba193d 100644 --- a/apiserver/apiserver.go +++ b/apiserver/apiserver.go @@ -51,46 +51,49 @@ var defaultHTTPMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIO // These vars define how we rate limit incoming connections. const ( - defaultLoginRateLimit = 10 // concurrent login operations - defaultLoginMinPause = 100 * time.Millisecond - defaultLoginMaxPause = 1 * time.Second - defaultLoginRetryPause = 5 * time.Second - defaultConnMinPause = 0 * time.Millisecond - defaultConnMaxPause = 5 * time.Second - defaultConnLookbackWindow = 1 * time.Second - defaultConnLowerThreshold = 1000 // connections per second - defaultConnUpperThreshold = 100000 // connections per second + defaultLoginRateLimit = 10 // concurrent login operations + defaultLoginMinPause = 100 * time.Millisecond + defaultLoginMaxPause = 1 * time.Second + defaultLoginRetryPause = 5 * time.Second + defaultConnMinPause = 0 * time.Millisecond + defaultConnMaxPause = 5 * time.Second + defaultConnLookbackWindow = 1 * time.Second + defaultConnLowerThreshold = 1000 // connections per second + defaultConnUpperThreshold = 100000 // connections per second + defaultLogSinkRateLimitBurst = 1000 + defaultLogSinkRateLimitRefill = time.Millisecond ) // Server holds the server side of the API. type Server struct { - tomb tomb.Tomb - clock clock.Clock - pingClock clock.Clock - wg sync.WaitGroup - state *state.State - statePool *state.StatePool - lis net.Listener - tag names.Tag - dataDir string - logDir string - limiter utils.Limiter - loginRetryPause time.Duration - validator LoginValidator - facades *facade.Registry - modelUUID string - authCtxt *authContext - lastConnectionID uint64 - centralHub *pubsub.StructuredHub - newObserver observer.ObserverFactory - connCount int64 - totalConn int64 - loginAttempts int64 - certChanged <-chan params.StateServingInfo - tlsConfig *tls.Config - allowModelAccess bool - logSinkWriter io.WriteCloser - dbloggers dbloggers + tomb tomb.Tomb + clock clock.Clock + pingClock clock.Clock + wg sync.WaitGroup + state *state.State + statePool *state.StatePool + lis net.Listener + tag names.Tag + dataDir string + logDir string + limiter utils.Limiter + loginRetryPause time.Duration + validator LoginValidator + facades *facade.Registry + modelUUID string + authCtxt *authContext + lastConnectionID uint64 + centralHub *pubsub.StructuredHub + newObserver observer.ObserverFactory + connCount int64 + totalConn int64 + loginAttempts int64 + certChanged <-chan params.StateServingInfo + tlsConfig *tls.Config + allowModelAccess bool + logSinkWriter io.WriteCloser + logsinkRateLimitConfig logsink.RateLimitConfig + dbloggers dbloggers // mu guards the fields below it. mu sync.Mutex @@ -175,7 +178,8 @@ type ServerConfig struct { PrometheusRegisterer prometheus.Registerer } -func (c *ServerConfig) Validate() error { +// Validate validates the API server configuration. +func (c ServerConfig) Validate() error { if c.Hub == nil { return errors.NotValidf("missing Hub") } @@ -199,7 +203,7 @@ func (c *ServerConfig) Validate() error { return nil } -func (c *ServerConfig) pingClock() clock.Clock { +func (c ServerConfig) pingClock() clock.Clock { if c.PingClock == nil { return c.Clock } @@ -273,6 +277,14 @@ type LogSinkConfig struct { // DBLoggerFlushInterval is the amount of time to allow a log record // to sit in the buffer before being flushed to the database. DBLoggerFlushInterval time.Duration + + // RateLimitBurst defines the number of log messages that will be let + // through before we start rate limiting. + RateLimitBurst int64 + + // RateLimitRefill defines the rate at which log messages will be let + // through once the initial burst amount has been depleted. + RateLimitRefill time.Duration } // Validate validates the logsink endpoint configuration. @@ -283,6 +295,12 @@ func (cfg LogSinkConfig) Validate() error { if cfg.DBLoggerFlushInterval <= 0 || cfg.DBLoggerFlushInterval > 10*time.Second { return errors.NotValidf("DBLoggerFlushInterval %s <= 0 or > 10 seconds", cfg.DBLoggerFlushInterval) } + if cfg.RateLimitBurst <= 0 { + return errors.NotValidf("RateLimitBurst %d <= 0", cfg.RateLimitBurst) + } + if cfg.RateLimitRefill <= 0 { + return errors.NotValidf("RateLimitRefill %s <= 0", cfg.RateLimitRefill) + } return nil } @@ -291,6 +309,8 @@ func DefaultLogSinkConfig() LogSinkConfig { return LogSinkConfig{ DBLoggerBufferSize: defaultDBLoggerBufferSize, DBLoggerFlushInterval: defaultDBLoggerFlushInterval, + RateLimitBurst: defaultLogSinkRateLimitBurst, + RateLimitRefill: defaultLogSinkRateLimitRefill, } } @@ -350,6 +370,11 @@ func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, e allowModelAccess: cfg.AllowModelAccess, publicDNSName_: cfg.AutocertDNSName, registerIntrospectionHandlers: cfg.RegisterIntrospectionHandlers, + logsinkRateLimitConfig: logsink.RateLimitConfig{ + Refill: cfg.LogSinkConfig.RateLimitRefill, + Burst: cfg.LogSinkConfig.RateLimitBurst, + Clock: cfg.Clock, + }, dbloggers: dbloggers{ clock: cfg.Clock, dbLoggerBufferSize: cfg.LogSinkConfig.DBLoggerBufferSize, @@ -611,6 +636,7 @@ func (srv *Server) endpoints() []apihttp.Endpoint { logSinkHandler := logsink.NewHTTPHandler( newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter, &srv.dbloggers), httpCtxt.stop(), + &srv.logsinkRateLimitConfig, ) add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler)) @@ -618,6 +644,7 @@ func (srv *Server) endpoints() []apihttp.Endpoint { logTransferHandler := logsink.NewHTTPHandler( newMigrationLogWriteCloserFunc(httpCtxt, &srv.dbloggers), httpCtxt.stop(), + nil, // no rate-limiting ) add("/migrate/logtransfer", srv.trackRequests(logTransferHandler)) diff --git a/apiserver/logsink/logsink.go b/apiserver/logsink/logsink.go index 3e97d247d3cb..43c7ed54f102 100644 --- a/apiserver/logsink/logsink.go +++ b/apiserver/logsink/logsink.go @@ -11,6 +11,8 @@ import ( gorillaws "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/loggo" + "github.com/juju/ratelimit" + "github.com/juju/utils/clock" "github.com/juju/utils/featureflag" "github.com/juju/version" @@ -34,21 +36,43 @@ type LogWriteCloser interface { // NewLogWriteCloserFunc returns a new LogWriteCloser for the given http.Request. type NewLogWriteCloserFunc func(*http.Request) (LogWriteCloser, error) +// RateLimitConfig contains the rate-limit configuration for the logsink +// handler. +type RateLimitConfig struct { + // Burst is the number of log messages that will be let through before + // we start rate limiting. + Burst int64 + + // Refill is the rate at which log messages will be let through once + // the initial burst amount has been depleted. + Refill time.Duration + + // Clock is the clock used to wait when rate-limiting log receives. + Clock clock.Clock +} + // NewHTTPHandler returns a new http.Handler for receiving log messages over a -// websocket. +// websocket, using the given NewLogWriteCloserFunc to obtain a writer to which +// the log messages will be written. +// +// ratelimit defines an optional rate-limit configuration. If nil, no rate- +// limiting will be applied. func NewHTTPHandler( newLogWriteCloser NewLogWriteCloserFunc, abort <-chan struct{}, + ratelimit *RateLimitConfig, ) http.Handler { return &logSinkHandler{ newLogWriteCloser: newLogWriteCloser, abort: abort, + ratelimit: ratelimit, } } type logSinkHandler struct { newLogWriteCloser NewLogWriteCloserFunc abort <-chan struct{} + ratelimit *RateLimitConfig } // Since the logsink only receives messages, it is possible for the other end @@ -157,6 +181,15 @@ func (h *logSinkHandler) getVersion(req *http.Request) (int, error) { func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int) <-chan params.LogRecord { logCh := make(chan params.LogRecord) + var tokenBucket *ratelimit.Bucket + if h.ratelimit != nil { + tokenBucket = ratelimit.NewBucketWithClock( + h.ratelimit.Refill, + h.ratelimit.Burst, + ratelimitClock{h.ratelimit.Clock}, + ) + } + go func() { // Close the channel to signal ServeHTTP to finish. Otherwise // we leak goroutines on client disconnect, because the server @@ -180,6 +213,19 @@ func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int return } + // Rate-limit receipt of log messages. We rate-limit + // each connection individually to prevent one noisy + // individual from drowning out the others. + if tokenBucket != nil { + if d := tokenBucket.Take(1); d > 0 { + select { + case <-h.ratelimit.Clock.After(d): + case <-h.abort: + return + } + } + } + // Send the log message. select { case <-h.abort: @@ -223,3 +269,13 @@ func JujuClientVersionFromRequest(req *http.Request) (version.Number, error) { } return ver, nil } + +// ratelimitClock adapts clock.Clock to ratelimit.Clock. +type ratelimitClock struct { + clock.Clock +} + +// Sleep is defined by the ratelimit.Clock interface. +func (c ratelimitClock) Sleep(d time.Duration) { + <-c.Clock.After(d) +} diff --git a/apiserver/logsink/logsink_test.go b/apiserver/logsink/logsink_test.go index 9ff776e52f42..97462c1df609 100644 --- a/apiserver/logsink/logsink_test.go +++ b/apiserver/logsink/logsink_test.go @@ -65,6 +65,7 @@ func (s *logsinkSuite) SetUpTest(c *gc.C) { }, s.stub.NextErr() }, s.abort, + nil, // no rate-limiting )) s.AddCleanup(func(*gc.C) { s.srv.Close() }) } @@ -168,6 +169,70 @@ func (s *logsinkSuite) TestReceiveErrorBreaksConn(c *gc.C) { websockettest.AssertWebsocketClosed(c, conn) } +func (s *logsinkSuite) TestRateLimit(c *gc.C) { + testClock := testing.NewClock(time.Time{}) + s.srv.Close() + s.srv = httptest.NewServer(logsink.NewHTTPHandler( + func(req *http.Request) (logsink.LogWriteCloser, error) { + s.stub.AddCall("Open") + return &mockLogWriteCloser{ + &s.stub, + s.written, + }, s.stub.NextErr() + }, + s.abort, + &logsink.RateLimitConfig{ + Burst: 2, + Refill: time.Second, + Clock: testClock, + }, + )) + + conn := s.dialWebsocket(c) + websockettest.AssertJSONInitialErrorNil(c, conn) + + record := params.LogRecord{ + Time: time.Date(2015, time.June, 1, 23, 2, 1, 0, time.UTC), + Module: "some.where", + Location: "foo.go:42", + Level: loggo.INFO.String(), + Message: "all is well", + } + for i := 0; i < 4; i++ { + err := conn.WriteJSON(&record) + c.Assert(err, jc.ErrorIsNil) + } + + expectRecord := func() { + select { + case written, ok := <-s.written: + c.Assert(ok, jc.IsTrue) + c.Assert(written, jc.DeepEquals, record) + case <-time.After(coretesting.LongWait): + c.Fatal("timed out waiting for log record to be written") + } + } + expectNoRecord := func() { + select { + case <-s.written: + c.Fatal("unexpected log record") + case <-time.After(coretesting.ShortWait): + } + } + + // There should be 2 records received immediately, + // and then rate-limiting should kick in. + expectRecord() + expectRecord() + expectNoRecord() + testClock.WaitAdvance(time.Second, coretesting.LongWait, 1) + expectRecord() + expectNoRecord() + testClock.WaitAdvance(time.Second, coretesting.LongWait, 1) + expectRecord() + expectNoRecord() +} + type mockLogWriteCloser struct { *testing.Stub written chan<- params.LogRecord diff --git a/apiserver/logsink_test.go b/apiserver/logsink_test.go index a20d0c6735bc..0039ea9b1259 100644 --- a/apiserver/logsink_test.go +++ b/apiserver/logsink_test.go @@ -230,6 +230,14 @@ func (s *logsinkSuite) TestNewServerValidatesLogSinkConfig(c *gc.C) { cfg.LogSinkConfig.DBLoggerFlushInterval = 30 * time.Second _, err = apiserver.NewServer(s.State, dummyListener{}, cfg) c.Assert(err, gc.ErrorMatches, "validating logsink configuration: DBLoggerFlushInterval 30s <= 0 or > 10 seconds not valid") + + cfg.LogSinkConfig.DBLoggerFlushInterval = 10 * time.Second + _, err = apiserver.NewServer(s.State, dummyListener{}, cfg) + c.Assert(err, gc.ErrorMatches, "validating logsink configuration: RateLimitBurst 0 <= 0 not valid") + + cfg.LogSinkConfig.RateLimitBurst = 1000 + _, err = apiserver.NewServer(s.State, dummyListener{}, cfg) + c.Assert(err, gc.ErrorMatches, "validating logsink configuration: RateLimitRefill 0s <= 0 not valid") } func (s *logsinkSuite) dialWebsocket(c *gc.C) *websocket.Conn { diff --git a/cmd/jujud/agent/machine.go b/cmd/jujud/agent/machine.go index 6b44d3f69a52..67edb4b101d1 100644 --- a/cmd/jujud/agent/machine.go +++ b/cmd/jujud/agent/machine.go @@ -1394,23 +1394,6 @@ func getRateLimitConfig(cfg agent.Config) (apiserver.RateLimitConfig, error) { } func getLogSinkConfig(cfg agent.Config) (apiserver.LogSinkConfig, error) { - result := apiserver.DefaultLogSinkConfig() - var err error - if v := cfg.Value(agent.LogSinkDBLoggerBufferSize); v != "" { - result.DBLoggerBufferSize, err = strconv.Atoi(v) - if err != nil { - return result, errors.Annotatef( - err, "parsing %s", agent.LogSinkDBLoggerBufferSize, - ) - } - } - if v := cfg.Value(agent.LogSinkDBLoggerFlushInterval); v != "" { - if result.DBLoggerFlushInterval, err = time.ParseDuration(v); err != nil { - return result, errors.Annotatef( - err, "parsing %s", agent.LogSinkDBLoggerFlushInterval, - ) - } - } return result, nil } @@ -1869,3 +1852,40 @@ func newStateMetricsWorker(st *state.State, registry *prometheus.Registry) worke return nil }) } + +func getLogSinkConfig(cfg agent.Config) (apiserver.LogSinkConfig, error) { + result := apiserver.DefaultLogSinkConfig() + var err error + if v := cfg.Value(agent.LogSinkDBLoggerBufferSize); v != "" { + result.DBLoggerBufferSize, err = strconv.Atoi(v) + if err != nil { + return result, errors.Annotatef( + err, "parsing %s", agent.LogSinkDBLoggerBufferSize, + ) + } + } + if v := cfg.Value(agent.LogSinkDBLoggerFlushInterval); v != "" { + if result.DBLoggerFlushInterval, err = time.ParseDuration(v); err != nil { + return result, errors.Annotatef( + err, "parsing %s", agent.LogSinkDBLoggerFlushInterval, + ) + } + } + if v := cfg.Value(agent.LogSinkRateLimitBurst); v != "" { + result.RateLimitBurst, err = strconv.ParseInt(v, 10, 64) + if err != nil { + return result, errors.Annotatef( + err, "parsing %s", agent.LogSinkRateLimitBurst, + ) + } + } + if v := cfg.Value(agent.LogSinkRateLimitRefill); v != "" { + result.RateLimitRefill, err = time.ParseDuration(v) + if err != nil { + return result, errors.Annotatef( + err, "parsing %s", agent.LogSinkRateLimitRefill, + ) + } + } + return result, nil +} diff --git a/dependencies.tsv b/dependencies.tsv index cb5eafeb202a..3947eefb8d6d 100644 --- a/dependencies.tsv +++ b/dependencies.tsv @@ -39,6 +39,7 @@ github.com/juju/mempool git 24974d6c264fe5a29716e7d56ea24c4bd904b7cc 2016-02-05T github.com/juju/mutex git 59c26ee163447c5c57f63ff71610d433862013de 2016-06-17T01:09:07Z github.com/juju/persistent-cookiejar git d67418f14c93a698e37b52468958d5d4dcf8a7dd 2017-04-28T16:15:59Z github.com/juju/pubsub git f4dfa62f30adc6955341b3dd73dde7c8d9b23b9e 2017-03-31T03:24:24Z +github.com/juju/ratelimit git 5b9ff866471762aa2ab2dced63c9fb6f53921342 2017-05-23T01:21:41Z github.com/juju/replicaset git 6b5becf2232ce76656ea765d8d915d41755a1513 2016-11-25T16:08:49Z github.com/juju/retry git 62c62032529169c7ec02fa48f93349604c345e1f 2015-10-29T02:48:21Z github.com/juju/rfc git ebdbbdb950cd039a531d15cdc2ac2cbd94f068ee 2016-07-11T02:42:13Z