Skip to content
This repository has been archived by the owner on Jun 9, 2019. It is now read-only.

Commit

Permalink
Fixed INFO split parsing
Browse files Browse the repository at this point in the history
Added test for split buffer when processing INFO protocol
  • Loading branch information
kozlovic committed Aug 1, 2016
1 parent 6db5372 commit a73f86b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
65 changes: 59 additions & 6 deletions nats_test.go
Expand Up @@ -14,8 +14,10 @@ import (
"testing"
"time"

"encoding/json"
"github.com/nats-io/gnatsd/server"
gnatsd "github.com/nats-io/gnatsd/test"
"runtime"
)

// Dumb wait program to sync on callbacks, etc... Will timeout
Expand All @@ -32,6 +34,23 @@ func WaitTime(ch chan bool, timeout time.Duration) error {
return errors.New("timeout")
}

func stackFatalf(t *testing.T, f string, args ...interface{}) {
lines := make([]string, 0, 32)
msg := fmt.Sprintf(f, args...)
lines = append(lines, msg)

// Generate the Stack of callers: Skip us and verify* frames.
for i := 2; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
msg := fmt.Sprintf("%d - %s:%d", i, file, line)
lines = append(lines, msg)
}
t.Fatalf("%s", strings.Join(lines, "\n"))
}

////////////////////////////////////////////////////////////////////////////////
// Reconnect tests
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -834,8 +853,40 @@ func TestAsyncINFO(t *testing.T) {
// Server pool needs to be setup
c.setupServerPool()

// Partials requiring argBuf
expectedServer := serverInfo{
Id: "test",
Host: "localhost",
Port: 4222,
Version: "1.2.3",
AuthRequired: true,
TLSRequired: true,
MaxPayload: 2 * 1024 * 1024,
ConnectURLs: []string{"localhost:5222", "localhost:6222"},
}
b, _ := json.Marshal(expectedServer)
info = []byte(fmt.Sprintf("INFO %s\r\n", b))
if c.ps.state != OP_START {
t.Fatalf("Expected OP_START vs %d\n", c.ps.state)
}
err = c.parse(info[:9])
if err != nil || c.ps.state != INFO_ARG || c.ps.argBuf == nil {
t.Fatalf("Unexpected: %d err: %v argBuf: %v\n", c.ps.state, err, c.ps.argBuf)
}
err = c.parse(info[9:11])
if err != nil || c.ps.state != INFO_ARG || c.ps.argBuf == nil {
t.Fatalf("Unexpected: %d err: %v argBuf: %v\n", c.ps.state, err, c.ps.argBuf)
}
err = c.parse(info[11:])
if err != nil || c.ps.state != OP_START || c.ps.argBuf != nil {
t.Fatalf("Unexpected: %d err: %v argBuf: %v\n", c.ps.state, err, c.ps.argBuf)
}
if !reflect.DeepEqual(c.info, expectedServer) {
t.Fatalf("Expected server info to be: %v, got: %v", expectedServer, c.info)
}

// Good INFOs
good := []string{"INFO {}\r\n", "INFO {} \r\n", "INFO { \"server_id\": \"test\" } \r\n", "INFO {\"connect_urls\":[]}\r\n"}
good := []string{"INFO {}\r\n", "INFO {}\r\n", "INFO {} \r\n", "INFO { \"server_id\": \"test\" } \r\n", "INFO {\"connect_urls\":[]}\r\n"}
for _, gi := range good {
c.ps = &parseState{}
err = c.parse([]byte(gi))
Expand All @@ -845,7 +896,7 @@ func TestAsyncINFO(t *testing.T) {
}

// Wrong INFOs
wrong := []string{"INFOx {}\r\n", "INFO{}\r\n", "INFO {}"}
wrong := []string{"IxNFO {}\r\n", "INxFO {}\r\n", "INFxO {}\r\n", "INFOx {}\r\n", "INFO{}\r\n", "INFO {}"}
for _, wi := range wrong {
c.ps = &parseState{}
err = c.parse([]byte(wi))
Expand All @@ -857,19 +908,19 @@ func TestAsyncINFO(t *testing.T) {
checkPool := func(urls ...string) {
// Check both pool and urls map
if len(c.srvPool) != len(urls) {
t.Fatalf("Pool should have %d elements, has %d", len(urls), len(c.srvPool))
stackFatalf(t, "Pool should have %d elements, has %d", len(urls), len(c.srvPool))
}
if len(c.urls) != len(urls) {
t.Fatalf("Map should have %d elements, has %d", len(urls), len(c.urls))
stackFatalf(t, "Map should have %d elements, has %d", len(urls), len(c.urls))
}
for i, url := range urls {
if c.Opts.NoRandomize {
if c.srvPool[i].url.Host != url {
t.Fatalf("Pool should have %q at index %q, has %q", url, i, c.srvPool[i].url.Host)
stackFatalf(t, "Pool should have %q at index %q, has %q", url, i, c.srvPool[i].url.Host)
}
} else {
if _, present := c.urls[url]; !present {
t.Fatalf("Pool should have %q", url)
stackFatalf(t, "Pool should have %q", url)
}
}
}
Expand All @@ -881,6 +932,8 @@ func TestAsyncINFO(t *testing.T) {
c.Opts.NoRandomize = true
// Reset the pool
c.setupServerPool()
// Reinitialize the parser
c.ps = &parseState{}

info = []byte("INFO {\"connect_urls\":[\"localhost:5222\"]}\r\n")
err = c.parse(info)
Expand Down
8 changes: 4 additions & 4 deletions parser.go
Expand Up @@ -55,7 +55,7 @@ const (
OP_INF
OP_INFO
OP_INFO_SPC
OP_INFO_ARG
INFO_ARG
)

// parse is the fast protocol parser engine.
Expand Down Expand Up @@ -330,10 +330,10 @@ func (nc *Conn) parse(buf []byte) error {
case ' ', '\t':
continue
default:
nc.ps.state = OP_INFO_ARG
nc.ps.state = INFO_ARG
nc.ps.as = i
}
case OP_INFO_ARG:
case INFO_ARG:
switch b {
case '\r':
nc.ps.drop = 1
Expand All @@ -357,7 +357,7 @@ func (nc *Conn) parse(buf []byte) error {
}
}
// Check for split buffer scenarios
if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG) && nc.ps.argBuf == nil {
if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
nc.ps.argBuf = nc.ps.scratch[:0]
nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
// FIXME, check max len
Expand Down

0 comments on commit a73f86b

Please sign in to comment.