Skip to content

Commit

Permalink
nsqd: add framed snappy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Sep 10, 2013
1 parent c5d2cc2 commit d3e525e
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 18 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Expand Up @@ -6,10 +6,11 @@ env:
- GOARCH=amd64
- GOARCH=386
install:
- go get github.com/bmizerany/assert
- go get github.com/bitly/go-nsq
- go get github.com/bitly/go-hostpool
- go get github.com/bitly/go-simplejson
- go get github.com/mreiferson/go-snappystream
- go get github.com/bitly/go-hostpool
- go get github.com/bmizerany/assert
script:
- pushd $TRAVIS_BUILD_DIR
- ./test.sh
Expand Down
3 changes: 2 additions & 1 deletion dist.sh
Expand Up @@ -20,9 +20,10 @@ git archive HEAD | tar -x -C $TMPGOPATH/src/github.com/bitly/nsq
export GOPATH="$TMPGOPATH:$GOROOT"

echo "... getting dependencies"
go get -v github.com/bitly/go-nsq
go get -v github.com/bitly/go-simplejson
go get -v github.com/mreiferson/go-snappystream
go get -v github.com/bitly/go-hostpool
go get -v github.com/bitly/go-nsq
go get -v github.com/bmizerany/assert

pushd $TMPGOPATH/src/github.com/bitly/nsq
Expand Down
27 changes: 22 additions & 5 deletions nsqd/client_v2.go
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"github.com/bitly/go-nsq"
"github.com/mreiferson/go-snappystream"
"log"
"net"
"sync"
Expand All @@ -24,6 +25,7 @@ type IdentifyDataV2 struct {
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
DeflateLevel int `json:"deflate_level"`
Snappy bool `json:"snappy"`
}

type ClientV2 struct {
Expand All @@ -38,12 +40,12 @@ type ClientV2 struct {
net.Conn
sync.Mutex

ID int64
context *Context
tlsConn net.Conn
flateWriter *flate.Writer
ID int64
context *Context

tlsConn net.Conn
flateWriter *flate.Writer

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer
OutputBufferSize int
Expand Down Expand Up @@ -338,6 +340,21 @@ func (c *ClientV2) UpgradeDeflate(level int) error {
return nil
}

func (c *ClientV2) UpgradeSnappy() error {
c.Lock()
defer c.Unlock()

conn := c.Conn
if c.tlsConn != nil {
conn = c.tlsConn
}

c.Reader = bufio.NewReaderSize(snappystream.NewReader(conn, snappystream.SkipVerifyChecksum), DefaultBufferSize)
c.Writer = bufio.NewWriterSize(snappystream.NewWriter(conn), c.OutputBufferSize)

return nil
}

func (c *ClientV2) Flush() error {
err := c.Writer.Flush()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions nsqd/main.go
Expand Up @@ -58,6 +58,7 @@ var (
// compression
deflateEnabled = flag.Bool("deflate", true, "enable deflate feature negotiation (client compression)")
maxDeflateLevel = flag.Int("max-deflate-level", 6, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
snappyEnabled = flag.Bool("snappy", true, "enable snappy feature negotiation (client compression)")
)

func init() {
Expand Down Expand Up @@ -134,6 +135,7 @@ func main() {
options.tlsKey = *tlsKey
options.deflateEnabled = *deflateEnabled
options.maxDeflateLevel = *maxDeflateLevel
options.snappyEnabled = *snappyEnabled

if *statsdAddress != "" {
// flagToDuration will fatally error if it is invalid
Expand Down
4 changes: 3 additions & 1 deletion nsqd/nsqd.go
Expand Up @@ -78,9 +78,10 @@ type nsqdOptions struct {
tlsCert string
tlsKey string

// deflate
// compression
deflateEnabled bool
maxDeflateLevel int
snappyEnabled bool
}

func NewNsqdOptions() *nsqdOptions {
Expand Down Expand Up @@ -113,6 +114,7 @@ func NewNsqdOptions() *nsqdOptions {

deflateEnabled: true,
maxDeflateLevel: -1,
snappyEnabled: true,
}
}

Expand Down
20 changes: 20 additions & 0 deletions nsqd/protocol_v2.go
Expand Up @@ -322,6 +322,11 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}
deflateLevel = int(math.Min(float64(deflateLevel), float64(p.context.nsqd.options.maxDeflateLevel)))
}
snappy := p.context.nsqd.options.snappyEnabled && identifyData.Snappy

if deflate && snappy {
return nil, util.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression")
}

resp, err := json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Expand All @@ -332,6 +337,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
Deflate bool `json:"deflate"`
DeflateLevel int `json:"deflate_level"`
MaxDeflateLevel int `json:"max_deflate_level"`
Snappy bool `json:"snappy"`
}{
MaxRdyCount: p.context.nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
Expand All @@ -341,6 +347,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
Deflate: deflate,
DeflateLevel: deflateLevel,
MaxDeflateLevel: p.context.nsqd.options.maxDeflateLevel,
Snappy: snappy,
})
if err != nil {
panic("should never happen")
Expand All @@ -364,6 +371,19 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}
}

if snappy {
log.Printf("PROTOCOL(V2): [%s] upgrading connection to snappy", client)
err = client.UpgradeSnappy()
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}

err = p.Send(client, nsq.FrameTypeResponse, okBytes)
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}
}

if deflate {
log.Printf("PROTOCOL(V2): [%s] upgrading connection to deflate", client)
err = client.UpgradeDeflate(deflateLevel)
Expand Down
120 changes: 113 additions & 7 deletions nsqd/protocol_v2_test.go
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"github.com/bitly/go-nsq"
"github.com/bmizerany/assert"
"github.com/mreiferson/go-snappystream"
"io"
"io/ioutil"
"log"
"math"
Expand Down Expand Up @@ -40,7 +42,7 @@ func mustConnectNSQd(tcpAddr *net.TCPAddr) (net.Conn, error) {
return conn, nil
}

func identify(t *testing.T, conn net.Conn) {
func identify(t *testing.T, conn io.ReadWriter) {
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
Expand All @@ -50,7 +52,7 @@ func identify(t *testing.T, conn net.Conn) {
readValidate(t, conn, nsq.FrameTypeResponse, "OK")
}

func identifyHeartbeatInterval(t *testing.T, conn net.Conn, interval int, f int32, d string) {
func identifyHeartbeatInterval(t *testing.T, conn io.ReadWriter, interval int, f int32, d string) {
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
Expand All @@ -61,7 +63,7 @@ func identifyHeartbeatInterval(t *testing.T, conn net.Conn, interval int, f int3
readValidate(t, conn, f, d)
}

func identifyFeatureNegotiation(t *testing.T, conn net.Conn, extra map[string]interface{}) []byte {
func identifyFeatureNegotiation(t *testing.T, conn io.ReadWriter, extra map[string]interface{}) []byte {
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
Expand All @@ -82,7 +84,7 @@ func identifyFeatureNegotiation(t *testing.T, conn net.Conn, extra map[string]in
return data
}

func identifyOutputBuffering(t *testing.T, conn net.Conn, size int, timeout int, f int32, d string) {
func identifyOutputBuffering(t *testing.T, conn io.ReadWriter, size int, timeout int, f int32, d string) {
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
Expand All @@ -94,21 +96,21 @@ func identifyOutputBuffering(t *testing.T, conn net.Conn, size int, timeout int,
readValidate(t, conn, f, d)
}

func sub(t *testing.T, conn net.Conn, topicName string, channelName string) {
func sub(t *testing.T, conn io.ReadWriter, topicName string, channelName string) {
err := nsq.Subscribe(topicName, channelName).Write(conn)
assert.Equal(t, err, nil)
readValidate(t, conn, nsq.FrameTypeResponse, "OK")
}

func subFail(t *testing.T, conn net.Conn, topicName string, channelName string) {
func subFail(t *testing.T, conn io.ReadWriter, topicName string, channelName string) {
err := nsq.Subscribe(topicName, channelName).Write(conn)
assert.Equal(t, err, nil)
resp, err := nsq.ReadResponse(conn)
frameType, _, err := nsq.UnpackResponse(resp)
assert.Equal(t, frameType, nsq.FrameTypeError)
}

func readValidate(t *testing.T, conn net.Conn, f int32, d string) {
func readValidate(t *testing.T, conn io.Reader, f int32, d string) {
resp, err := nsq.ReadResponse(conn)
assert.Equal(t, err, nil)
frameType, data, err := nsq.UnpackResponse(resp)
Expand Down Expand Up @@ -794,6 +796,62 @@ func TestDeflate(t *testing.T) {
assert.Equal(t, data, []byte("OK"))
}

type readWriter struct {
io.Reader
io.Writer
}

func TestSnappy(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

*verbose = true
options := NewNsqdOptions()
options.snappyEnabled = true
tcpAddr, _, nsqd := mustStartNSQd(options)
defer nsqd.Exit()

conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)

data := identifyFeatureNegotiation(t, conn, map[string]interface{}{"snappy": true})
r := struct {
Snappy bool `json:"snappy"`
}{}
err = json.Unmarshal(data, &r)
assert.Equal(t, err, nil)
assert.Equal(t, r.Snappy, true)

compressConn := snappystream.NewReader(conn, snappystream.SkipVerifyChecksum)
resp, _ := nsq.ReadResponse(compressConn)
frameType, data, _ := nsq.UnpackResponse(resp)
log.Printf("frameType: %d, data: %s", frameType, data)
assert.Equal(t, frameType, nsq.FrameTypeResponse)
assert.Equal(t, data, []byte("OK"))

msgBody := make([]byte, 128000)
w := snappystream.NewWriter(conn)

rw := readWriter{compressConn, w}

topicName := "test_snappy" + strconv.Itoa(int(time.Now().Unix()))
sub(t, rw, topicName, "ch")

err = nsq.Ready(1).Write(rw)
assert.Equal(t, err, nil)

topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, msgBody)
topic.PutMessage(msg)

resp, _ = nsq.ReadResponse(compressConn)
frameType, data, _ = nsq.UnpackResponse(resp)
msgOut, _ := nsq.DecodeMessage(data)
assert.Equal(t, frameType, nsq.FrameTypeMessage)
assert.Equal(t, msgOut.Id, msg.Id)
assert.Equal(t, msgOut.Body, msg.Body)
}

func TestTLSDeflate(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
Expand Down Expand Up @@ -842,6 +900,54 @@ func TestTLSDeflate(t *testing.T) {
assert.Equal(t, data, []byte("OK"))
}

func TestTLSSnappy(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

*verbose = true
options := NewNsqdOptions()
options.snappyEnabled = true
options.tlsCert = "./test/cert.pem"
options.tlsKey = "./test/key.pem"
tcpAddr, _, nsqd := mustStartNSQd(options)
defer nsqd.Exit()

conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)

data := identifyFeatureNegotiation(t, conn, map[string]interface{}{"tls_v1": true, "snappy": true})
r := struct {
TLSv1 bool `json:"tls_v1"`
Snappy bool `json:"snappy"`
}{}
err = json.Unmarshal(data, &r)
assert.Equal(t, err, nil)
assert.Equal(t, r.TLSv1, true)
assert.Equal(t, r.Snappy, true)

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
tlsConn := tls.Client(conn, tlsConfig)

err = tlsConn.Handshake()
assert.Equal(t, err, nil)

resp, _ := nsq.ReadResponse(tlsConn)
frameType, data, _ := nsq.UnpackResponse(resp)
log.Printf("frameType: %d, data: %s", frameType, data)
assert.Equal(t, frameType, nsq.FrameTypeResponse)
assert.Equal(t, data, []byte("OK"))

compressConn := snappystream.NewReader(tlsConn, snappystream.SkipVerifyChecksum)

resp, _ = nsq.ReadResponse(compressConn)
frameType, data, _ = nsq.UnpackResponse(resp)
log.Printf("frameType: %d, data: %s", frameType, data)
assert.Equal(t, frameType, nsq.FrameTypeResponse)
assert.Equal(t, data, []byte("OK"))
}

func BenchmarkProtocolV2Exec(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
Expand Down
5 changes: 3 additions & 2 deletions test.sh
Expand Up @@ -21,8 +21,9 @@ popd >/dev/null
pushd nsqd >/dev/null
go build
rm -f *.dat
echo "starting nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/cert.pem --tls-key=./test/key.pem --deflate"
./nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/cert.pem --tls-key=./test/key.pem --deflate >/dev/null 2>&1 &
cmd="./nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/cert.pem --tls-key=./test/key.pem"
echo "starting $cmd"
$cmd >/dev/null 2>&1 &
NSQD_PID=$!
popd >/dev/null

Expand Down

0 comments on commit d3e525e

Please sign in to comment.