Commit
Add zstd compression
Pryz committed Apr 9, 2019
1 parent afec9cd · commit fcafd11
Showing 10 changed files with 435 additions and 50 deletions.
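
Taken together, the changes add a zstd CompressionCodec and teach the connection to negotiate the protocol versions zstd requires. A minimal usage sketch, assuming the WriterConfig.CompressionCodec field this version of the library exposes and a broker running Kafka 2.1.0 or newer:

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/zstd"
)

func main() {
	// zstd-compressed batches are only accepted by Kafka >= 2.1.0 brokers.
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:          []string{"localhost:9092"},
		Topic:            "events",
		CompressionCodec: zstd.NewCompressionCodec(),
	})
	defer w.Close()

	if err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte("hello, zstd")},
	); err != nil {
		log.Fatal(err)
	}
}
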
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -121,7 +121,7 @@ jobs:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
- run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy
- run: go test -v -race -cover -timeout 150s . ./gzip ./lz4 ./sasl ./snappy
- run: go test -v -race -cover -timeout 150s $(go list ./... | grep -v examples)

workflows:
version: 2
3 changes: 1 addition & 2 deletions compression.go
@@ -47,6 +47,5 @@ type CompressionCodec interface {
Decode(src []byte) ([]byte, error)
}

const compressionCodecMask int8 = 0x03
const DefaultCompressionLevel int = -1
const compressionCodecMask int8 = 0x07
const CompressionNoneCode = 0
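
The codec mask widens from 0x03 to 0x07 because the Kafka attribute value for zstd is 4 (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd), which does not fit in the two bits the old mask covered. A sketch of the masking this constant exists for:

package kafka

// With the old 0x03 mask, a zstd batch (codec ID 4) would have decoded as
// "no compression"; three bits are needed to represent IDs up to 4.
const compressionCodecMask int8 = 0x07

// codecID extracts the compression codec from a message's attributes byte.
// (Illustrative helper; the library applies the mask inline.)
func codecID(attributes int8) int8 {
	return attributes & compressionCodecMask
}
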
16 changes: 16 additions & 0 deletions compression_test.go
@@ -12,6 +12,8 @@ import (
"github.com/segmentio/kafka-go/gzip"
"github.com/segmentio/kafka-go/lz4"
"github.com/segmentio/kafka-go/snappy"
ktesting "github.com/segmentio/kafka-go/testing"
"github.com/segmentio/kafka-go/zstd"
)

func TestCompression(t *testing.T) {
@@ -22,6 +24,9 @@ func TestCompression(t *testing.T) {
testEncodeDecode(t, msg, gzip.NewCompressionCodec())
testEncodeDecode(t, msg, snappy.NewCompressionCodec())
testEncodeDecode(t, msg, lz4.NewCompressionCodec())
if ktesting.KafkaIsAtLeast("2.1.0") {
testEncodeDecode(t, msg, zstd.NewCompressionCodec())
}
}

func testEncodeDecode(t *testing.T, m kafka.Message, codec kafka.CompressionCodec) {
@@ -63,6 +68,8 @@ func codecToStr(codec int8) string {
return "snappy"
case lz4.Code:
return "lz4"
case zstd.Code:
return "zstd"
default:
return "unknown"
}
@@ -72,6 +79,10 @@ func TestCompressedMessages(t *testing.T) {
testCompressedMessages(t, gzip.NewCompressionCodec())
testCompressedMessages(t, snappy.NewCompressionCodec())
testCompressedMessages(t, lz4.NewCompressionCodec())

if ktesting.KafkaIsAtLeast("2.1.0") {
testCompressedMessages(t, zstd.NewCompressionCodec())
}
}

func testCompressedMessages(t *testing.T, codec kafka.CompressionCodec) {
@@ -255,6 +266,11 @@ func BenchmarkCompression(b *testing.B) {
codec: lz4.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "zstd",
codec: zstd.NewCompressionCodec(),
function: benchmarkCompression,
},
}

payload := map[int][]byte{
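
The zstd package exercised by these tests is among the 10 changed files but is not shown in this excerpt. A minimal sketch of a codec satisfying the kafka.CompressionCodec interface, assuming it wraps github.com/DataDog/zstd and mirrors the constructor pattern of the gzip package below; the exact struct layout and names are illustrative:

package zstd

import (
	ddzstd "github.com/DataDog/zstd"
)

// Code is the codec ID the Kafka protocol assigns to zstd.
const Code int8 = 4

// DefaultCompressionLevel defers the level choice to the underlying library.
const DefaultCompressionLevel int = ddzstd.DefaultCompression

type CompressionCodec struct{ level int }

func NewCompressionCodec() CompressionCodec {
	return NewCompressionCodecWith(DefaultCompressionLevel)
}

func NewCompressionCodecWith(level int) CompressionCodec {
	return CompressionCodec{level: level}
}

// Code implements the kafka.CompressionCodec interface.
func (CompressionCodec) Code() int8 { return Code }

// Encode implements the kafka.CompressionCodec interface.
func (c CompressionCodec) Encode(src []byte) ([]byte, error) {
	return ddzstd.CompressLevel(nil, src, c.level)
}

// Decode implements the kafka.CompressionCodec interface.
func (CompressionCodec) Decode(src []byte) ([]byte, error) {
	return ddzstd.Decompress(nil, src)
}
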
110 changes: 83 additions & 27 deletions conn.go
@@ -70,9 +70,10 @@ type Conn struct {
correlationID int32

// number of replica acks required when publishing to a partition
requiredAcks int32
apiVersions map[apiKey]ApiVersion
fetchVersion apiVersion
requiredAcks int32
apiVersions map[apiKey]ApiVersion
fetchVersion apiVersion
produceVersion apiVersion

transactionalID *string
}
@@ -188,12 +189,22 @@ func (c *Conn) selectVersions() {
}
for _, v := range c.apiVersions {
if apiKey(v.ApiKey) == fetchRequest {
if v.MaxVersion >= 5 {
switch version := v.MaxVersion; {
case version >= 10:
c.fetchVersion = 10
case version >= 5:
c.fetchVersion = 5
} else {
default:
c.fetchVersion = 2
}
}
if apiKey(v.ApiKey) == produceRequest {
if v.MaxVersion >= 7 {
c.produceVersion = 7
} else {
c.produceVersion = 2
}
}
}
}
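
For context: KIP-110 only allows zstd-compressed batches on Produce v7 and Fetch v10, both introduced in Kafka 2.1.0, which is why selectVersions now negotiates up to those versions. An illustrative client-side guard in the same spirit (pickCodec and produceAPIKey are hypothetical, not part of the commit):

package example

import (
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/gzip"
	"github.com/segmentio/kafka-go/zstd"
)

// produceAPIKey is the Kafka API key for Produce requests (0 in the
// protocol; hard-coded here because the package keeps apiKey unexported).
const produceAPIKey int16 = 0

// pickCodec offers zstd only when the broker advertises Produce >= v7
// (Kafka 2.1.0, KIP-110), and otherwise falls back to gzip.
func pickCodec(versions []kafka.ApiVersion) kafka.CompressionCodec {
	for _, v := range versions {
		if v.ApiKey == produceAPIKey && v.MaxVersion >= 7 {
			return zstd.NewCompressionCodec()
		}
	}
	return gzip.NewCompressionCodec()
}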

@@ -725,6 +736,19 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
adjustedDeadline = deadline
switch c.fetchVersion {
case v10:
return writeFetchRequestV10(
&c.wbuf,
id,
c.clientID,
c.topic,
c.partition,
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
int8(cfg.IsolationLevel),
)
case v5:
return writeFetchRequestV5(
&c.wbuf,
@@ -766,6 +790,8 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
var remain int

switch c.fetchVersion {
case v10:
throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size)
case v5:
throttle, highWaterMark, remain, err = readFetchResponseHeaderV5(&c.rbuf, size)
default:
@@ -996,7 +1022,21 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
if c.apiVersions[produceRequest].MaxVersion >= 3 {
switch version := c.apiVersions[produceRequest].MaxVersion; {
case version >= 7:
return writeProduceRequestV7(
&c.wbuf,
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
msgs...,
)
case version >= 3:
return writeProduceRequestV3(
&c.wbuf,
codec,
@@ -1009,18 +1049,19 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
c.transactionalID,
msgs...,
)
default:
return writeProduceRequestV2(
&c.wbuf,
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
msgs...,
)
}
return writeProduceRequestV2(
&c.wbuf,
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
msgs...,
)
},
func(deadline time.Time, size int) error {
return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
@@ -1034,18 +1075,33 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
// Read the list of partitions, there should be only one since
// we've produced a message to a single partition.
size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
var p produceResponsePartitionV2
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
err = Error(p.ErrorCode)
}
if err == nil {
partition = p.Partition
offset = p.Offset
appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
switch c.produceVersion {
case v7:
var p produceResponsePartitionV7
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
err = Error(p.ErrorCode)
}
if err == nil {
partition = p.Partition
offset = p.Offset
appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
}
return size, err
default:
var p produceResponsePartitionV2
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
err = Error(p.ErrorCode)
}
if err == nil {
partition = p.Partition
offset = p.Offset
appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
}
return size, err
}

return size, err
})
if err != nil {
return size, err
7 changes: 5 additions & 2 deletions gzip/gzip.go
@@ -47,10 +47,13 @@ type CompressionCodec struct {
CompressionLevel int
}

const Code = 1
const (
Code int8 = 1
DefaultCompressionLevel int = -1
)

func NewCompressionCodec() CompressionCodec {
return NewCompressionCodecWith(kafka.DefaultCompressionLevel)
return NewCompressionCodecWith(DefaultCompressionLevel)
}

func NewCompressionCodecWith(level int) CompressionCodec {
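
With the default level now owned by the gzip package rather than the root kafka package, callers pick levels through the same constructors. A quick usage sketch (flate.BestCompression is the stdlib name for level 9):

package main

import (
	"compress/flate"
	"fmt"

	"github.com/segmentio/kafka-go/gzip"
)

func main() {
	// DefaultCompressionLevel (-1) defers the choice to compress/gzip.
	def := gzip.NewCompressionCodec()

	// An explicit level trades CPU time for compression ratio.
	best := gzip.NewCompressionCodecWith(flate.BestCompression)

	fmt.Println(def.Code(), best.Code()) // both report codec ID 1
}
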
39 changes: 39 additions & 0 deletions produce.go
@@ -111,3 +111,42 @@ func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain i
}
return
}

// produceResponsePartitionV7 is the partition block of a v7 Produce
// response; relative to v2 it adds StartOffset, the partition's log
// start offset.
type produceResponsePartitionV7 struct {
Partition int32
ErrorCode int16
Offset int64
Timestamp int64
StartOffset int64
}

func (p produceResponsePartitionV7) size() int32 {
return 4 + 2 + 8 + 8 + 8
}

func (p produceResponsePartitionV7) writeTo(w *bufio.Writer) {
writeInt32(w, p.Partition)
writeInt16(w, p.ErrorCode)
writeInt64(w, p.Offset)
writeInt64(w, p.Timestamp)
writeInt64(w, p.StartOffset)
}

func (p *produceResponsePartitionV7) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
if remain, err = readInt32(r, sz, &p.Partition); err != nil {
return
}
if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
return
}
if remain, err = readInt64(r, remain, &p.Offset); err != nil {
return
}
if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
return
}
if remain, err = readInt64(r, remain, &p.StartOffset); err != nil {
return
}
return
}
12 changes: 7 additions & 5 deletions protocol.go
@@ -32,11 +32,13 @@ const (
type apiVersion int16

const (
v0 apiVersion = 0
v1 apiVersion = 1
v2 apiVersion = 2
v3 apiVersion = 3
v5 apiVersion = 5
v0 apiVersion = 0
v1 apiVersion = 1
v2 apiVersion = 2
v3 apiVersion = 3
v5 apiVersion = 5
v7 apiVersion = 7
v10 apiVersion = 10
)

type requestHeader struct {
