diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4c2a7a1ddf..65d8aa6700 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3010,7 +3010,7 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { for _, rp := range node.Peers() { if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} + pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Observer: rp.Observer, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} replicas = append(replicas, pi) } } @@ -4006,7 +4006,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Select a new peer to transfer to. If we are a migrating make sure its from the new cluster. var npeer string for _, r := range peers { - if !r.Current { + if !r.Current { // TODO(nat): Should this check r.Observer? continue } if !migrating { @@ -8176,11 +8176,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { // yet (which can happen after the whole cluster is stopped and only some // of the nodes are restarted). pi := &PeerInfo{ - Current: current, - Offline: true, - Active: lastSeen, - Lag: rp.Lag, - Peer: rp.ID, + Current: current, + Observer: rp.Observer, + Offline: true, + Active: lastSeen, + Lag: rp.Lag, + Peer: rp.ID, } // If node is found, complete/update the settings. if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { diff --git a/server/raft.go b/server/raft.go index f7a0535372..ab1ec0c986 100644 --- a/server/raft.go +++ b/server/raft.go @@ -90,10 +90,11 @@ type WAL interface { } type Peer struct { - ID string - Current bool - Last time.Time - Lag uint64 + ID string + Current bool + Observer bool + Last time.Time + Lag uint64 } type RaftState uint8 @@ -1463,10 +1464,11 @@ func (n *raft) Peers() []*Peer { lag = n.commit - ps.li } p := &Peer{ - ID: id, - Current: id == n.leader || ps.li >= n.applied, - Last: time.Unix(0, ps.ts), - Lag: lag, + ID: id, + Current: id == n.leader || ps.li >= n.applied, + Observer: n.observer, + Last: time.Unix(0, ps.ts), + Lag: lag, } peers = append(peers, p) } diff --git a/server/stream.go b/server/stream.go index b907e8de5d..dc6e26b463 100644 --- a/server/stream.go +++ b/server/stream.go @@ -168,12 +168,13 @@ type ClusterInfo struct { // PeerInfo shows information about all the peers in the cluster that // are supporting the stream or consumer. type PeerInfo struct { - Name string `json:"name"` - Current bool `json:"current"` - Offline bool `json:"offline,omitempty"` - Active time.Duration `json:"active"` - Lag uint64 `json:"lag,omitempty"` - Peer string `json:"peer"` + Name string `json:"name"` + Current bool `json:"current"` + Observer bool `json:"observer,omitempty"` + Offline bool `json:"offline,omitempty"` + Active time.Duration `json:"active"` + Lag uint64 `json:"lag,omitempty"` + Peer string `json:"peer"` // For migrations. cluster string }