diff --git a/nats.go b/nats.go index ce8db129c..b67021bf6 100644 --- a/nats.go +++ b/nats.go @@ -2629,8 +2629,11 @@ func (nc *Conn) processMsg(data []byte) { // It's possible that we end-up not using the message, but that's ok. // FIXME(dlc): Need to copy, should/can do COW? - msgPayload := make([]byte, len(data)) - copy(msgPayload, data) + var msgPayload = data + if !nc.ps.msgCopied { + msgPayload = make([]byte, len(data)) + copy(msgPayload, data) + } // Check if we have headers encoded here. var h Header diff --git a/parser.go b/parser.go index c9cbfeb65..f5f6da8e4 100644 --- a/parser.go +++ b/parser.go @@ -28,14 +28,15 @@ type msgArg struct { const MAX_CONTROL_LINE_SIZE = 4096 type parseState struct { - state int - as int - drop int - hdr int - ma msgArg - argBuf []byte - msgBuf []byte - scratch [MAX_CONTROL_LINE_SIZE]byte + state int + as int + drop int + hdr int + ma msgArg + argBuf []byte + msgBuf []byte + msgCopied bool + scratch [MAX_CONTROL_LINE_SIZE]byte } const ( @@ -167,7 +168,7 @@ func (nc *Conn) parse(buf []byte) error { if nc.ps.msgBuf != nil { if len(nc.ps.msgBuf) >= nc.ps.ma.size { nc.processMsg(nc.ps.msgBuf) - nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END + nc.ps.argBuf, nc.ps.msgBuf, nc.ps.msgCopied, nc.ps.state = nil, nil, false, MSG_END } else { // copy as much as we can to the buffer and skip ahead. toCopy := nc.ps.ma.size - len(nc.ps.msgBuf) @@ -190,7 +191,7 @@ func (nc *Conn) parse(buf []byte) error { } } else if i-nc.ps.as >= nc.ps.ma.size { nc.processMsg(buf[nc.ps.as:i]) - nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END + nc.ps.argBuf, nc.ps.msgBuf, nc.ps.msgCopied, nc.ps.state = nil, nil, false, MSG_END } case MSG_END: switch b { @@ -403,6 +404,7 @@ func (nc *Conn) parse(buf []byte) error { nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size) copy(nc.ps.msgBuf, buf[nc.ps.as:]) + nc.ps.msgCopied = true } else { nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)] nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)