diff --git a/broker/broker.go b/broker/broker.go index 35f70e17..e4fa9c8b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -3,11 +3,11 @@ package broker import ( "encoding/json" "fmt" - "io" "net" "os" "path" "path/filepath" + "strconv" "sync" "time" @@ -63,29 +63,31 @@ type Broker struct { id int32 host string - port string + port int topics map[string][]*jocko.Partition peers map[int32]*jocko.BrokerConn peerLock sync.Mutex - dataDir string - raftAddr string - tcpAddr string - logDir string + dataDir string + bindAddr string + logDir string + devDisableBootstrap bool raft *raft.Raft + raftPort int raftPeers raft.PeerStore raftTransport *raft.NetworkTransport raftStore *raftboltdb.BoltStore raftLeaderCh chan bool + raftConfig *raft.Config - serf *serf.Serf - serfReconcileCh chan serf.Member - serfEventCh chan serf.Event - - reconcileInterval time.Duration - reconcileCh chan serf.Member + serf *serf.Serf + serfPort int + serfAddr string + serfReconcileCh chan serf.Member + serfReconcileInterval time.Duration + serfEventCh chan serf.Event left bool shutdownCh chan struct{} @@ -101,28 +103,23 @@ const ( func New(id int32, opts ...Option) (*Broker, error) { var err error b := &Broker{ - replicationManager: newReplicationManager(), - peers: make(map[int32]*jocko.BrokerConn), - id: id, - topics: make(map[string][]*jocko.Partition), - serfReconcileCh: make(chan serf.Member, 32), - serfEventCh: make(chan serf.Event, 256), - reconcileInterval: time.Second * 60, - shutdownCh: make(chan struct{}), + serfPort: 7946, + serfAddr: "0.0.0.0", + raftConfig: raft.DefaultConfig(), + replicationManager: newReplicationManager(), + peers: make(map[int32]*jocko.BrokerConn), + id: id, + topics: make(map[string][]*jocko.Partition), + serfReconcileCh: make(chan serf.Member, 32), + serfEventCh: make(chan serf.Event, 256), + serfReconcileInterval: time.Second * 5, + shutdownCh: make(chan struct{}), } for _, o := range opts { o.modifyBroker(b) } - host, port, err := net.SplitHostPort(b.tcpAddr) - if err != nil { - return nil, err - } - - b.host = host - b.port = port - serfConfig := serf.DefaultConfig() b.serf, err = b.setupSerf(serfConfig, b.serfEventCh, serfSnapshot) if err != nil { @@ -135,19 +132,20 @@ func New(id int32, opts ...Option) (*Broker, error) { return nil, err } + // monitor leadership changes + go b.monitorLeadership() + + // ingest events for serf + go b.serfEventHandler() + return b, nil } -func (b *Broker) setupRaft() error { - conf := raft.DefaultConfig() - - addr, err := net.ResolveTCPAddr("tcp", b.raftAddr) - if err != nil { - return errors.Wrap(err, "resolve bind addr failed") - } +func (b *Broker) setupRaft() (err error) { + addr := &net.TCPAddr{IP: net.ParseIP(b.bindAddr), Port: b.raftPort} if b.raftTransport == nil { - b.raftTransport, err = raft.NewTCPTransport(b.raftAddr, addr, 3, timeout, os.Stderr) + b.raftTransport, err = raft.NewTCPTransport(addr.String(), nil, 3, timeout, os.Stderr) if err != nil { return errors.Wrap(err, "tcp transport failed") } @@ -160,6 +158,15 @@ func (b *Broker) setupRaft() error { b.raftPeers = raft.NewJSONPeers(path, b.raftTransport) + var peers []string + for _, p := range b.peers { + addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.RaftPort} + peers = append(peers, addr.String()) + } + if err = b.raftPeers.SetPeers(peers); err != nil { + return err + } + snapshots, err := raft.NewFileSnapshotStore(path, 2, os.Stderr) if err != nil { return err @@ -173,9 +180,10 @@ func (b *Broker) setupRaft() error { leaderCh := make(chan bool, 1) b.raftLeaderCh = leaderCh - conf.NotifyCh = leaderCh + b.raftConfig.NotifyCh = leaderCh + b.raftConfig.StartAsLeader = !b.devDisableBootstrap - raft, err := raft.NewRaft(conf, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport) + raft, err := raft.NewRaft(b.raftConfig, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport) if err != nil { if b.raftStore != nil { b.raftStore.Close() @@ -185,21 +193,33 @@ func (b *Broker) setupRaft() error { } b.raft = raft - // monitor leadership changes - go b.monitorLeadership() - return nil } func (b *Broker) setupSerf(conf *serf.Config, eventCh chan serf.Event, serfSnapshot string) (*serf.Serf, error) { conf.Init() - id := fmt.Sprintf("%d", b.id) + id := fmt.Sprintf("jocko-%03d", b.id) + conf.MemberlistConfig.BindAddr = b.serfAddr + conf.MemberlistConfig.BindPort = b.serfPort conf.NodeName = id - conf.Tags["id"] = id - conf.Tags["port"] = b.port + conf.Tags["id"] = strconv.Itoa(int(b.id)) + conf.Tags["port"] = strconv.Itoa(b.port) + conf.Tags["raft_port"] = strconv.Itoa(b.raftPort) conf.EventCh = eventCh conf.EnableNameConflictResolution = false - return serf.Create(conf) + s, err := serf.Create(conf) + if err != nil { + return nil, err + } + if len(b.peers) > 0 { + var addrs []string + for _, p := range b.peers { + addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.SerfPort} + addrs = append(addrs, addr.String()) + } + s.Join(addrs, true) + } + return s, nil } func (b *Broker) ID() int32 { @@ -210,7 +230,7 @@ func (b *Broker) Host() string { return b.host } -func (b *Broker) Port() string { +func (b *Broker) Port() int { return b.port } @@ -320,14 +340,21 @@ func (s *Broker) BrokerConn(id int32) *jocko.BrokerConn { } func (s *Broker) addPartition(partition *jocko.Partition) { - s.mu.RLock() + s.mu.Lock() if v, ok := s.topics[partition.Topic]; ok { s.topics[partition.Topic] = append(v, partition) } else { s.topics[partition.Topic] = []*jocko.Partition{partition} } - s.mu.RUnlock() - if s.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) { + s.mu.Unlock() + isLeader := partition.Leader == s.id + isFollower := false + for _, r := range partition.Replicas { + if r == s.id { + isFollower = true + } + } + if isLeader || isFollower { commitLog, err := commitlog.New(commitlog.Options{ Path: path.Join(s.logDir, partition.String()), MaxSegmentBytes: 1024, @@ -343,28 +370,29 @@ func (s *Broker) addPartition(partition *jocko.Partition) { panic(err) } partition.CommitLog = commitLog + + partition.Conn = s.peers[partition.LeaderID()] } } func (s *Broker) addBroker(broker *jocko.BrokerConn) { - s.mu.Lock() - defer s.mu.Unlock() // TODO: remove this - s.peers = append(s.peers, broker) + s.peerLock.Lock() + s.peers[broker.ID] = broker + s.peerLock.Unlock() } func (s *Broker) IsLeaderOfPartition(topic string, pid int32, lid int32) bool { s.mu.RLock() defer s.mu.RUnlock() + result := false for _, p := range s.topics[topic] { if p.ID == pid { - if lid == s.id { - return true - } + result = lid == p.LeaderID() break } } - return false + return result } func (s *Broker) Topics() []string { @@ -377,7 +405,7 @@ func (s *Broker) Topics() []string { // Join is used to have the broker join the gossip ring // The target address should be another broker listening on the Serf address -func (s *Broker) Join(id int32, addrs ...string) (int, error) { +func (s *Broker) Join(addrs ...string) (int, error) { return s.serf.Join(addrs, true) } @@ -431,15 +459,13 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error { return ErrTopicExists } } - brokers := s.peers for i := int32(0); i < partitions; i++ { - broker := brokers[i%int32(len(brokers))] partition := &jocko.Partition{ Topic: topic, ID: i, - Leader: broker, - PreferredLeader: broker, - Replicas: []*jocko.BrokerConn{broker}, + Leader: i, + PreferredLeader: i, + Replicas: []int32{i}, } if err := s.AddPartition(partition); err != nil { return err @@ -478,55 +504,19 @@ func (s *Broker) deleteTopic(tp *jocko.Partition) error { return nil } -func (s *Broker) Restore(rc io.ReadCloser) error { - return nil -} +// Leave is used to prepare for a graceful shutdown of the server +func (b *Broker) Leave() error { + b.logger.Info("broker starting to leave") + b.left = true -type FSMSnapshot struct { -} + // TODO: handle case if we're the controller/leader -func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error { - return nil -} - -func (f *FSMSnapshot) Release() {} - -func (s *Broker) Snapshot() (raft.FSMSnapshot, error) { - return &FSMSnapshot{}, nil -} - -func (s *Broker) WaitForLeader(timeout time.Duration) (string, error) { - tick := time.NewTicker(waitDelay) - defer tick.Stop() - - timer := time.NewTimer(timeout) - defer timer.Stop() - - for { - select { - case <-tick.C: - l := s.raft.Leader() - if l != "" { - return l, nil - } - case <-timer.C: + // leave the gossip pool + if b.serf != nil { + if err := b.serf.Leave(); err != nil { + b.logger.Info("failed to leave serf cluster: %v", err) } } -} -func (s *Broker) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { - tick := time.NewTicker(waitDelay) - defer tick.Stop() - timer := time.NewTimer(timeout) - defer timer.Stop() - - for { - select { - case <-tick.C: - if s.raft.AppliedIndex() >= idx { - return nil - } - case <-timer.C: - } - } + return nil } diff --git a/broker/broker_test.go b/broker/broker_test.go deleted file mode 100644 index 16e4de79..00000000 --- a/broker/broker_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package broker - -import ( - "io/ioutil" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/travisjeffery/jocko/jocko" - "github.com/travisjeffery/simplelog" -) - -func TestStoreOpen(t *testing.T) { - dataDir, _ := ioutil.TempDir("", "broker_test") - defer os.RemoveAll(dataDir) - - raft0 := "127.0.0.1:5000" - raft1 := "127.0.0.1:5001" - raft2 := "127.0.0.1:5002" - - b0 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "3001", - RaftAddr: raft0, - ID: 0, - } - b1 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "5001", - RaftAddr: raft1, - ID: 1, - } - b2 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "5002", - RaftAddr: raft2, - ID: 2, - } - - logger := simplelog.New(os.Stdout, simplelog.INFO, "jocko/broker_test") - s0, err := New( - 0, - OptionDataDir(filepath.Join(dataDir, "0")), - OptionLogDir(filepath.Join(dataDir, "0")), - OptionRaftAddr(raft0), - OptionTCPAddr("127.0.0.1:3001"), - OptionBrokers([]*jocko.BrokerConn{b1, b2}), - OptionLogger(logger), - ) - assert.NoError(t, err) - assert.NotNil(t, s0) - - defer s0.Shutdown() - - s1, err := New( - 1, - OptionDataDir(filepath.Join(dataDir, "1")), - OptionLogDir(filepath.Join(dataDir, "1")), - OptionRaftAddr(raft1), - OptionTCPAddr(raft1), - OptionBrokers([]*jocko.BrokerConn{b0, b2}), - OptionLogger(logger), - ) - assert.NoError(t, err) - - defer s1.Shutdown() - - s2, err := New( - 2, - OptionDataDir(filepath.Join(dataDir, "2")), - OptionLogDir(filepath.Join(dataDir, "2")), - OptionRaftAddr(raft2), - OptionTCPAddr(raft2), - OptionBrokers([]*jocko.BrokerConn{b0, b1}), - OptionLogger(logger), - ) - assert.NoError(t, err) - - defer s2.Shutdown() - - l, err := s0.WaitForLeader(10 * time.Second) - assert.NoError(t, err) - - tp := &jocko.Partition{ - Topic: "test", - ID: 0, - Leader: b0, - PreferredLeader: b0, - } - - var peer, leader *Broker - bs := []*Broker{s0, s1, s2} - for _, b := range bs { - if b.raftAddr == l { - leader = b - } else { - peer = b - } - } - - err = leader.AddPartition(tp) - assert.NoError(t, err) - - err = s0.WaitForAppliedIndex(2, 10*time.Second) - assert.NoError(t, err) - - isLeader := s0.IsLeaderOfPartition(tp.Topic, tp.ID, tp.LeaderID()) - assert.True(t, isLeader) - - err = peer.WaitForAppliedIndex(2, 10*time.Second) - assert.NoError(t, err) - - // check that consensus was made to peer - ps, err := peer.TopicPartitions(tp.Topic) - assert.NoError(t, err) - for _, p := range ps { - assert.Equal(t, tp.Topic, p.Topic) - assert.Equal(t, tp.LeaderID(), p.LeaderID()) - } -} diff --git a/broker/fsm.go b/broker/fsm.go new file mode 100644 index 00000000..780e0c9e --- /dev/null +++ b/broker/fsm.go @@ -0,0 +1,24 @@ +package broker + +import ( + "io" + + "github.com/hashicorp/raft" +) + +func (s *Broker) Restore(rc io.ReadCloser) error { + return nil +} + +type FSMSnapshot struct { +} + +func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error { + return nil +} + +func (f *FSMSnapshot) Release() {} + +func (s *Broker) Snapshot() (raft.FSMSnapshot, error) { + return &FSMSnapshot{}, nil +} diff --git a/broker/leader.go b/broker/leader.go index 53d996d2..1a73783b 100644 --- a/broker/leader.go +++ b/broker/leader.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "net" "time" "github.com/hashicorp/raft" @@ -44,7 +45,7 @@ func (b *Broker) leaderLoop(stopCh chan struct{}) { RECONCILE: reconcileCh = nil - interval := time.After(b.reconcileInterval) + interval := time.After(b.serfReconcileInterval) // start := time.Now() barrier := b.raft.Barrier(0) @@ -66,7 +67,7 @@ RECONCILE: goto WAIT } - reconcileCh = b.reconcileCh + reconcileCh = b.serfReconcileCh WAIT: for { @@ -105,7 +106,6 @@ func (b *Broker) reconcileMember(member serf.Member) error { if member.Name == fmt.Sprintf("%d", b.id) { return nil } - var err error switch member.Status { case serf.StatusAlive: @@ -121,7 +121,12 @@ func (b *Broker) reconcileMember(member serf.Member) error { } func (b *Broker) addRaftPeer(member serf.Member) error { - future := b.raft.AddPeer(member.Addr.String()) + broker, err := brokerConn(member) + if err != nil { + return err + } + addr := &net.TCPAddr{IP: net.ParseIP(broker.IP), Port: broker.RaftPort} + future := b.raft.AddPeer(addr.String()) if err := future.Error(); err != nil && err != raft.ErrKnownPeer { b.logger.Info("failed to add raft peer: %v", err) return err diff --git a/broker/option.go b/broker/option.go index 333dc542..1f795526 100644 --- a/broker/option.go +++ b/broker/option.go @@ -1,6 +1,7 @@ package broker import ( + "github.com/hashicorp/raft" "github.com/travisjeffery/jocko/jocko" "github.com/travisjeffery/simplelog" ) @@ -29,24 +30,44 @@ func (o optionLogDir) modifyBroker(b *Broker) { b.logDir = string(o) } -func OptionRaftAddr(raftAddr string) Option { - return optionRaftAddr(raftAddr) +type optionPort int + +func OptionPort(port int) Option { + return optionPort(port) +} + +func (o optionPort) modifyBroker(b *Broker) { + b.port = int(o) +} + +type optionRaftPort int + +func OptionRaftPort(raftPort int) Option { + return optionRaftPort(raftPort) +} + +func (o optionRaftPort) modifyBroker(b *Broker) { + b.raftPort = int(o) } -type optionRaftAddr string +type optionSerfPort int -func (o optionRaftAddr) modifyBroker(b *Broker) { - b.raftAddr = string(o) +func OptionSerfPort(serfPort int) Option { + return optionSerfPort(serfPort) } -func OptionTCPAddr(tcpAddr string) Option { - return optionTCPAddr(tcpAddr) +func (o optionSerfPort) modifyBroker(b *Broker) { + b.serfPort = int(o) } -type optionTCPAddr string +type optionBindAddr string -func (o optionTCPAddr) modifyBroker(b *Broker) { - b.tcpAddr = string(o) +func OptionBindAddr(bindAddr string) Option { + return optionBindAddr(bindAddr) +} + +func (o optionBindAddr) modifyBroker(b *Broker) { + b.bindAddr = string(o) } func OptionBrokers(brokers []*jocko.BrokerConn) Option { @@ -58,7 +79,11 @@ type optionBrokers struct { } func (o optionBrokers) modifyBroker(b *Broker) { - b.brokers = o.brokers + b.peerLock.Lock() + for _, peer := range o.brokers { + b.peers[peer.ID] = peer + } + b.peerLock.Unlock() } func OptionLogger(logger *simplelog.Logger) Option { @@ -73,6 +98,18 @@ func (o optionLogger) modifyBroker(b *Broker) { b.logger = o.logger } +type optionRaft struct { + raftConfig *raft.Config +} + +func (o optionRaft) modifyBroker(b *Broker) { + b.raftConfig = o.raftConfig +} + +func OptionRaft(conf *raft.Config) Option { + return optionRaft{conf} +} + type ReplicatorOption interface { modifyReplicator(*PartitionReplicator) } diff --git a/broker/replication_manager.go b/broker/replication_manager.go index d2be0337..11fb361e 100644 --- a/broker/replication_manager.go +++ b/broker/replication_manager.go @@ -28,7 +28,7 @@ func (rm *replicationManager) BecomeFollower(topic string, pid int32, leader int } } delete(rm.replicators, p) - p.Leader = rm.BrokerConn(leader) + p.Leader = leader hw := p.HighWatermark() if err := p.TruncateTo(hw); err != nil { return err @@ -49,12 +49,8 @@ func (rm *replicationManager) BecomeLeader(topic string, pid int32, command *pro return err } } - var conns []*jocko.BrokerConn - for _, isr := range command.ISR { - conns = append(conns, rm.BrokerConn(isr)) - } - p.Leader = rm.BrokerConn(rm.ID()) - p.ISR = conns + p.Leader = rm.ID() + p.ISR = command.ISR p.LeaderandISRVersionInZK = command.ZKVersion return nil } diff --git a/broker/replicator_test.go b/broker/replicator_test.go index eb19bde4..a0c90655 100644 --- a/broker/replicator_test.go +++ b/broker/replicator_test.go @@ -3,8 +3,8 @@ package broker import ( "bytes" "io/ioutil" + "net" "os" - "path/filepath" "testing" "time" @@ -12,112 +12,35 @@ import ( "github.com/travisjeffery/jocko/jocko" "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/jocko/server" - "github.com/travisjeffery/simplelog" ) -func TestFetchMessages(t *testing.T) { - dataDir, _ := ioutil.TempDir("", "storetest") +func TestBroker_Replicate(t *testing.T) { + dataDir, _ := ioutil.TempDir("", "replicate_test") defer os.RemoveAll(dataDir) - raft0 := "127.0.0.1:4000" - raft1 := "127.0.0.1:4001" - raft2 := "127.0.0.1:4002" - - b0 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "3000", - RaftAddr: raft0, - ID: 0, - } - b1 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "4001", - RaftAddr: raft1, - ID: 1, - } - b2 := &jocko.BrokerConn{ - Host: "127.0.0.1", - Port: "4002", - RaftAddr: raft2, - ID: 2, - } - - logger := simplelog.New(os.Stdout, simplelog.INFO, "jocko/replicator_test") - s0, err := New(0, - OptionDataDir(filepath.Join(dataDir, "0")), - OptionLogDir(filepath.Join(dataDir, "0")), - OptionRaftAddr(raft0), - OptionTCPAddr("127.0.0.1:3000"), - OptionBrokers([]*jocko.BrokerConn{b1, b2}), - OptionLogger(logger), - ) - assert.NoError(t, err) - assert.NotNil(t, s0) - + s0 := testServer(t, 0) defer s0.Shutdown() - s1, err := New(1, - OptionDataDir(filepath.Join(dataDir, "1")), - OptionLogDir(filepath.Join(dataDir, "1")), - OptionRaftAddr(raft1), - OptionTCPAddr(raft1), - OptionBrokers([]*jocko.BrokerConn{b0, b2}), - OptionLogger(logger), - ) - assert.NoError(t, err) - - defer s1.Shutdown() - - s2, err := New(2, - OptionDataDir(filepath.Join(dataDir, "2")), - OptionLogDir(filepath.Join(dataDir, "2")), - OptionRaftAddr(raft2), - OptionTCPAddr(raft2), - OptionBrokers([]*jocko.BrokerConn{b0, b1}), - OptionLogger(logger), - ) - assert.NoError(t, err) - - defer s2.Shutdown() - - l, err := s0.WaitForLeader(10 * time.Second) + addr := &net.TCPAddr{IP: net.ParseIP(s0.bindAddr), Port: s0.port} + srv := server.New(addr.String(), s0, logger) + err := srv.Start() assert.NoError(t, err) tp := &jocko.Partition{ Topic: "test", ID: 0, - Leader: b0, - PreferredLeader: b0, + Leader: 0, + PreferredLeader: 0, + Replicas: []int32{0}, } - var peer, leader *Broker - bs := []*Broker{s0, s1, s2} - for _, b := range bs { - if b.raftAddr == l { - leader = b - } else { - peer = b - } - } - - err = leader.AddPartition(tp) - assert.NoError(t, err) - - err = s0.WaitForAppliedIndex(2, 10*time.Second) - assert.NoError(t, err) - - isLeader := s0.IsLeaderOfPartition(tp.Topic, tp.ID, tp.LeaderID()) - assert.True(t, isLeader) - - err = peer.WaitForAppliedIndex(2, 10*time.Second) + err = s0.AddPartition(tp) assert.NoError(t, err) - server := server.New(s0.tcpAddr, s0, logger) - err = server.Start() + p, err := s0.Partition("test", 0) assert.NoError(t, err) - defer server.Close() - replicator := NewPartitionReplicator(tp, 0, + replicator := NewPartitionReplicator(p, 0, ReplicatorOptionMinBytes(5), ReplicatorOptionMaxWaitTime(int32(time.Millisecond*250))) assert.NoError(t, err) @@ -135,9 +58,6 @@ func TestFetchMessages(t *testing.T) { Messages: msgs, }} - p, err := s0.Partition("test", 0) - assert.NoError(t, err) - for _, ms := range mss { encMs, err := protocol.Encode(ms) assert.NoError(t, err) diff --git a/broker/serf.go b/broker/serf.go index e8929685..4485ce8c 100644 --- a/broker/serf.go +++ b/broker/serf.go @@ -40,7 +40,7 @@ func (b *Broker) nodeJoin(me serf.MemberEvent) { b.logger.Info("failed to parse peer from serf member: %s", m.Name) continue } - b.logger.Info("adding peer: %s", b) + b.logger.Info("adding peer: %s", peer) b.peerLock.Lock() b.peers[peer.ID] = peer b.peerLock.Unlock() @@ -58,7 +58,7 @@ func (b *Broker) localMemberEvent(me serf.MemberEvent) { m.Status = StatusReap } select { - case b.reconcileCh <- m: + case b.serfReconcileCh <- m: default: } } @@ -77,3 +77,7 @@ func (b *Broker) nodeFailed(me serf.MemberEvent) { b.peerLock.Unlock() } } + +func (b *Broker) members() []serf.Member { + return b.serf.Members() +} diff --git a/broker/serf_test.go b/broker/serf_test.go new file mode 100644 index 00000000..fb855c54 --- /dev/null +++ b/broker/serf_test.go @@ -0,0 +1,169 @@ +package broker + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/hashicorp/nomad/testutil" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" + "github.com/travisjeffery/simplelog" +) + +var ( + logger *simplelog.Logger + serfPort int + raftPort int + port int + dataDir string +) + +func init() { + logger = simplelog.New(os.Stdout, simplelog.INFO, "jocko/broker_test") + serfPort = 7946 + raftPort = 5000 + port = 8000 + dataDir, _ = ioutil.TempDir("", "broker_test") +} + +func TestBroker_JoinPeer(t *testing.T) { + defer os.RemoveAll(dataDir) + + s0 := testServer(t, 0) + defer s0.Shutdown() + s1 := testServer(t, 1) + defer s1.Shutdown() + + testJoin(t, s0, s1) + + testutil.WaitForResult(func() (bool, error) { + if len(s1.members()) != 2 { + return false, fmt.Errorf("bad: %#v", s1.members()) + } + if len(s0.members()) != 2 { + return false, fmt.Errorf("bad: %#v", s0.members()) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testutil.WaitForResult(func() (bool, error) { + if len(s1.peers) != 2 { + return false, fmt.Errorf("bad: %#v", s1.peers) + } + if len(s0.peers) != 2 { + return false, fmt.Errorf("bad: %#v", s0.peers) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func TestBroker_RemovePeer(t *testing.T) { + defer os.RemoveAll(dataDir) + + s0 := testServer(t, 0) + defer s0.Shutdown() + + s1 := testServer(t, 1) + defer s1.Shutdown() + + testJoin(t, s0, s1) + + testutil.WaitForResult(func() (bool, error) { + if len(s1.members()) != 2 { + return false, fmt.Errorf("bad: %#v", s1.members()) + } + if len(s0.members()) != 2 { + return false, fmt.Errorf("bad: %#v", s0.members()) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testutil.WaitForResult(func() (bool, error) { + if len(s1.peers) != 2 { + return false, fmt.Errorf("bad: %#v", s1.peers) + } + if len(s0.peers) != 2 { + return false, fmt.Errorf("bad: %#v", s0.peers) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + s1.Leave() + s1.Shutdown() + + testutil.WaitForResult(func() (bool, error) { + if len(s1.peers) != 1 { + return false, fmt.Errorf("bad: %#v", s1.peers) + } + if len(s0.peers) != 1 { + return false, fmt.Errorf("bad: %#v", s0.peers) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func testServer(t *testing.T, id int, opts ...Option) *Broker { + idStr := strconv.Itoa(id) + + raftConf := raft.DefaultConfig() + raftConf.LeaderLeaseTimeout = 50 * time.Millisecond + raftConf.HeartbeatTimeout = 50 * time.Millisecond + raftConf.ElectionTimeout = 50 * time.Millisecond + + opts = append(opts, []Option{ + OptionDataDir(filepath.Join(dataDir, idStr)), + OptionLogDir(filepath.Join(dataDir, idStr)), + OptionBindAddr("127.0.0.1"), + OptionPort(getPort()), + OptionSerfPort(getSerfPort()), + OptionRaftPort(getRaftPort()), + OptionLogger(logger), + OptionRaft(raftConf), + }...) + + broker, err := New( + int32(id), + opts..., + ) + assert.NoError(t, err) + return broker +} + +func getRaftPort() int { + raftPort++ + return raftPort +} + +func getSerfPort() int { + serfPort++ + return serfPort +} + +func getPort() int { + port++ + return port +} + +func testJoin(t *testing.T, s0 *Broker, other ...*Broker) { + addr := fmt.Sprintf("127.0.0.1:%d", s0.serfPort) + for _, s1 := range other { + num, err := s1.Join(addr) + assert.NoError(t, err) + assert.Equal(t, 1, num) + } +} diff --git a/broker/util.go b/broker/util.go index d57ba651..984d7890 100644 --- a/broker/util.go +++ b/broker/util.go @@ -1,13 +1,49 @@ package broker import ( - "net" "strconv" + "time" "github.com/hashicorp/serf/serf" "github.com/travisjeffery/jocko/jocko" ) +func (s *Broker) WaitForLeader(timeout time.Duration) (string, error) { + tick := time.NewTicker(waitDelay) + defer tick.Stop() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-tick.C: + l := s.raft.Leader() + if l != "" { + return l, nil + } + case <-timer.C: + } + } +} + +func (s *Broker) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { + tick := time.NewTicker(waitDelay) + defer tick.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-tick.C: + if s.raft.AppliedIndex() >= idx { + return nil + } + case <-timer.C: + } + } +} + func brokerConn(m serf.Member) (*jocko.BrokerConn, error) { portStr := m.Tags["port"] port, err := strconv.Atoi(portStr) @@ -21,11 +57,17 @@ func brokerConn(m serf.Member) (*jocko.BrokerConn, error) { return nil, err } - addr := &net.TCPAddr{IP: m.Addr, Port: port} + raftPortStr := m.Tags["raft_port"] + raftPort, err := strconv.Atoi(raftPortStr) + if err != nil { + return nil, err + } + conn := &jocko.BrokerConn{ - Addr: addr, - ID: int32(id), - Port: portStr, + IP: m.Addr.String(), + ID: int32(id), + RaftPort: raftPort, + Port: port, } return conn, nil diff --git a/cmd/jocko/main.go b/cmd/jocko/main.go index ba87c988..a538a994 100644 --- a/cmd/jocko/main.go +++ b/cmd/jocko/main.go @@ -33,8 +33,6 @@ func main() { store, err := broker.New(*brokerID, broker.OptionDataDir(*logDir), broker.OptionLogDir(*logDir), - broker.OptionRaftAddr(*raftAddr), - broker.OptionTCPAddr(*tcpAddr), broker.OptionLogger(logger)) if err != nil { fmt.Fprintf(os.Stderr, "Error with new broker: %s\n", err) diff --git a/example/main.go b/example/main.go index 34e422d2..e9c910b0 100644 --- a/example/main.go +++ b/example/main.go @@ -117,8 +117,6 @@ func setup() { store, err := broker.New(*brokerID, broker.OptionDataDir(*logDir), broker.OptionLogDir(*logDir), - broker.OptionRaftAddr(*raftAddr), - broker.OptionTCPAddr(*tcpAddr), broker.OptionLogger(logger)) if err != nil { fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err) diff --git a/jocko/jocko.go b/jocko/jocko.go index 1ff9ab78..ac80464f 100644 --- a/jocko/jocko.go +++ b/jocko/jocko.go @@ -20,17 +20,17 @@ type CommitLog interface { } type Partition struct { - Topic string `json:"topic"` - ID int32 `json:"id"` - - // Broker ids - Replicas []*BrokerConn `json:"replicas"` - ISR []*BrokerConn `json:"isr"` - Leader *BrokerConn `json:"leader"` - PreferredLeader *BrokerConn `json:"preferred_leader"` + Topic string `json:"topic"` + ID int32 `json:"id"` + Replicas []int32 `json:"replicas"` + ISR []int32 `json:"isr"` + Leader int32 `json:"leader"` + PreferredLeader int32 `json:"preferred_leader"` LeaderandISRVersionInZK int32 `json:"-"` CommitLog CommitLog `json:"-"` + + Conn io.ReadWriter `json:"-"` } func NewPartition(topic string, id int32) *Partition { @@ -58,12 +58,12 @@ func (r *Partition) IsOpen() bool { } func (r *Partition) IsLeader(id int32) bool { - return int32(r.Leader.ID) == id + return r.Leader == id } func (r *Partition) IsFollowing(id int32) bool { for _, b := range r.Replicas { - if int32(b.ID) == id { + if b == id { return true } } @@ -83,11 +83,11 @@ func (p *Partition) TruncateTo(offset int64) error { } func (p *Partition) Write(b []byte) (int, error) { - return p.Leader.Write(b) + return p.Conn.Write(b) } func (p *Partition) Read(b []byte) (int, error) { - return p.Leader.Read(b) + return p.Conn.Read(b) } func (p *Partition) Append(ms []byte) (int64, error) { @@ -95,7 +95,7 @@ func (p *Partition) Append(ms []byte) (int64, error) { } func (p *Partition) LeaderID() int32 { - return p.Leader.ID + return p.Leader } // func (p *Partition) StartReplica(brokerID int32) (err error) { @@ -108,7 +108,7 @@ func (p *Partition) LeaderID() int32 { type Broker interface { ID() int32 - Port() string + Port() int Host() string IsController() bool CreateTopic(topic string, partitions int32) error @@ -117,7 +117,7 @@ type Broker interface { BrokerConn(brokerID int32) *BrokerConn BecomeLeader(topic string, id int32, command *protocol.PartitionState) error BecomeFollower(topic string, id int32, leaderID int32) error - Join(brokerID int32, addr ...string) (int, error) + Join(addr ...string) (int, error) Cluster() []*BrokerConn TopicPartitions(topic string) ([]*Partition, error) IsLeaderOfPartition(topic string, id int32, leaderID int32) bool @@ -125,14 +125,19 @@ type Broker interface { type BrokerConn struct { ID int32 `json:"id"` - Host string `json:"host"` - Port string `json:"port"` + Port int `json:"port"` + IP string `json:"addr"` - Addr *net.TCPAddr `json:"-"` + SerfPort int `json:"-"` + RaftPort int `json:"-"` conn net.Conn } +func (b *BrokerConn) Addr() *net.TCPAddr { + return &net.TCPAddr{IP: net.ParseIP(b.IP), Port: b.Port} +} + func (b *BrokerConn) Write(p []byte) (int, error) { if b.conn == nil { if err := b.connect(); err != nil { @@ -152,7 +157,8 @@ func (b *BrokerConn) Read(p []byte) (int, error) { } func (b *BrokerConn) connect() error { - conn, err := net.DialTCP("tcp", nil, b.Addr) + addr := &net.TCPAddr{IP: net.ParseIP(b.IP), Port: b.Port} + conn, err := net.DialTCP("tcp", nil, addr) if err != nil { return err } diff --git a/server/server.go b/server/server.go index f55885d8..a38b66c4 100644 --- a/server/server.go +++ b/server/server.go @@ -7,7 +7,6 @@ import ( "net" "net/http" "os" - "strconv" "sync" "time" @@ -306,7 +305,7 @@ func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() // TODO: change join to take a broker - if _, err := s.broker.Join(b.ID, b.Host); err != nil { + if _, err := s.broker.Join(b.IP); err != nil { w.WriteHeader(http.StatusInternalServerError) return } @@ -316,14 +315,10 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r brokers := make([]*protocol.Broker, len(s.broker.Cluster())) topics := make([]*protocol.TopicMetadata, len(req.Topics)) for i, b := range s.broker.Cluster() { - port, err := strconv.Atoi(b.Port) - if err != nil { - return err - } brokers[i] = &protocol.Broker{ NodeID: b.ID, - Host: b.Host, - Port: int32(port), + Host: b.IP, + Port: int32(b.Port), } } for i, t := range req.Topics { diff --git a/server/server_test.go b/server/server_test.go index d1bf51cd..f36ca008 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -3,10 +3,10 @@ package server import ( "bytes" "io" + "io/ioutil" "math/rand" "net" "os" - "path/filepath" "testing" "time" @@ -17,26 +17,31 @@ import ( "github.com/travisjeffery/simplelog" ) +var ( + dataDir string +) + +func init() { + dataDir, _ = ioutil.TempDir("", "server_test") +} + const ( clientID = "test_client" ) -func TestNewServer(t *testing.T) { +func TestBroker_Server(t *testing.T) { dir := os.TempDir() defer os.RemoveAll(dir) - logs := filepath.Join(dir, "logs") - assert.NoError(t, os.MkdirAll(logs, 0755)) - - data := filepath.Join(dir, "data") - assert.NoError(t, os.MkdirAll(data, 0755)) + assert.NoError(t, os.MkdirAll(dataDir, 0755)) logger := simplelog.New(os.Stdout, simplelog.DEBUG, "jocko/servertest") store, err := broker.New(0, - broker.OptionDataDir(data), - broker.OptionLogDir(logs), - broker.OptionRaftAddr("localhost:6000"), - broker.OptionTCPAddr("localhost:8000"), + broker.OptionDataDir(dataDir), + broker.OptionLogDir(dataDir), + broker.OptionBindAddr("127.0.0.1"), + broker.OptionPort(8000), + broker.OptionRaftPort(8001), broker.OptionLogger(logger)) assert.NoError(t, err) defer store.Shutdown() diff --git a/testutil/wait.go b/testutil/wait.go new file mode 100644 index 00000000..43bfb13b --- /dev/null +++ b/testutil/wait.go @@ -0,0 +1,32 @@ +package testutil + +import "time" + +type testFn func() (bool, error) +type errorFn func(error) + +func WaitForResult(test testFn, error errorFn) { + WaitForResultRetries(2000*TestMultiplier(), test, error) +} + +func WaitForResultRetries(retries int64, test testFn, error errorFn) { + for retries > 0 { + time.Sleep(10 * time.Millisecond) + retries-- + + success, err := test() + if success { + return + } + + if retries == 0 { + error(err) + } + } +} + +// TestMultiplier returns a multiplier for retries and waits given environment +// the tests are being run under. +func TestMultiplier() int64 { + return 1 +}