Skip to content

Commit

Permalink
move protocol into its own pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Nov 5, 2016
1 parent 08da003 commit cb07073
Show file tree
Hide file tree
Showing 8 changed files with 537 additions and 186 deletions.
8 changes: 8 additions & 0 deletions protocol/body.go
@@ -0,0 +1,8 @@
package protocol

type Body interface {
Decode(d Decoder)
Encode(e Encoder)
Key() int16
Version() int16
}
139 changes: 139 additions & 0 deletions protocol/create_topic_requests.go
@@ -0,0 +1,139 @@
package protocol

type CreateTopicRequest struct {
Topic string
NumPartitions int32
ReplicationFactor int16
ReplicaAssignment map[int32][]int32
Configs map[string]string
}

type CreateTopicRequests struct {
Requests []*CreateTopicRequest
Timeout int32
}

func (c *CreateTopicRequests) Decode(d Decoder) error {
var err error
reqslen, err := d.ArrayLength()
if err != nil {
return err
}
c.Requests = make([]*CreateTopicRequest, reqslen)
for i := range c.Requests {
req := new(CreateTopicRequest)
c.Requests[i] = req

req.Topic, err = d.String()
if err != nil {
return err
}
req.NumPartitions, err = d.Int32()
if err != nil {
return err
}
req.ReplicationFactor, err = d.Int16()
if err != nil {
return err
}
ralen, err := d.ArrayLength()
ra := make(map[int32][]int32, ralen)
for i := 0; i < ralen; i++ {
pid, err := d.Int32()
if err != nil {
return err
}
replen, err := d.ArrayLength()
if err != nil {
return err
}
reps := make([]int32, replen)
for i := range reps {
reps[i], err = d.Int32()
if err != nil {
return err
}
}
ra[pid] = reps
}
req.ReplicaAssignment = ra

clen, err := d.ArrayLength()
if err != nil {
return err
}
c := make(map[string]string, clen)
for j := 0; j < clen; j++ {
k, err := d.String()
if err != nil {
return err
}
v, err := d.String()
if err != nil {
return err
}
c[k] = v
}
req.Configs = c
}

return nil
}

func (c *CreateTopicRequests) encode(e Encoder) {
e.PutArrayLength(len(c.Requests))
for _, r := range c.Requests {
e.PutString(r.Topic)
e.PutInt32(r.NumPartitions)
e.PutInt16(r.ReplicationFactor)
e.PutArrayLength(len(r.ReplicaAssignment))
for pid, ass := range r.ReplicaAssignment {
e.PutInt32(pid)
for _, a := range ass {
e.PutInt32(a)
}
}
e.PutArrayLength(len(r.Configs))
for k, v := range r.Configs {
e.PutString(k)
e.PutString(v)
}
}
}

func (c *CreateTopicRequests) key() int16 {
return 19
}

func (c *CreateTopicRequests) version() int16 {
return 0
}

func (c *CreateTopicRequests) Encode() ([]byte, error) {
rh := &RequestHeader{
APIKey: 19,
APIVersion: 0,
CorrelationID: CorrelationID(),
ClientID: ClientID(),
}
le := &LenEncoder{}
rh.Encode(le)
rhlen := le.Length
b := make([]byte, rhlen)
e := NewByteEncoder(b)
rh.Encode(e)
rhb := e.Bytes()

le = &LenEncoder{}
c.encode(le)
l := le.Length + rhlen
b = make([]byte, l)

e = NewByteEncoder(b)
e.PutRawBytes(rhb)
c.encode(e)

Encoding.PutUint32(b, uint32(l))

return b, nil
}
63 changes: 40 additions & 23 deletions encoding/decoder.go → protocol/decoder.go
@@ -1,4 +1,4 @@
package encoding
package protocol

import (
"errors"
Expand All @@ -10,49 +10,66 @@ var ErrInvalidStringLength = errors.New("kafka: invalid string length")
var ErrInvalidArrayLength = errors.New("kafka: invalid array length")
var ErrInvalidByteSliceLength = errors.New("invalid byteslice length")

type Decoder struct {
type Decoder interface {
Int16() (int16, error)
Int32() (int32, error)
Int64() (int64, error)
ArrayLength() (int, error)
Bytes() ([]byte, error)
String() (string, error)
Int32Array() ([]int32, error)
Int64Array() ([]int64, error)
StringArray() ([]string, error)
remaining() int
}

type ByteDecoder struct {
b []byte
off int
}

func NewDecoder(b []byte) *Decoder {
return &Decoder{
func (d *ByteDecoder) Offset() int {
return d.off
}

func NewDecoder(b []byte) *ByteDecoder {
return &ByteDecoder{
b: b,
}
}

func (d *Decoder) Int16() (int16, error) {
tmp := int16(Enc.Uint16(d.b[d.off:]))
func (d *ByteDecoder) Int16() (int16, error) {
tmp := int16(Encoding.Uint16(d.b[d.off:]))
d.off += 2
return tmp, nil
}

func (d *Decoder) Int32() (int32, error) {
func (d *ByteDecoder) Int32() (int32, error) {
if d.remaining() < 4 {
d.off = len(d.b)
return -1, ErrInsufficientData
}
tmp := int32(Enc.Uint32(d.b[d.off:]))
tmp := int32(Encoding.Uint32(d.b[d.off:]))
d.off += 4
return tmp, nil
}

func (d *Decoder) Int64() (int64, error) {
func (d *ByteDecoder) Int64() (int64, error) {
if d.remaining() < 8 {
d.off = len(d.b)
return -1, ErrInsufficientData
}
tmp := int64(Enc.Uint64(d.b[d.off:]))
tmp := int64(Encoding.Uint64(d.b[d.off:]))
d.off += 8
return tmp, nil
}

func (d *Decoder) ArrayLength() (int, error) {
func (d *ByteDecoder) ArrayLength() (int, error) {
if d.remaining() < 4 {
d.off = len(d.b)
return -1, ErrInsufficientData
}
tmp := int(Enc.Uint32(d.b[d.off:]))
tmp := int(Encoding.Uint32(d.b[d.off:]))
d.off += 4
if tmp > d.remaining() {
d.off = len(d.b)
Expand All @@ -65,7 +82,7 @@ func (d *Decoder) ArrayLength() (int, error) {

// collections

func (d *Decoder) Bytes() ([]byte, error) {
func (d *ByteDecoder) Bytes() ([]byte, error) {
tmp, err := d.Int32()

if err != nil {
Expand All @@ -91,7 +108,7 @@ func (d *Decoder) Bytes() ([]byte, error) {
return tmpStr, nil
}

func (d *Decoder) String() (string, error) {
func (d *ByteDecoder) String() (string, error) {
tmp, err := d.Int16()

if err != nil {
Expand All @@ -117,12 +134,12 @@ func (d *Decoder) String() (string, error) {
return tmpStr, nil
}

func (d *Decoder) Int32Array() ([]int32, error) {
func (d *ByteDecoder) Int32Array() ([]int32, error) {
if d.remaining() < 4 {
d.off = len(d.b)
return nil, ErrInsufficientData
}
n := int(Enc.Uint32(d.b[d.off:]))
n := int(Encoding.Uint32(d.b[d.off:]))
d.off += 4

if d.remaining() < 4*n {
Expand All @@ -140,18 +157,18 @@ func (d *Decoder) Int32Array() ([]int32, error) {

ret := make([]int32, n)
for i := range ret {
ret[i] = int32(Enc.Uint32(d.b[d.off:]))
ret[i] = int32(Encoding.Uint32(d.b[d.off:]))
d.off += 4
}
return ret, nil
}

func (d *Decoder) Int64Array() ([]int64, error) {
func (d *ByteDecoder) Int64Array() ([]int64, error) {
if d.remaining() < 4 {
d.off = len(d.b)
return nil, ErrInsufficientData
}
n := int(Enc.Uint32(d.b[d.off:]))
n := int(Encoding.Uint32(d.b[d.off:]))
d.off += 4

if d.remaining() < 8*n {
Expand All @@ -169,18 +186,18 @@ func (d *Decoder) Int64Array() ([]int64, error) {

ret := make([]int64, n)
for i := range ret {
ret[i] = int64(Enc.Uint64(d.b[d.off:]))
ret[i] = int64(Encoding.Uint64(d.b[d.off:]))
d.off += 8
}
return ret, nil
}

func (d *Decoder) StringArray() ([]string, error) {
func (d *ByteDecoder) StringArray() ([]string, error) {
if d.remaining() < 4 {
d.off = len(d.b)
return nil, ErrInsufficientData
}
n := int(Enc.Uint32(d.b[d.off:]))
n := int(Encoding.Uint32(d.b[d.off:]))
d.off += 4

if n == 0 {
Expand All @@ -202,6 +219,6 @@ func (d *Decoder) StringArray() ([]string, error) {
return ret, nil
}

func (d *Decoder) remaining() int {
func (d *ByteDecoder) remaining() int {
return len(d.b) - d.off
}

0 comments on commit cb07073

Please sign in to comment.