Skip to content

Commit

Permalink
[ADDED] Server-wide rate limiting
Browse files Browse the repository at this point in the history
This is a very basic and not necessarily accurate server-wide rate
limiting. Both ingress and egress are counted toward the limit.
The changes when running without rate limit should not have any
performance impact. When rate limiting is enabled, but limit is
very high, a performance degradation is possible (cost of 3 atomic
operations).
  • Loading branch information
kozlovic committed Jan 30, 2018
1 parent b56ca22 commit da8b92e
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 4 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ max_payload: 65536
# Duration the server can block on a socket write to a client. Exceeding the
# deadline will designate a client as a slow consumer.
write_deadline: "2s"
# rate limit
msg_rate: 100
```

Inside configuration files, string values support the following escape characters: `\xXX, \t, \n, \r, \", \\`. Take note that when specifying directory paths in options such as `pid_file` and `log_file` on Windows, you'll need to escape backslashes, e.g. `log_file: "c:\\logging\\log.txt"`, or use unix style (`/`) path separators.
Expand Down
30 changes: 26 additions & 4 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type client struct {
last time.Time
parseState

rateMaxMsgs int64
rateMaxBytes int64

route *route
debug bool
trace bool
Expand Down Expand Up @@ -272,6 +275,11 @@ func (c *client) readLoop() {

// Snapshot server options.
opts := s.getOpts()
// Now it is safe to check opts.MsgRate since we deal with a copy
if int64(opts.MsgRate) != c.rateMaxMsgs {
c.rateMaxMsgs = int64(opts.MsgRate)
c.rateMaxBytes = c.rateMaxMsgs * 512
}

for {
n, err := nc.Read(b)
Expand Down Expand Up @@ -1016,8 +1024,9 @@ func (c *client) processMsg(msg []byte) {

// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
c.cache.inMsgs += 1
c.cache.inBytes += len(msg) - LEN_CR_LF
msgSize := len(msg) - LEN_CR_LF
c.cache.inMsgs++
c.cache.inBytes += msgSize

if c.trace {
c.traceMsg(msg)
Expand Down Expand Up @@ -1075,8 +1084,15 @@ func (c *client) processMsg(msg []byte) {
return
}

var r *SublistResult
var ok bool
var (
doRate bool
r *SublistResult
ok bool
)
if c.rateMaxMsgs > 0 {
doRate = true
srv.doRateControl(c.rateMaxMsgs, c.rateMaxBytes, int64(msgSize))
}

genid := atomic.LoadUint64(&srv.sl.genid)

Expand Down Expand Up @@ -1175,6 +1191,9 @@ func (c *client) processMsg(msg []byte) {
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
if doRate {
srv.doRateControl(c.rateMaxMsgs, c.rateMaxBytes, int64(msgSize))
}
}

// Now process any queue subs we have if not a route
Expand All @@ -1192,6 +1211,9 @@ func (c *client) processMsg(msg []byte) {
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
if doRate {
srv.doRateControl(c.rateMaxMsgs, c.rateMaxBytes, int64(msgSize))
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/configs/reload/reload.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ cluster {
listen: localhost:-1
no_advertise: true # enable on reload
}

msg_rate: 100
3 changes: 3 additions & 0 deletions server/configs/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ ping_max: 3

# how long server can block on a socket write to a client
write_deadline: "3s"

# rate limit
msg_rate: 1000000
7 changes: 7 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Options struct {
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
MsgRate int `json:"-"`

CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
Expand Down Expand Up @@ -302,6 +303,8 @@ func (o *Options) ProcessConfigFile(configFile string) error {
o.WriteDeadline = time.Duration(v.(int64)) * time.Second
fmt.Printf("WARNING: write_deadline should be converted to a duration\n")
}
case "msg_rate", "msgs_rate":
o.MsgRate = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -762,6 +765,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.RoutesStr != "" {
mergeRoutes(&opts, flagOpts)
}
if flagOpts.MsgRate != 0 {
opts.MsgRate = flagOpts.MsgRate
}
return &opts
}

Expand Down Expand Up @@ -982,6 +988,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.")
fs.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.")
fs.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.")
fs.IntVar(&opts.MsgRate, "msg_rate", 0, "Maximum message rate (server wide)")

// The flags definition above set "default" values to some of the options.
// Calling Parse() here will override the default options with any value
Expand Down
3 changes: 3 additions & 0 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestConfigFile(t *testing.T) {
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
MsgRate: 1000000,
}

opts, err := ProcessConfigFile("./configs/test.conf")
Expand Down Expand Up @@ -235,6 +236,7 @@ func TestMergeOverrides(t *testing.T) {
ConnectRetries: 2,
},
WriteDeadline: 3 * time.Second,
MsgRate: 10000,
}
fopts, err := ProcessConfigFile("./configs/test.conf")
if err != nil {
Expand All @@ -252,6 +254,7 @@ func TestMergeOverrides(t *testing.T) {
NoAdvertise: true,
ConnectRetries: 2,
},
MsgRate: 10000,
}
merged := MergeOptions(fopts, opts)

Expand Down
12 changes: 12 additions & 0 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,16 @@ func (w *writeDeadlineOption) Apply(server *Server) {
server.Noticef("Reloaded: write_deadline = %s", w.newValue)
}

// msgRateOption implements the option interface for the `msg_rate` setting.
type msgRateOption struct {
noopOption
newValue int
}

func (m *msgRateOption) Apply(server *Server) {
server.Noticef("Reloaded msg_rate = %v", m.newValue)
}

// Reload reads the current configuration file and applies any supported
// changes. This returns an error if the server was not started with a config
// file or an option which doesn't support hot-swapping was changed.
Expand Down Expand Up @@ -518,6 +528,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
case "writedeadline":
diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
case "msgrate":
diffOpts = append(diffOpts, &msgRateOption{newValue: newValue.(int)})
case "nolog":
// Ignore NoLog option since it's not parsed and only used in
// testing.
Expand Down
3 changes: 3 additions & 0 deletions server/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ func TestConfigReload(t *testing.T) {
if updated.MaxPayload != 1024 {
t.Fatalf("MaxPayload is incorrect.\nexpected 1024\ngot: %d", updated.MaxPayload)
}
if updated.MsgRate != 100 {
t.Fatalf("MsgRate is incorrect.\nexpected 100\ngot: %d", updated.MsgRate)
}

if reloaded := server.ConfigTime(); !reloaded.After(loaded) {
t.Fatalf("ConfigTime is incorrect.\nexpected greater than: %s\ngot: %s", loaded, reloaded)
Expand Down
60 changes: 60 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

// Allow dynamic profiling.
Expand Down Expand Up @@ -44,10 +45,21 @@ type Info struct {
clientConnectURLs map[string]struct{}
}

type rateInfo struct {
msgs int64
bytes int64
lastCheck int64
checkID uint64
mu sync.Mutex
chQuit chan struct{}
done bool
}

// Server is our main struct.
type Server struct {
gcid uint64
stats
rate rateInfo
mu sync.Mutex
info Info
infoJSON []byte
Expand Down Expand Up @@ -155,6 +167,9 @@ func New(opts *Options) *Server {
// Used to setup Authorization.
s.configureAuthorization()

// Rate limit related settings
s.rate.chQuit = make(chan struct{}, 1)

s.generateServerInfoJSON()
s.handleSignals()

Expand Down Expand Up @@ -355,6 +370,9 @@ func (s *Server) Shutdown() {
// Release the solicited routes connect go routines.
close(s.rcQuit)

// Release possible sleep in rate control
s.rate.chQuit <- struct{}{}

s.mu.Unlock()

// Close client and route connections
Expand Down Expand Up @@ -1033,3 +1051,45 @@ func (s *Server) getClientConnectURLs() []string {
}
return urls
}

func (s *Server) doRateControl(maxMsgs, maxBytes, msgSize int64) {
r := &s.rate
msgs := atomic.AddInt64(&r.msgs, 1)
bytes := atomic.AddInt64(&r.bytes, msgSize)
if atomic.LoadInt64(&r.lastCheck) == 0 {
r.mu.Lock()
if r.lastCheck == 0 {
atomic.StoreInt64(&r.lastCheck, time.Now().UnixNano())
}
r.mu.Unlock()
}
if msgs >= maxMsgs || bytes >= maxBytes {
id := atomic.LoadUint64(&r.checkID)
r.mu.Lock()
if r.done {
r.mu.Unlock()
return
}
if r.checkID == id {
now := time.Now().UnixNano()
delta := time.Duration(now - r.lastCheck)
if delta < time.Second {
select {
case <-time.After(time.Second - delta):
case <-r.chQuit:
r.done = true
r.mu.Unlock()
return
}
atomic.AddInt64(&r.msgs, -msgs)
atomic.AddInt64(&r.bytes, -bytes)
} else {
atomic.AddInt64(&r.msgs, -msgs+1)
atomic.AddInt64(&r.bytes, -msgs+msgSize)
}
atomic.StoreInt64(&r.lastCheck, 0)
atomic.StoreUint64(&r.checkID, r.checkID+1)
}
r.mu.Unlock()
}
}
Loading

0 comments on commit da8b92e

Please sign in to comment.