Skip to content

Commit

Permalink
Merge pull request #89 from nats-io/lrgmsgperf
Browse files Browse the repository at this point in the history
Large Message Performance
  • Loading branch information
derekcollison committed Jul 3, 2015
2 parents 2797089 + 5a07dff commit ed8cafe
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 32 deletions.
26 changes: 15 additions & 11 deletions TODO.md
@@ -1,24 +1,28 @@

# General

- [X] Remove reliance on `ps`
- [X] Syslog support
- [ ] SSL/TLS support
- [ ] nats-top equivalent, utils
- [ ] Pedantic state
- [X] Daemon mode? Won't fix
- [ ] Connz report routes
- [ ] Info updates contain other implicit route servers
- [X] Docker
- [ ] brew, apt-get, rpm, chocately (windows)
- [ ] Dynamic socket buffer sizes
- [ ] Switch to 1.4 and use maps vs hashmaps
- [ ] Buffer pools?
- [ ] Switch to 1.4 and use maps vs hashmaps in sublist
- [ ] Sublist better at high concurrency
- [ ] Buffer pools/sync pools?
- [ ] Add ability to reload config on signal
- [ ] NewSource on Rand do lower lock contention on QueueSubs
- [ ] Add ENV support to dconf
- [ ] Add ENV and variable support to dconf
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
- [ ] Client support for language and version
- [ ] Place version in varz?
- [ ] Remove options in varz by default?
- [ ] Memory limits/warnings?
- [ ] Gossip Protocol for discovery for clustering
- [ ] Info updates contain other implicit route servers
- [ ] Dropped message statistics
- [X] nats-top equivalent, utils
- [X] Connz report routes
- [X] Docker
- [X] Remove reliance on `ps`
- [X] Syslog support
- [X] Client support for language and version
- [X] Fix benchmarks on linux
- [X] Daemon mode? Won't fix
2 changes: 1 addition & 1 deletion server/client.go
Expand Up @@ -123,7 +123,7 @@ func (c *client) initClient() {
//
// if ip, ok := c.nc.(*net.TCPConn); ok {
// ip.SetReadBuffer(defaultBufSize)
// ip.SetWriteBuffer(2*defaultBufSize)
// ip.SetWriteBuffer(2 * defaultBufSize)
// }

// Set the Ping timer
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Expand Up @@ -8,7 +8,7 @@ import (

const (
// VERSION is the current version for the server.
VERSION = "0.6.0"
VERSION = "0.6.1.beta"

// DEFAULT_PORT is the deault port for client connections.
DEFAULT_PORT = 4222
Expand Down
44 changes: 39 additions & 5 deletions server/parser.go
Expand Up @@ -87,7 +87,10 @@ func (c *client) parse(buf []byte) error {
// proper CONNECT if needed.
authSet := c.isAuthTimerSet()

for i, b = range buf {
// Move to loop instead of range syntax to allow jumping of i
for i = 0; i < len(buf); i++ {
b = buf[i]

switch c.state {
case OP_START:
if b != 'C' && b != 'c' && authSet {
Expand Down Expand Up @@ -161,14 +164,36 @@ func (c *client) parse(buf []byte) error {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// If we don't have a saved buffer then jump ahead with
// the index. If this overruns what is left we fall out
// and process split buffer.
if c.msgBuf == nil {
i = c.as + c.pa.size - LEN_CR_LF
}
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case MSG_PAYLOAD:
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
// copy as much as we can to the buffer and skip ahead.
toCopy := c.pa.size - len(c.msgBuf)
avail := len(buf) - i
if avail < toCopy {
toCopy = avail
}
if toCopy > 0 {
start := len(c.msgBuf)
// This is needed for copy to work.
c.msgBuf = c.msgBuf[:start+toCopy]
copy(c.msgBuf[start:], buf[i:i+toCopy])
// Update our index
i = (i + toCopy) - 1
} else {
// Fall back to append if needed.
c.msgBuf = append(c.msgBuf, b)
}
if len(c.msgBuf) >= c.pa.size {
c.state = MSG_END
}
Expand Down Expand Up @@ -587,7 +612,7 @@ func (c *client) parse(buf []byte) error {
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
c.state == CONNECT_ARG) && c.argBuf == nil {
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...)
c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
// FIXME, check max len
}
// Check for split msg
Expand All @@ -597,8 +622,17 @@ func (c *client) parse(buf []byte) error {
if c.argBuf == nil {
c.clonePubArg()
}
c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)

// If we will overflow the scratch buffer, just create a
// new buffer to hold the split message.
if c.pa.size > cap(c.scratch)-len(c.argBuf) {
lrem := len(buf[c.as:])
c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
copy(c.msgBuf, buf[c.as:])
} else {
c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion sublist/sublist.go
@@ -1,4 +1,4 @@
// Copyright 2012-2013 Apcera Inc. All rights reserved.
// Copyright 2012-2015 Apcera Inc. All rights reserved.

// Sublist is a subject distribution data structure that can match subjects to
// interested subscribers. Subscribers can have wildcard subjects to match
Expand Down
37 changes: 24 additions & 13 deletions test/bench_test.go
Expand Up @@ -4,10 +4,10 @@ package test

import (
"bufio"
"crypto/rand"
"encoding/hex"
// "encoding/hex"
"fmt"
"io"
// "io"
"math/rand"
"net"
"testing"
"time"
Expand All @@ -25,17 +25,18 @@ func runBenchServer() *server.Server {
}

const defaultRecBufSize = 32768
const defaultSendBufSize = 16384
const defaultSendBufSize = 32768

func flushConnection(b *testing.B, c net.Conn, buf []byte) {
func flushConnection(b *testing.B, c net.Conn) {
buf := make([]byte, 32)
c.Write([]byte("PING\r\n"))
c.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
c.SetReadDeadline(time.Now().Add(1 * time.Second))
n, err := c.Read(buf)
c.SetReadDeadline(time.Time{})
if err != nil {
b.Fatalf("Failed read: %v\n", err)
}
if n != 6 && buf[0] != 'P' {
if n != 6 && buf[0] != 'P' && buf[1] != 'O' {
b.Fatalf("Failed read of PONG: %s\n", buf)
}
}
Expand All @@ -48,22 +49,25 @@ func benchPub(b *testing.B, subject, payload string) {
bw := bufio.NewWriterSize(c, defaultSendBufSize)
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
b.SetBytes(int64(len(sendOp)))
buf := make([]byte, 1024)
b.StartTimer()
for i := 0; i < b.N; i++ {
bw.Write(sendOp)
}
bw.Flush()
flushConnection(b, c, buf)
flushConnection(b, c)
b.StopTimer()
c.Close()
s.Shutdown()
}

var ch = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()")

func sizedString(sz int) string {
u := make([]byte, sz)
io.ReadFull(rand.Reader, u)
return hex.EncodeToString(u)
b := make([]byte, sz)
for i := range b {
b[i] = ch[rand.Intn(len(ch))]
}
return string(b)
}

func Benchmark___PubNo_Payload(b *testing.B) {
Expand Down Expand Up @@ -100,12 +104,18 @@ func Benchmark___Pub4K_Payload(b *testing.B) {
benchPub(b, "a", s)
}

func Benchmark___Pub8K_Payload(b *testing.B) {
b.StopTimer()
s := sizedString(8 * 1024)
benchPub(b, "a", s)
}

func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) {
buf := make([]byte, defaultRecBufSize)
bytes := 0

for {
c.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
c.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := c.Read(buf)
if err != nil {
b.Errorf("Error on read: %v\n", err)
Expand Down Expand Up @@ -164,6 +174,7 @@ func Benchmark__PubSubTwoConns(b *testing.B) {
c2 := createClientConn(b, "localhost", PERF_PORT)
doDefaultConnect(b, c2)
sendProto(b, c2, "SUB foo 1\r\n")
flushConnection(b, c2)

sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
ch := make(chan bool)
Expand Down

0 comments on commit ed8cafe

Please sign in to comment.