Skip to content

Commit

Permalink
Report observer status in asset replica info
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 27, 2023
1 parent 1700f56 commit 38d0bbe
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
15 changes: 8 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,7 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
for _, rp := range node.Peers() {
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
si := sir.(nodeInfo)
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Observer: rp.Observer, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
replicas = append(replicas, pi)
}
}
Expand Down Expand Up @@ -4006,7 +4006,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Select a new peer to transfer to. If we are a migrating make sure its from the new cluster.
var npeer string
for _, r := range peers {
if !r.Current {
if !r.Current { // TODO(nat): Should this check r.Observer?
continue
}
if !migrating {
Expand Down Expand Up @@ -8176,11 +8176,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
// yet (which can happen after the whole cluster is stopped and only some
// of the nodes are restarted).
pi := &PeerInfo{
Current: current,
Offline: true,
Active: lastSeen,
Lag: rp.Lag,
Peer: rp.ID,
Current: current,
Observer: rp.Observer,
Offline: true,
Active: lastSeen,
Lag: rp.Lag,
Peer: rp.ID,
}
// If node is found, complete/update the settings.
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
Expand Down
18 changes: 10 additions & 8 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ type WAL interface {
}

type Peer struct {
ID string
Current bool
Last time.Time
Lag uint64
ID string
Current bool
Observer bool
Last time.Time
Lag uint64
}

type RaftState uint8
Expand Down Expand Up @@ -1463,10 +1464,11 @@ func (n *raft) Peers() []*Peer {
lag = n.commit - ps.li
}
p := &Peer{
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Last: time.Unix(0, ps.ts),
Lag: lag,
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Observer: n.observer,
Last: time.Unix(0, ps.ts),
Lag: lag,
}
peers = append(peers, p)
}
Expand Down
13 changes: 7 additions & 6 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ type ClusterInfo struct {
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
Peer string `json:"peer"`
Name string `json:"name"`
Current bool `json:"current"`
Observer bool `json:"observer,omitempty"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
Peer string `json:"peer"`
// For migrations.
cluster string
}
Expand Down

0 comments on commit 38d0bbe

Please sign in to comment.