diff --git a/server/consumer.go b/server/consumer.go index 0393c50af5a..7485191b70c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -55,6 +55,8 @@ type ConsumerInfo struct { NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` + // TimeStamp indicates when the info was gathered + TimeStamp time.Time `json:"ts"` } type ConsumerConfig struct { @@ -2394,6 +2396,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { NumRedelivered: len(o.rdc), NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, + TimeStamp: time.Now().UTC(), } // If we are replicated and we are not the leader we need to pull certain data from our store. diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 0786b97d794..99ba35c60ed 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + TimeStamp: time.Now().UTC(), } resp.DidCreate = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) @@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Domain: s.getOpts().JetStreamDomain, - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Domain: s.getOpts().JetStreamDomain, + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s for _, mset := range msets[offset:] { config := mset.config() resp.Streams = append(resp.Streams, &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: config, - Domain: s.getOpts().JetStreamDomain, - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: config, + Domain: s.getOpts().JetStreamDomain, + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), }) if len(resp.Streams) >= JSApiListLimit { break @@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), Alternates: js.streamAlternates(ci, config.Name), + TimeStamp: time.Now().UTC(), } if clusterWideConsCount > 0 { resp.StreamInfo.State.Consumers = clusterWideConsCount @@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC s.Warnf("Restore failed for %s for stream '%s > %s' in %v", friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) } else { - resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()} + resp.StreamInfo = &StreamInfo{ + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + TimeStamp: time.Now().UTC(), + } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) } @@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, // our config and defaults for state and no cluster info. if isMember { resp.ConsumerInfo = &ConsumerInfo{ - Stream: ca.Stream, - Name: ca.Name, - Created: ca.Created, - Config: ca.Config, + Stream: ca.Stream, + Name: ca.Name, + Created: ca.Created, + Config: ca.Config, + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 683b39401d7..1b21b90711e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2932,12 +2932,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } resp.DidCreate = true s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) @@ -3339,12 +3340,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss // Send our response. var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) @@ -3398,12 +3400,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if !recovering { var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } @@ -6264,7 +6267,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt for _, sa := range streams { if s.allPeersOffline(sa.Group) { // Place offline onto our results by hand here. - si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} + si := &StreamInfo{ + Config: *sa.Config, + Created: sa.Created, + Cluster: js.offlineClusterInfo(sa.Group), + TimeStamp: time.Now().UTC(), + } resp.Streams = append(resp.Streams, si) missingNames = append(missingNames, sa.Config.Name) } else { @@ -6410,7 +6418,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of for _, ca := range consumers { if s.allPeersOffline(ca.Group) { // Place offline onto our results by hand here. - ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} + ci := &ConsumerInfo{ + Config: ca.Config, + Created: ca.Created, + Cluster: js.offlineClusterInfo(ca.Group), + TimeStamp: time.Now().UTC(), + } resp.Consumers = append(resp.Consumers, ci) missingNames = append(missingNames, ca.Name) } else { @@ -7993,12 +8006,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) { } si := &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: config, - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: config, + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } // Check for out of band catchups. diff --git a/server/stream.go b/server/stream.go index baeac014d9c..7d4cb8a7a4f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -136,6 +136,8 @@ type StreamInfo struct { Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` Alternates []StreamAlternate `json:"alternates,omitempty"` + // TimeStamp indicates when the info was gathered + TimeStamp time.Time `json:"ts"` } type StreamAlternate struct {