Skip to content

Commit

Permalink
BufSize option, bump for go1.5.1, bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Sep 18, 2015
1 parent f26e2e4 commit fe3b8f2
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 11 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.5
FROM golang:1.5.1

MAINTAINER Derek Collison <derek@apcera.com>

Expand All @@ -10,4 +10,3 @@ RUN CGO_ENABLED=0 go install -v -a -tags netgo -installsuffix netgo -ldflags "-s
EXPOSE 4222 8222
ENTRYPOINT ["gnatsd"]
CMD ["--help"]

4 changes: 4 additions & 0 deletions gnatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func main() {
flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")

// Not public per se, will be replaced with dynamic system, but can be used to lower memory footprint when
// lots of connections present.
flag.IntVar(&opts.BufSize, "bs", 0, "Read/Write buffer size per client connection.")

flag.Usage = server.Usage

flag.Parse()
Expand Down
11 changes: 5 additions & 6 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
)

const (
// The size of the bufio reader/writer on top of the socket.
defaultBufSize = 32768
// Scratch buffer size for the processMsg() calls.
msgScratchSize = 512
msgHeadProto = "MSG "
Expand Down Expand Up @@ -94,7 +92,7 @@ func init() {
func (c *client) initClient() {
s := c.srv
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.nc, defaultBufSize)
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
c.subs = hashmap.New()

// This is a scratch buffer used for processMsg()
Expand Down Expand Up @@ -123,8 +121,8 @@ func (c *client) initClient() {
// No clue why, but this stalls and kills performance on Mac (Mavericks).
//
// if ip, ok := c.nc.(*net.TCPConn); ok {
// ip.SetReadBuffer(defaultBufSize)
// ip.SetWriteBuffer(2 * defaultBufSize)
// ip.SetReadBuffer(s.opts.BufSize)
// ip.SetWriteBuffer(2 * s.opts.BufSize)
// }

// Set the Ping timer
Expand All @@ -139,13 +137,14 @@ func (c *client) readLoop() {
// We check for that after the loop, but want to avoid a nil dereference
c.mu.Lock()
nc := c.nc
s := c.srv
c.mu.Unlock()

if nc == nil {
return
}

b := make([]byte, defaultBufSize)
b := make([]byte, s.opts.BufSize)

for {
n, err := nc.Read(b)
Expand Down
2 changes: 1 addition & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var defaultServerOptions = Options{

func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, defaultBufSize)
cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE)
s := New(&serverOptions)
if serverOptions.Authorization != "" {
s.SetAuthMethod(&mockAuth{})
Expand Down
6 changes: 5 additions & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
// VERSION is the current version for the server.
VERSION = "0.6.6"
VERSION = "0.6.8"

// DEFAULT_PORT is the deault port for client connections.
DEFAULT_PORT = 4222
Expand Down Expand Up @@ -82,4 +82,8 @@ const (

// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
MAX_PUB_ARGS = 3

// Default Buffer size for reads and writes per connection. Will be replaced by dynamic
// system in the long run.
DEFAULT_BUF_SIZE = 32768
)
4 changes: 4 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Options struct {
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
BufSize int `json:"-"`
}

type authorization struct {
Expand Down Expand Up @@ -366,4 +367,7 @@ func processOptions(opts *Options) {
if opts.MaxPending == 0 {
opts.MaxPending = MAX_PENDING_SIZE
}
if opts.BufSize == 0 {
opts.BufSize = DEFAULT_BUF_SIZE
}
}
1 change: 1 addition & 0 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestDefaultOptions(t *testing.T) {
MaxPayload: MAX_PAYLOAD_SIZE,
MaxPending: MAX_PENDING_SIZE,
ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
BufSize: DEFAULT_BUF_SIZE,
}

opts := &Options{}
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (s *Server) AcceptLoop() {
Noticef("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
fmt.Printf("could not listen on port for %s, %v\n", hp, e)
Fatalf("Error listening on port: %s, %q", hp, e)
return
}
Expand Down
2 changes: 1 addition & 1 deletion test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
for time.Now().Before(end) {
addr := s.Addr()
if addr == nil {
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}
Expand Down

0 comments on commit fe3b8f2

Please sign in to comment.