Skip to content

Commit

Permalink
separate raft and serf some more
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 31, 2016
1 parent 38da9d1 commit 2f25f4f
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 210 deletions.
250 changes: 41 additions & 209 deletions broker/broker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
package broker

import (
"encoding/json"
"fmt"
"net"
"os"
"path"
"path/filepath"
"strconv"
"sync"
"time"

Expand All @@ -25,37 +19,10 @@ const (
waitDelay = 100 * time.Millisecond
)

type CmdType int

const (
addPartition CmdType = iota
addBroker
removeBroker
deleteTopic
)

var (
ErrTopicExists = errors.New("topic exists already")
)

type command struct {
Cmd CmdType `json:"type"`
Data *json.RawMessage `json:"data"`
}

func newCommand(cmd CmdType, data interface{}) (c command, err error) {
var b []byte
b, err = json.Marshal(data)
if err != nil {
return c, err
}
r := json.RawMessage(b)
return command{
Cmd: cmd,
Data: &r,
}, nil
}

type Broker struct {
*replicationManager
mu sync.RWMutex
Expand Down Expand Up @@ -141,91 +108,12 @@ func New(id int32, opts ...Option) (*Broker, error) {
return b, nil
}

func (b *Broker) setupRaft() (err error) {
addr := &net.TCPAddr{IP: net.ParseIP(b.bindAddr), Port: b.raftPort}

if b.raftTransport == nil {
b.raftTransport, err = raft.NewTCPTransport(addr.String(), nil, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
}
}

path := filepath.Join(b.dataDir, raftState)
if err = os.MkdirAll(path, 0755); err != nil {
return errors.Wrap(err, "data directory mkdir failed")
}

b.raftPeers = raft.NewJSONPeers(path, b.raftTransport)

var peers []string
for _, p := range b.peers {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.RaftPort}
peers = append(peers, addr.String())
}
if err = b.raftPeers.SetPeers(peers); err != nil {
return err
}

snapshots, err := raft.NewFileSnapshotStore(path, 2, os.Stderr)
if err != nil {
return err
}

boltStore, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return errors.Wrap(err, "bolt store failed")
}
b.raftStore = boltStore

leaderCh := make(chan bool, 1)
b.raftLeaderCh = leaderCh
b.raftConfig.NotifyCh = leaderCh
b.raftConfig.StartAsLeader = !b.devDisableBootstrap

raft, err := raft.NewRaft(b.raftConfig, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport)
if err != nil {
if b.raftStore != nil {
b.raftStore.Close()
}
b.raftTransport.Close()
return errors.Wrap(err, "raft failed")
}
b.raft = raft

return nil
}

func (b *Broker) setupSerf(conf *serf.Config, eventCh chan serf.Event, serfSnapshot string) (*serf.Serf, error) {
conf.Init()
id := fmt.Sprintf("jocko-%03d", b.id)
conf.MemberlistConfig.BindAddr = b.serfAddr
conf.MemberlistConfig.BindPort = b.serfPort
conf.NodeName = id
conf.Tags["id"] = strconv.Itoa(int(b.id))
conf.Tags["port"] = strconv.Itoa(b.port)
conf.Tags["raft_port"] = strconv.Itoa(b.raftPort)
conf.EventCh = eventCh
conf.EnableNameConflictResolution = false
s, err := serf.Create(conf)
if err != nil {
return nil, err
}
if len(b.peers) > 0 {
var addrs []string
for _, p := range b.peers {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.SerfPort}
addrs = append(addrs, addr.String())
}
s.Join(addrs, true)
}
return s, nil
}

// ID is used to get the broker's ID
func (b *Broker) ID() int32 {
return b.id
}

// Host is used to get Broker's host
func (b *Broker) Host() string {
return b.host
}
Expand All @@ -242,45 +130,6 @@ func (b *Broker) Cluster() []*jocko.BrokerConn {
return cluster
}

func (b *Broker) Shutdown() error {
b.logger.Info("shutting down broker")
b.shutdownLock.Lock()
defer b.shutdownLock.Unlock()

if b.shutdown {
return nil
}

b.shutdown = true
close(b.shutdownCh)

if b.serf != nil {
b.serf.Shutdown()
}

if b.raft != nil {
b.raftTransport.Close()
future := b.raft.Shutdown()
if err := future.Error(); err != nil {
b.logger.Info("failed to shutdown raft: %s", err)
}
if b.raftStore != nil {
b.raftStore.Close()
}
}

return nil
}

func (b *Broker) IsShutdown() bool {
select {
case <-b.shutdownCh:
return true
default:
return false
}
}

// IsController checks if this broker is the cluster controller
func (s *Broker) IsController() bool {
return s.raft.State() == raft.Leader
Expand Down Expand Up @@ -317,19 +166,6 @@ func (s *Broker) AddBroker(broker jocko.BrokerConn) error {
return s.apply(addBroker, broker)
}

func (s *Broker) apply(cmdType CmdType, data interface{}) error {
c, err := newCommand(cmdType, data)
if err != nil {
return err
}
b, err := json.Marshal(c)
if err != nil {
return err
}
f := s.raft.Apply(b, timeout)
return f.Error()
}

func (s *Broker) BrokerConn(id int32) *jocko.BrokerConn {
for _, b := range s.Cluster() {
if b.ID == id {
Expand Down Expand Up @@ -409,49 +245,6 @@ func (s *Broker) Join(addrs ...string) (int, error) {
return s.serf.Join(addrs, true)
}

func (s *Broker) Apply(l *raft.Log) interface{} {
var c command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.logger.Debug("broker/apply cmd [%d]", c.Cmd)
switch c.Cmd {
case addBroker:
broker := new(jocko.BrokerConn)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, broker); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addBroker(broker)
case addPartition:
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addPartition(p)
case deleteTopic:
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := s.deleteTopic(p); err != nil {
panic(errors.Wrap(err, "topic delete failed"))
}
}
return nil
}

// CreateTopic creates topic with partitions count.
func (s *Broker) CreateTopic(topic string, partitions int32) error {
for _, t := range s.Topics() {
Expand Down Expand Up @@ -520,3 +313,42 @@ func (b *Broker) Leave() error {

return nil
}

func (b *Broker) Shutdown() error {
b.logger.Info("shutting down broker")
b.shutdownLock.Lock()
defer b.shutdownLock.Unlock()

if b.shutdown {
return nil
}

b.shutdown = true
close(b.shutdownCh)

if b.serf != nil {
b.serf.Shutdown()
}

if b.raft != nil {
b.raftTransport.Close()
future := b.raft.Shutdown()
if err := future.Error(); err != nil {
b.logger.Info("failed to shutdown raft: %s", err)
}
if b.raftStore != nil {
b.raftStore.Close()
}
}

return nil
}

func (b *Broker) IsShutdown() bool {
select {
case <-b.shutdownCh:
return true
default:
return false
}
}
Loading

0 comments on commit 2f25f4f

Please sign in to comment.