Skip to content

Commit

Permalink
protocol: add find coordinator request/response
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Mar 24, 2018
1 parent c98abb7 commit f5d7996
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 6 deletions.
34 changes: 34 additions & 0 deletions protocol/find_coordinator_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package protocol

type FindCoordinatorRequest struct {
CoordinatorKey string
CoordinatorType int8
}

func (r *FindCoordinatorRequest) Encode(e PacketEncoder) (err error) {
if err = e.PutString(r.CoordinatorKey); err != nil {
return err
}
if r.Version() >= 1 {
e.PutInt8(r.CoordinatorType)
}
return nil
}

func (r *FindCoordinatorRequest) Decode(d PacketDecoder) (err error) {
if r.CoordinatorKey, err = d.String(); err != nil {
return err
}
if r.CoordinatorType, err = d.Int8(); err != nil {
return err
}
return nil
}

func (r *FindCoordinatorRequest) Version() int16 {
return 1
}

func (r *FindCoordinatorRequest) Key() int16 {
return 10
}
22 changes: 22 additions & 0 deletions protocol/find_coordinator_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package protocol

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFindCoordinatorRequest(t *testing.T) {
req := require.New(t)
exp := &FindCoordinatorRequest{
CoordinatorKey: "coord-key",
CoordinatorType: 1,
}
b, err := Encode(exp)
req.NoError(err)
var act FindCoordinatorRequest
err = Decode(b, &act)
req.NoError(err)
req.Equal(exp, &act)

}
50 changes: 50 additions & 0 deletions protocol/find_coordinator_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package protocol

type Coordinator struct {
NodeID int32
Host string
Port int32
}

type FindCoordinatorResponse struct {
ThrottleTimeMs int32
ErrorCode int16
ErrorMessage *string
Coordinator Coordinator
}

func (r *FindCoordinatorResponse) Encode(e PacketEncoder) (err error) {
e.PutInt32(r.ThrottleTimeMs)
e.PutInt16(r.ErrorCode)
if err = e.PutNullableString(r.ErrorMessage); err != nil {
return err
}
e.PutInt32(r.Coordinator.NodeID)
if err = e.PutString(r.Coordinator.Host); err != nil {
return err
}
e.PutInt32(r.Coordinator.Port)
return nil
}

func (r *FindCoordinatorResponse) Decode(d PacketDecoder) (err error) {
if r.ThrottleTimeMs, err = d.Int32(); err != nil {
return err
}
if r.ErrorCode, err = d.Int16(); err != nil {
return err
}
if r.ErrorMessage, err = d.NullableString(); err != nil {
return err
}
if r.Coordinator.NodeID, err = d.Int32(); err != nil {
return err
}
if r.Coordinator.Host, err = d.String(); err != nil {
return err
}
if r.Coordinator.Port, err = d.Int32(); err != nil {
return err
}
return nil
}
28 changes: 28 additions & 0 deletions protocol/find_coordinator_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package protocol

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFindCoordinatorResponse(t *testing.T) {
req := require.New(t)
errMsg := "Shit's broken"
exp := &FindCoordinatorResponse{
ThrottleTimeMs: 1,
ErrorCode: 2,
ErrorMessage: &errMsg,
Coordinator: Coordinator{
NodeID: 3,
Host: "localhost",
Port: 4,
},
}
b, err := Encode(exp)
req.NoError(err)
var act FindCoordinatorResponse
err = Decode(b, &act)
req.NoError(err)
req.Equal(exp, &act)
}
6 changes: 0 additions & 6 deletions protocol/group_coordinator_response.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package protocol

type Coordinator struct {
NodeID int32
Host string
Port int32
}

type GroupCoordinatorResponse struct {
ErrorCode int16
Coordinator *Coordinator
Expand Down

0 comments on commit f5d7996

Please sign in to comment.