Skip to content

Commit

Permalink
fixes for #68
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 25, 2017
1 parent 29e8f02 commit 6553b9a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 57 deletions.
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@ store.db
vendor/*
!vendor/vendor.json
.idea
broker/**/*.log
broker/**/*.index

*.log
*.index
7 changes: 6 additions & 1 deletion broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

var (
ErrTopicExists = errors.New("topic exists already")
ErrTopicExists = errors.New("topic exists already")
ErrInvalidArgument = errors.New("no logger set")
)

// Broker represents a broker in a Jocko cluster, like a broker in a Kafka cluster.
Expand Down Expand Up @@ -53,6 +54,10 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) {
o(b)
}

if b.logger == nil {
return nil, ErrInvalidArgument
}

port, err := addrPort(b.brokerAddr)
if err != nil {
return nil, err
Expand Down
107 changes: 54 additions & 53 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,32 @@ import (

func TestNew(t *testing.T) {
tests := []struct {
name string
fields fields
alterFields func(f *fields)
want *Broker
wantErr bool
name string
fields fields
setFields func(f *fields)
wantErr bool
}{
{
name: "new broker ok",
fields: newFields(),
name: "broker ok",
},
{
name: "no logger error",
wantErr: true,
setFields: func(f *fields) {
f.logger = nil
},
},
{
name: "new broker port error",
name: "no broker addr error",
wantErr: true,
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.brokerAddr = ""
},
},
{
name: "new broker raft port error",
name: "no raft addr error",
wantErr: true,
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.raft = &mock.Raft{
AddrFn: func() string {
return ""
Expand All @@ -45,11 +50,9 @@ func TestNew(t *testing.T) {
},
},
{
// TODO: Possible bug. If a logger is not set with logger option,
// line will fail if serf.Bootstrap returns error
name: "new broker serf bootstrap error",
name: "serf bootstrap error",
wantErr: true,
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.serf = &mock.Serf{
BootstrapFn: func(n *jocko.ClusterMember, rCh chan<- *jocko.ClusterMember) error {
return errors.New("mock serf bootstrap error")
Expand All @@ -58,9 +61,9 @@ func TestNew(t *testing.T) {
},
},
{
name: "new broker raft bootstrap error",
name: "raft bootstrap error",
wantErr: true,
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.raft = &mock.Raft{
AddrFn: f.raft.AddrFn,
BootstrapFn: func(s jocko.Serf, sCh <-chan *jocko.ClusterMember, cCh chan<- jocko.RaftCommand) error {
Expand All @@ -73,10 +76,10 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fields = newFields()
if tt.alterFields != nil {
tt.alterFields(&tt.fields)
if tt.setFields != nil {
tt.setFields(&tt.fields)
}
tt.want = &Broker{
want := &Broker{
logger: tt.fields.logger,
id: tt.fields.id,
topicMap: tt.fields.topicMap,
Expand Down Expand Up @@ -106,10 +109,10 @@ func TestNew(t *testing.T) {
if got != nil && got.shutdownCh == nil {
t.Errorf("got.shutdownCh is nil")
} else if got != nil {
tt.want.shutdownCh = got.shutdownCh
want.shutdownCh = got.shutdownCh
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("New() = %v, want %v", got, tt.want)
if !reflect.DeepEqual(got, want) {
t.Errorf("New() = %v, want %v", got, want)
}
})
}
Expand Down Expand Up @@ -177,22 +180,22 @@ func TestBroker_Join(t *testing.T) {
}
err := errors.New("mock serf join error")
tests := []struct {
name string
fields fields
alterFields func(f *fields)
args args
want protocol.Error
name string
fields fields
setFields func(f *fields)
args args
want protocol.Error
}{
{
name: "joins with serf",
name: "ok",
fields: newFields(),
args: args{addrs: []string{"localhost:9082"}},
want: protocol.ErrNone,
},
{
name: "join with serf error",
name: "serf errr",
fields: newFields(),
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.serf.JoinFn = func(addrs ...string) (int, error) {
return -1, err
}
Expand All @@ -203,8 +206,8 @@ func TestBroker_Join(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.alterFields != nil {
tt.alterFields(&tt.fields)
if tt.setFields != nil {
tt.setFields(&tt.fields)
}
b := &Broker{
logger: tt.fields.logger,
Expand Down Expand Up @@ -657,11 +660,10 @@ func TestBroker_startReplica(t *testing.T) {
Leader: 1,
}
tests := []struct {
name string
fields fields
alterFields func(f *fields)
args args
want protocol.Error
name string
setFields func(f *fields)
args args
want protocol.Error
}{
{
name: "started replica as leader",
Expand All @@ -684,7 +686,7 @@ func TestBroker_startReplica(t *testing.T) {
},
{
name: "started replica with existing topic",
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.topicMap["existing-topic"] = []*jocko.Partition{
{
ID: 1,
Expand All @@ -709,7 +711,7 @@ func TestBroker_startReplica(t *testing.T) {
// },
{
name: "started replica with commitlog error",
alterFields: func(f *fields) {
setFields: func(f *fields) {
f.logDir = ""
},
args: args{
Expand All @@ -720,22 +722,21 @@ func TestBroker_startReplica(t *testing.T) {
}
for _, tt := range tests {
fields := newFields()
if tt.alterFields != nil {
tt.alterFields(&fields)
if tt.setFields != nil {
tt.setFields(&fields)
}
tt.fields = fields
t.Run(tt.name, func(t *testing.T) {
b := &Broker{
logger: tt.fields.logger,
id: tt.fields.id,
topicMap: tt.fields.topicMap,
replicators: tt.fields.replicators,
brokerAddr: tt.fields.brokerAddr,
logDir: tt.fields.logDir,
raft: tt.fields.raft,
serf: tt.fields.serf,
shutdownCh: tt.fields.shutdownCh,
shutdown: tt.fields.shutdown,
logger: fields.logger,
id: fields.id,
topicMap: fields.topicMap,
replicators: fields.replicators,
brokerAddr: fields.brokerAddr,
logDir: fields.logDir,
raft: fields.raft,
serf: fields.serf,
shutdownCh: fields.shutdownCh,
shutdown: fields.shutdown,
}
if got := b.startReplica(tt.args.partition); got.Error() != tt.want.Error() {
t.Errorf("Broker.startReplica() = %v, want %v", got, tt.want)
Expand Down

0 comments on commit 6553b9a

Please sign in to comment.