Skip to content

Commit

Permalink
add rest of serf implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 29, 2016
1 parent e21072b commit 70d6475
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 394 deletions.
206 changes: 98 additions & 108 deletions broker/broker.go
Expand Up @@ -3,11 +3,11 @@ package broker
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -63,29 +63,31 @@ type Broker struct {

id int32
host string
port string
port int
topics map[string][]*jocko.Partition

peers map[int32]*jocko.BrokerConn
peerLock sync.Mutex

dataDir string
raftAddr string
tcpAddr string
logDir string
dataDir string
bindAddr string
logDir string
devDisableBootstrap bool

raft *raft.Raft
raftPort int
raftPeers raft.PeerStore
raftTransport *raft.NetworkTransport
raftStore *raftboltdb.BoltStore
raftLeaderCh chan bool
raftConfig *raft.Config

serf *serf.Serf
serfReconcileCh chan serf.Member
serfEventCh chan serf.Event

reconcileInterval time.Duration
reconcileCh chan serf.Member
serf *serf.Serf
serfPort int
serfAddr string
serfReconcileCh chan serf.Member
serfReconcileInterval time.Duration
serfEventCh chan serf.Event

left bool
shutdownCh chan struct{}
Expand All @@ -101,28 +103,23 @@ const (
func New(id int32, opts ...Option) (*Broker, error) {
var err error
b := &Broker{
replicationManager: newReplicationManager(),
peers: make(map[int32]*jocko.BrokerConn),
id: id,
topics: make(map[string][]*jocko.Partition),
serfReconcileCh: make(chan serf.Member, 32),
serfEventCh: make(chan serf.Event, 256),
reconcileInterval: time.Second * 60,
shutdownCh: make(chan struct{}),
serfPort: 7946,
serfAddr: "0.0.0.0",
raftConfig: raft.DefaultConfig(),
replicationManager: newReplicationManager(),
peers: make(map[int32]*jocko.BrokerConn),
id: id,
topics: make(map[string][]*jocko.Partition),
serfReconcileCh: make(chan serf.Member, 32),
serfEventCh: make(chan serf.Event, 256),
serfReconcileInterval: time.Second * 5,
shutdownCh: make(chan struct{}),
}

for _, o := range opts {
o.modifyBroker(b)
}

host, port, err := net.SplitHostPort(b.tcpAddr)
if err != nil {
return nil, err
}

b.host = host
b.port = port

serfConfig := serf.DefaultConfig()
b.serf, err = b.setupSerf(serfConfig, b.serfEventCh, serfSnapshot)
if err != nil {
Expand All @@ -135,19 +132,20 @@ func New(id int32, opts ...Option) (*Broker, error) {
return nil, err
}

// monitor leadership changes
go b.monitorLeadership()

// ingest events for serf
go b.serfEventHandler()

return b, nil
}

func (b *Broker) setupRaft() error {
conf := raft.DefaultConfig()

addr, err := net.ResolveTCPAddr("tcp", b.raftAddr)
if err != nil {
return errors.Wrap(err, "resolve bind addr failed")
}
func (b *Broker) setupRaft() (err error) {
addr := &net.TCPAddr{IP: net.ParseIP(b.bindAddr), Port: b.raftPort}

if b.raftTransport == nil {
b.raftTransport, err = raft.NewTCPTransport(b.raftAddr, addr, 3, timeout, os.Stderr)
b.raftTransport, err = raft.NewTCPTransport(addr.String(), nil, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
}
Expand All @@ -160,6 +158,15 @@ func (b *Broker) setupRaft() error {

b.raftPeers = raft.NewJSONPeers(path, b.raftTransport)

var peers []string
for _, p := range b.peers {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.RaftPort}
peers = append(peers, addr.String())
}
if err = b.raftPeers.SetPeers(peers); err != nil {
return err
}

snapshots, err := raft.NewFileSnapshotStore(path, 2, os.Stderr)
if err != nil {
return err
Expand All @@ -173,9 +180,10 @@ func (b *Broker) setupRaft() error {

leaderCh := make(chan bool, 1)
b.raftLeaderCh = leaderCh
conf.NotifyCh = leaderCh
b.raftConfig.NotifyCh = leaderCh
b.raftConfig.StartAsLeader = !b.devDisableBootstrap

raft, err := raft.NewRaft(conf, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport)
raft, err := raft.NewRaft(b.raftConfig, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport)
if err != nil {
if b.raftStore != nil {
b.raftStore.Close()
Expand All @@ -185,21 +193,33 @@ func (b *Broker) setupRaft() error {
}
b.raft = raft

// monitor leadership changes
go b.monitorLeadership()

return nil
}

func (b *Broker) setupSerf(conf *serf.Config, eventCh chan serf.Event, serfSnapshot string) (*serf.Serf, error) {
conf.Init()
id := fmt.Sprintf("%d", b.id)
id := fmt.Sprintf("jocko-%03d", b.id)
conf.MemberlistConfig.BindAddr = b.serfAddr
conf.MemberlistConfig.BindPort = b.serfPort
conf.NodeName = id
conf.Tags["id"] = id
conf.Tags["port"] = b.port
conf.Tags["id"] = strconv.Itoa(int(b.id))
conf.Tags["port"] = strconv.Itoa(b.port)
conf.Tags["raft_port"] = strconv.Itoa(b.raftPort)
conf.EventCh = eventCh
conf.EnableNameConflictResolution = false
return serf.Create(conf)
s, err := serf.Create(conf)
if err != nil {
return nil, err
}
if len(b.peers) > 0 {
var addrs []string
for _, p := range b.peers {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.SerfPort}
addrs = append(addrs, addr.String())
}
s.Join(addrs, true)
}
return s, nil
}

func (b *Broker) ID() int32 {
Expand All @@ -210,7 +230,7 @@ func (b *Broker) Host() string {
return b.host
}

func (b *Broker) Port() string {
func (b *Broker) Port() int {
return b.port
}

Expand Down Expand Up @@ -320,14 +340,21 @@ func (s *Broker) BrokerConn(id int32) *jocko.BrokerConn {
}

func (s *Broker) addPartition(partition *jocko.Partition) {
s.mu.RLock()
s.mu.Lock()
if v, ok := s.topics[partition.Topic]; ok {
s.topics[partition.Topic] = append(v, partition)
} else {
s.topics[partition.Topic] = []*jocko.Partition{partition}
}
s.mu.RUnlock()
if s.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) {
s.mu.Unlock()
isLeader := partition.Leader == s.id
isFollower := false
for _, r := range partition.Replicas {
if r == s.id {
isFollower = true
}
}
if isLeader || isFollower {
commitLog, err := commitlog.New(commitlog.Options{
Path: path.Join(s.logDir, partition.String()),
MaxSegmentBytes: 1024,
Expand All @@ -343,28 +370,29 @@ func (s *Broker) addPartition(partition *jocko.Partition) {
panic(err)
}
partition.CommitLog = commitLog

partition.Conn = s.peers[partition.LeaderID()]
}
}

func (s *Broker) addBroker(broker *jocko.BrokerConn) {
s.mu.Lock()
defer s.mu.Unlock()
// TODO: remove this
s.peers = append(s.peers, broker)
s.peerLock.Lock()
s.peers[broker.ID] = broker
s.peerLock.Unlock()
}

func (s *Broker) IsLeaderOfPartition(topic string, pid int32, lid int32) bool {
s.mu.RLock()
defer s.mu.RUnlock()
result := false
for _, p := range s.topics[topic] {
if p.ID == pid {
if lid == s.id {
return true
}
result = lid == p.LeaderID()
break
}
}
return false
return result
}

func (s *Broker) Topics() []string {
Expand All @@ -377,7 +405,7 @@ func (s *Broker) Topics() []string {

// Join is used to have the broker join the gossip ring.
// The target address should be another broker listening on the Serf address.
func (s *Broker) Join(id int32, addrs ...string) (int, error) {
func (s *Broker) Join(addrs ...string) (int, error) {
return s.serf.Join(addrs, true)
}

Expand Down Expand Up @@ -431,15 +459,13 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error {
return ErrTopicExists
}
}
brokers := s.peers
for i := int32(0); i < partitions; i++ {
broker := brokers[i%int32(len(brokers))]
partition := &jocko.Partition{
Topic: topic,
ID: i,
Leader: broker,
PreferredLeader: broker,
Replicas: []*jocko.BrokerConn{broker},
Leader: i,
PreferredLeader: i,
Replicas: []int32{i},
}
if err := s.AddPartition(partition); err != nil {
return err
Expand Down Expand Up @@ -478,55 +504,19 @@ func (s *Broker) deleteTopic(tp *jocko.Partition) error {
return nil
}

func (s *Broker) Restore(rc io.ReadCloser) error {
return nil
}
// Leave is used to prepare for a graceful shutdown of the broker.
func (b *Broker) Leave() error {
b.logger.Info("broker starting to leave")
b.left = true

type FSMSnapshot struct {
}
// TODO: handle case if we're the controller/leader

func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
return nil
}

func (f *FSMSnapshot) Release() {}

func (s *Broker) Snapshot() (raft.FSMSnapshot, error) {
return &FSMSnapshot{}, nil
}

func (s *Broker) WaitForLeader(timeout time.Duration) (string, error) {
tick := time.NewTicker(waitDelay)
defer tick.Stop()

timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-tick.C:
l := s.raft.Leader()
if l != "" {
return l, nil
}
case <-timer.C:
// leave the gossip pool
if b.serf != nil {
if err := b.serf.Leave(); err != nil {
b.logger.Info("failed to leave serf cluster: %v", err)
}
}
}

func (s *Broker) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
tick := time.NewTicker(waitDelay)
defer tick.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-tick.C:
if s.raft.AppliedIndex() >= idx {
return nil
}
case <-timer.C:
}
}
return nil
}

0 comments on commit 70d6475

Please sign in to comment.