Skip to content

Commit

Permalink
Merge pull request #889 from nats-io/big_payload
Browse files Browse the repository at this point in the history
Don't allow overruns for message payloads, fixes #884
  • Loading branch information
derekcollison committed Jan 31, 2019
2 parents 75a489a + 934b28d commit 39fdcd9
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 42 deletions.
61 changes: 31 additions & 30 deletions server/parser.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -70,7 +70,8 @@ const (
OP_PON
OP_PONG
MSG_PAYLOAD
MSG_END
MSG_END_R
MSG_END_N
OP_S
OP_SU
OP_SUB
Expand Down Expand Up @@ -108,6 +109,7 @@ func (c *client) parse(buf []byte) error {
var i int
var b byte

// FIXME(dlc) - This is wasteful, only can change on reload.
mcl := MAX_CONTROL_LINE_SIZE
if c.srv != nil {
if opts := c.srv.getOpts(); opts != nil {
Expand Down Expand Up @@ -238,35 +240,34 @@ func (c *client) parse(buf []byte) error {
c.msgBuf = append(c.msgBuf, b)
}
if len(c.msgBuf) >= c.pa.size {
c.state = MSG_END
c.state = MSG_END_R
}
} else if i-c.as >= c.pa.size {
c.state = MSG_END
} else if i-c.as+1 >= c.pa.size {
c.state = MSG_END_R
}
case MSG_END:
switch b {
case '\n':
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
} else {
c.msgBuf = buf[c.as : i+1]
}
// strict check for proto
if len(c.msgBuf) != c.pa.size+LEN_CR_LF {
goto parseErr
}
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf = nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
c.pa.arg, c.pa.pacache, c.pa.account, c.pa.subject = nil, nil, nil, nil
c.pa.reply, c.pa.szb, c.pa.queues = nil, nil, nil
default:
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
}
continue
case MSG_END_R:
if b != '\r' {
goto parseErr
}
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
}
c.state = MSG_END_N
case MSG_END_N:
if b != '\n' {
goto parseErr
}
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
} else {
c.msgBuf = buf[c.as : i+1]
}
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf = nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
c.pa.arg, c.pa.pacache, c.pa.account, c.pa.subject = nil, nil, nil, nil
c.pa.reply, c.pa.szb, c.pa.queues = nil, nil, nil
case OP_A:
switch b {
case '+':
Expand Down Expand Up @@ -660,7 +661,7 @@ func (c *client) parse(buf []byte) error {
// jump ahead with the index. If this overruns
// what is left we fall out and process split
// buffer.
i = c.as + c.pa.size - 1
i = c.as + c.pa.size - LEN_CR_LF
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
Expand Down Expand Up @@ -816,7 +817,7 @@ func (c *client) parse(buf []byte) error {
}

// Check for split msg
if (c.state == MSG_PAYLOAD || c.state == MSG_END) && c.msgBuf == nil {
if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
// We need to clone the pubArg if it is still referencing the
// read buffer and we are not able to process the msg.
if c.argBuf == nil {
Expand Down
38 changes: 31 additions & 7 deletions server/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestParsePub(t *testing.T) {

pub := []byte("PUB foo 5\r\nhello\r")
err := c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.subject, []byte("foo")) {
Expand All @@ -198,7 +198,7 @@ func TestParsePub(t *testing.T) {

pub = []byte("PUB foo.bar INBOX.22 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
Expand All @@ -210,6 +210,19 @@ func TestParsePub(t *testing.T) {
if c.pa.size != 11 {
t.Fatalf("Did not parse msg size correctly: 11 vs %d\n", c.pa.size)
}

// Clear snapshots
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START

// This is the case when data has more bytes than expected by size.
pub = []byte("PUB foo.bar 11\r\nhello world hello world\r")
err = c.parse(pub)
if err == nil {
t.Fatalf("Expected an error parsing longer than expected message body")
}
if c.msgBuf != nil {
t.Fatalf("Did not expect a c.msgBuf to be non-nil")
}
}

func TestParsePubArg(t *testing.T) {
Expand Down Expand Up @@ -326,7 +339,7 @@ func TestParseRouteMsg(t *testing.T) {
}
pub = []byte("RMSG $foo foo 5\r\nhello\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$foo")) {
Expand All @@ -347,7 +360,7 @@ func TestParseRouteMsg(t *testing.T) {

pub = []byte("RMSG $G foo.bar INBOX.22 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$G")) {
Expand All @@ -368,7 +381,7 @@ func TestParseRouteMsg(t *testing.T) {

pub = []byte("RMSG $G foo.bar + reply baz 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$G")) {
Expand All @@ -392,7 +405,7 @@ func TestParseRouteMsg(t *testing.T) {

pub = []byte("RMSG $G foo.bar | baz 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
if err != nil || c.state != MSG_END_N {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.account, []byte("$G")) {
Expand Down Expand Up @@ -562,3 +575,14 @@ func TestParseOK(t *testing.T) {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
}

func TestMaxControlLine(t *testing.T) {
c := dummyClient()
c.srv.opts.MaxControlLine = 8

pub := []byte("PUB foo.bar 11\r")
err := c.parse(pub)
if err != ErrMaxControlLine {
t.Fatalf("Expected an error parsing longer than expected control line")
}
}
10 changes: 5 additions & 5 deletions server/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func TestSplitBufferPubOp(t *testing.T) {
if err := c.parse(pub7); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_END {
t.Fatalf("Expected MSG_END state vs %d\n", c.state)
if c.state != MSG_END_N {
t.Fatalf("Expected MSG_END_N state vs %d\n", c.state)
}
}

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestSplitBufferPubOp5(t *testing.T) {
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
pubAll := []byte("PUB foo 11\r\nhello world\r\n")

// Splits need to be on MSG_END now too, so make sure we check that.
// Splits need to be on MSG_END_R now too, so make sure we check that.
// Split between \r and \n
pub := pubAll[:len(pubAll)-1]

Expand Down Expand Up @@ -512,7 +512,7 @@ func TestSplitBufferMsgOp(t *testing.T) {
if err := c.parse(msg8); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_END {
t.Fatalf("Expected MSG_END state vs %d\n", c.state)
if c.state != MSG_END_N {
t.Fatalf("Expected MSG_END_N state vs %d\n", c.state)
}
}

0 comments on commit 39fdcd9

Please sign in to comment.