Skip to content

Commit

Permalink
Merge pull request #1857 from nats-io/rp
Browse files Browse the repository at this point in the history
Add in leader elected and lost quorum advisories.
  • Loading branch information
derekcollison committed Jan 28, 2021
2 parents 76fdaea + 67dc580 commit 7d21488
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 35 deletions.
7 changes: 7 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,13 @@ func (mset *Stream) DeleteConsumer(o *Consumer) error {
return o.Delete()
}

// Stream returns the name of the stream this consumer belongs to.
// The stream pointer is copied while holding the consumer's read lock and the
// name is resolved only after the lock has been released. If the consumer does
// not currently reference a stream, an empty string is returned instead of
// dereferencing a nil pointer.
func (o *Consumer) Stream() string {
	o.mu.RLock()
	mset := o.mset
	o.mu.RUnlock()
	// NOTE(review): guards against the consumer having no stream attached
	// (e.g. during teardown) — confirm when o.mset may be nil.
	if mset == nil {
		return ""
	}
	return mset.Name()
}

// Active indicates if this consumer is still active.
func (o *Consumer) Active() bool {
o.mu.Lock()
Expand Down
30 changes: 21 additions & 9 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,33 +162,45 @@ const (
// JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"

// JSAdvisoryStreamCreatedPre notification that a stream was created
// JSAdvisoryStreamCreatedPre notification that a stream was created.
JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"

// JSAdvisoryStreamDeletedPre notification that a stream was deleted
// JSAdvisoryStreamDeletedPre notification that a stream was deleted.
JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"

// JSAdvisoryStreamUpdatedPre notification that a stream was updated
// JSAdvisoryStreamUpdatedPre notification that a stream was updated.
JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"

// JSAdvisoryConsumerCreatedPre notification that a template created
// JSAdvisoryConsumerCreatedPre notification that a consumer was created.
JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"

// JSAdvisoryConsumerDeletedPre notification that a template deleted
// JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"

// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"

// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"

// JSAdvisoryStreamRestoreCreatePre notification that a restore was start
// JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"

// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"

// JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"

// JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"

// JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"

// JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"

// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
Expand Down
113 changes: 95 additions & 18 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,21 +1241,65 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
return didSnap, nil
}

// replicas converts every peer reported by the given raft node into a PeerInfo
// suitable for external facing advisories. This differs from node.Peers() in that
// each entry carries the server name for the peer ID and the time elapsed since
// the peer's Last timestamp, measured against a single clock reading.
// The result is nil when the node reports no peers.
func (s *Server) replicas(node RaftNode) []*PeerInfo {
	// Take one clock reading up front so all peers are measured against the same instant.
	ts := time.Now()
	var infos []*PeerInfo
	for _, peer := range node.Peers() {
		infos = append(infos, &PeerInfo{
			Name:    s.serverNameForNode(peer.ID),
			Current: peer.Current,
			Active:  ts.Sub(peer.Last),
		})
	}
	return infos
}

func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignment, isLeader bool) {
js.mu.Lock()
s, account, err := js.srv, sa.Client.Account, sa.err
client, reply := sa.Client, sa.Reply
hasResponded := sa.responded
sa.responded = true
js.mu.Unlock()

stream := mset.Name()

if isLeader {
js.srv.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, mset.Name())
s.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, stream)
if node := mset.raftNode(); node != nil {
s.publishAdvisory(mset.account(), JSAdvisoryStreamLeaderElectedPre+"."+stream, &JSStreamLeaderElectedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamLeaderElectedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Leader: s.serverNameForNode(node.GroupLeader()),
Replicas: s.replicas(node),
})
}
} else {
// We are stepping down. Make sure if we are doing so because we have lost quorum that
// we send the appropriate advisories.
if node := mset.raftNode(); node != nil && !node.Quorum() {
s.Warnf("JetStream cluster stream '%s > %s' has lost quorum, stalled.", sa.Client.Account, stream)
s.publishAdvisory(mset.account(), JSAdvisoryStreamQuorumLostPre+"."+stream, &JSStreamQuorumLostAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamQuorumLostAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Replicas: s.replicas(node),
})
}
}

// Tell stream to switch leader status.
mset.setLeader(isLeader)

js.mu.Lock()
if !isLeader || sa.responded {
js.mu.Unlock()
if !isLeader || hasResponded {
return
}
s, account, err := js.srv, sa.Client.Account, sa.err
client, reply := sa.Client, sa.Reply
js.mu.Unlock()

acc, _ := s.LookupAccount(account)
if acc == nil {
Expand Down Expand Up @@ -1973,27 +2017,60 @@ func decodeDeliveredUpdate(buf []byte) (dseq, sseq, dc uint64, ts int64, err err
}

func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssignment, isLeader bool) {
if isLeader {
js.srv.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, ca.Stream, ca.Name)
}

o.setLeader(isLeader)

js.mu.Lock()
if !isLeader || ca.responded {
ca.responded = true
js.mu.Unlock()
return
}
s, account, err := js.srv, ca.Client.Account, ca.err
client, reply := ca.Client, ca.Reply
hasResponded := ca.responded
ca.responded = true
js.mu.Unlock()

stream := o.Stream()
consumer := o.Name()
acc, _ := s.LookupAccount(account)
if acc == nil {
return
}

if isLeader {
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, stream, consumer)
if node := o.raftNode(); node != nil {
s.publishAdvisory(acc, JSAdvisoryConsumerLeaderElectedPre+"."+stream+"."+consumer, &JSConsumerLeaderElectedAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerLeaderElectedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Consumer: consumer,
Leader: s.serverNameForNode(node.GroupLeader()),
Replicas: s.replicas(node),
})
}
} else {
// We are stepping down. Make sure if we are doing so because we have lost quorum that
// we send the appropriate advisories.
if node := o.raftNode(); node != nil && !node.Quorum() {
s.Warnf("JetStream cluster consumer '%s > %s >%s' has lost quorum, stalled.", ca.Client.Account, stream, consumer)
s.publishAdvisory(acc, JSAdvisoryConsumerQuorumLostPre+"."+stream+"."+consumer, &JSConsumerQuorumLostAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerQuorumLostAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: stream,
Consumer: consumer,
Replicas: s.replicas(node),
})
}
}

// Tell consumer to switch leader status.
o.setLeader(isLeader)

if !isLeader || hasResponded {
return
}

var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
Expand Down
48 changes: 48 additions & 0 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,51 @@ type JSRestoreCompleteAdvisory struct {

// JSRestoreCompleteAdvisoryType is the schema type for JSSnapshotCreateAdvisory
const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete"

// Clustering specific.

// JSStreamLeaderElectedAdvisoryType is the schema type for JSStreamLeaderElectedAdvisory,
// which is sent when the system elects a leader for a stream.
const JSStreamLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.stream_leader_elected"

// JSStreamLeaderElectedAdvisory indicates that a replicated stream has elected a leader.
type JSStreamLeaderElectedAdvisory struct {
TypedEvent
// Stream is the name of the stream.
Stream string `json:"stream"`
// Leader is the server name of the raft group leader.
Leader string `json:"leader"`
// Replicas lists the peers of the stream's raft node.
Replicas []*PeerInfo `json:"replicas"`
}

// JSStreamQuorumLostAdvisoryType is the schema type for JSStreamQuorumLostAdvisory,
// which is sent when the system detects that a clustered stream and its consumers
// are stalled and unable to make progress.
const JSStreamQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.stream_quorum_lost"

// JSStreamQuorumLostAdvisory indicates that a stream has lost quorum and is stalled.
type JSStreamQuorumLostAdvisory struct {
TypedEvent
// Stream is the name of the stream.
Stream string `json:"stream"`
// Replicas lists the peers of the stream's raft node.
Replicas []*PeerInfo `json:"replicas"`
}

// JSConsumerLeaderElectedAdvisoryType is the schema type for JSConsumerLeaderElectedAdvisory,
// which is sent when the system elects a leader for a consumer.
const JSConsumerLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_leader_elected"

// JSConsumerLeaderElectedAdvisory indicates that a replicated consumer has elected a leader.
type JSConsumerLeaderElectedAdvisory struct {
TypedEvent
// Stream is the name of the stream the consumer belongs to.
Stream string `json:"stream"`
// Consumer is the name of the consumer.
Consumer string `json:"consumer"`
// Leader is the server name of the raft group leader.
Leader string `json:"leader"`
// Replicas lists the peers of the consumer's raft node.
Replicas []*PeerInfo `json:"replicas"`
}

// JSConsumerQuorumLostAdvisoryType is the schema type for JSConsumerQuorumLostAdvisory,
// which is sent when the system detects that a clustered consumer is stalled and
// unable to make progress.
const JSConsumerQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_quorum_lost"

// JSConsumerQuorumLostAdvisory indicates that a consumer has lost quorum and is stalled.
type JSConsumerQuorumLostAdvisory struct {
TypedEvent
// Stream is the name of the stream the consumer belongs to.
Stream string `json:"stream"`
// Consumer is the name of the consumer.
Consumer string `json:"consumer"`
// Replicas lists the peers of the consumer's raft node.
Replicas []*PeerInfo `json:"replicas"`
}
28 changes: 25 additions & 3 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type RaftNode interface {
State() RaftState
Size() (entries, bytes uint64)
Leader() bool
Quorum() bool
Current() bool
GroupLeader() string
StepDown() error
Expand Down Expand Up @@ -754,9 +755,6 @@ func (n *raft) Group() string {
func (n *raft) Peers() []*Peer {
n.RLock()
defer n.RUnlock()
if n.state != Leader {
return nil
}

var peers []*Peer
for id, ps := range n.peers {
Expand Down Expand Up @@ -1196,9 +1194,30 @@ func (n *raft) runAsLeader() {
}
}

// Quorum reports whether enough peers have been heard from recently to form a quorum.
// It will be called on former leaders. A peer counts when its last timestamp is within
// lostQuorumInterval of now; true is returned as soon as the running count reaches n.qn.
func (n *raft) Quorum() bool {
	n.RLock()
	defer n.RUnlock()

	now := time.Now().UnixNano()
	window := int64(lostQuorumInterval)
	// The count starts at 1 — presumably accounting for this node itself.
	active := 1
	for _, p := range n.peers {
		if now-p.ts >= window {
			// Not heard from recently enough, does not count.
			continue
		}
		active++
		if active >= n.qn {
			return true
		}
	}
	return false
}

// lostQuorum reports the result of lostQuorumLocked while holding the read lock.
// Callers that already hold the lock should call lostQuorumLocked directly.
func (n *raft) lostQuorum() bool {
n.RLock()
defer n.RUnlock()
return n.lostQuorumLocked()
}

func (n *raft) lostQuorumLocked() bool {
now, nc := time.Now().UnixNano(), 1
for _, peer := range n.peers {
if now-peer.ts < int64(lostQuorumInterval) {
Expand Down Expand Up @@ -2190,6 +2209,9 @@ func (n *raft) switchToCandidate() {
defer n.Unlock()
if n.state != Candidate {
n.notice("Switching to candidate")
} else if n.lostQuorumLocked() {
// We signal to the upper layers such that can alert on quorum lost.
n.updateLeadChange(false)
}
// Increment the term.
n.term++
Expand Down
Loading

0 comments on commit 7d21488

Please sign in to comment.