Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Enforce max_control_line for client connections only #1850

Merged
merged 2 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 7 additions & 9 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,13 @@ func (c *client) initClient() {
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
c.out.mp = opts.MaxPending
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
// to be reloaded without restart, this code will need to change.
c.mcl = int32(opts.MaxControlLine)
if c.mcl == 0 {
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
c.mcl = MAX_CONTROL_LINE_SIZE
}

c.subs = make(map[string]*subscription)
c.echo = true
Expand Down Expand Up @@ -1005,15 +1012,6 @@ func (c *client) readLoop(pre []byte) {
c.mqtt.r = &mqttReader{reader: nc}
}
c.in.rsz = startBufSize
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
// to be reloaded without restart, this code will need to change.
c.mcl = MAX_CONTROL_LINE_SIZE
if s != nil {
if opts := s.getOpts(); opts != nil {
c.mcl = int32(opts.MaxControlLine)
}
}

// Check the per-account-cache for closed subscriptions
cpacc := c.kind == ROUTER || c.kind == GATEWAY
Expand Down
3 changes: 3 additions & 0 deletions server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ const (
// PROTO_SNIPPET_SIZE is the default size of proto to print on parse errors.
PROTO_SNIPPET_SIZE = 32

// MAX_CONTROL_LINE_SNIPPET_SIZE is the default size of proto to print on max control line errors.
MAX_CONTROL_LINE_SNIPPET_SIZE = 128

// MAX_MSG_ARGS Maximum possible number of arguments from MSG proto.
MAX_MSG_ARGS = 4

Expand Down
1 change: 1 addition & 0 deletions server/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func TestNoPasswordsFromConnectTrace(t *testing.T) {
opts.Username = "derek"
opts.Password = "s3cr3t"
opts.PingInterval = 2 * time.Minute
setBaselineOptions(opts)
s := &Server{opts: opts}
dl := &DummyLogger{}
s.SetLogger(dl, false, true)
Expand Down
62 changes: 54 additions & 8 deletions server/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if trace {
c.traceInOp("HPUB", arg)
}
Expand Down Expand Up @@ -319,6 +322,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
var err error
if c.kind == ROUTER || c.kind == GATEWAY {
if trace {
Expand Down Expand Up @@ -391,6 +397,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if trace {
c.traceInOp("PUB", arg)
}
Expand Down Expand Up @@ -508,6 +517,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if trace {
c.traceInOp("A+", arg)
}
Expand Down Expand Up @@ -547,6 +559,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if trace {
c.traceInOp("A-", arg)
}
Expand Down Expand Up @@ -598,6 +613,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
var err error

switch c.kind {
Expand Down Expand Up @@ -730,6 +748,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
var err error

switch c.kind {
Expand Down Expand Up @@ -876,6 +897,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if trace {
c.traceInOp("CONNECT", removePassFromTrace(arg))
}
Expand Down Expand Up @@ -934,6 +958,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
var err error
if c.kind == ROUTER || c.kind == GATEWAY {
switch c.op {
Expand Down Expand Up @@ -1010,6 +1037,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
if err := c.processInfo(arg); err != nil {
return err
}
Expand Down Expand Up @@ -1086,6 +1116,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
return err
}
c.processErr(string(arg))
c.drop, c.as, c.state = 0, i+1, OP_START
default:
Expand Down Expand Up @@ -1113,11 +1146,7 @@ func (c *client) parse(buf []byte) error {
// Check for violations of control line length here. Note that this is not
// exact at all but the performance hit is too great to be precise, and
// catching here should prevent memory exhaustion attacks.
if len(c.argBuf) > int(mcl) {
err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d",
c.state, int(mcl), len(c.argBuf))
c.sendErr(err.Error())
c.closeConnection(MaxControlLineExceeded)
if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
return err
}
}
Expand Down Expand Up @@ -1160,13 +1189,13 @@ authErr:

parseErr:
c.sendErr("Unknown Protocol Operation")
snip := protoSnippet(i, buf)
snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.typeString(), c.state, i, snip)
return err
}

func protoSnippet(start int, buf []byte) string {
stop := start + PROTO_SNIPPET_SIZE
func protoSnippet(start, max int, buf []byte) string {
stop := start + max
bufSize := len(buf)
if start >= bufSize {
return `""`
Expand All @@ -1177,6 +1206,23 @@ func protoSnippet(start int, buf []byte) string {
return fmt.Sprintf("%q", buf[start:stop])
}

// Check if the length of buffer `arg` is over the max control line limit `mcl`.
// If so, an error is sent to the client and the connection is closed.
// The error ErrMaxControlLine is returned.
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
if c.kind != CLIENT {
return nil
}
if len(arg) > int(mcl) {
err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
c.sendErr(err.Error())
c.closeConnection(MaxControlLineExceeded)
return err
}
return nil
}

// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
// we need to hold onto it into the next read.
func (c *client) clonePubArg(lmsg bool) error {
Expand Down
62 changes: 55 additions & 7 deletions server/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func TestProtoSnippet(t *testing.T) {
}

for _, tt := range tests {
got := protoSnippet(tt.input, sample)
got := protoSnippet(tt.input, PROTO_SNIPPET_SIZE, sample)
if tt.expected != got {
t.Errorf("Expected protocol snippet to be %s when start=%d but got %s\n", tt.expected, tt.input, got)
}
Expand Down Expand Up @@ -811,12 +811,60 @@ func TestParseOK(t *testing.T) {
}

func TestMaxControlLine(t *testing.T) {
c := dummyClient()
c.mcl = 8
for _, test := range []struct {
name string
kind int
shouldFail bool
}{
{"client", CLIENT, true},
{"leaf", LEAF, false},
{"route", ROUTER, false},
{"gateway", GATEWAY, false},
} {
t.Run(test.name, func(t *testing.T) {
pub := []byte("PUB foo.bar.baz 2\r\nok\r\n")

setupClient := func() *client {
c := dummyClient()
c.setNoReconnect()
c.flags.set(connectReceived)
c.kind = test.kind
if test.kind == GATEWAY {
c.gw = &gateway{outbound: false, connected: true, insim: make(map[string]*insie)}
}
c.mcl = 8
return c
}

pub := []byte("PUB foo.bar 11\r")
err := c.parse(pub)
if !ErrorIs(err, ErrMaxControlLine) {
t.Fatalf("Expected an error parsing longer than expected control line")
c := setupClient()
// First try with a partial:
// PUB foo.bar.baz 2\r\nok\r\n
// .............^
err := c.parse(pub[:14])
switch test.shouldFail {
case true:
if !ErrorIs(err, ErrMaxControlLine) {
t.Fatalf("Expected an error parsing longer than expected control line")
}
case false:
if err != nil {
t.Fatalf("Should not have failed, got %v", err)
}
}

// Now with full protocol (no split) and we should still enforce.
c = setupClient()
err = c.parse(pub)
switch test.shouldFail {
case true:
if !ErrorIs(err, ErrMaxControlLine) {
t.Fatalf("Expected an error parsing longer than expected control line")
}
case false:
if err != nil {
t.Fatalf("Should not have failed, got %v", err)
}
}
})
}
}