From fb1d86d5066ec24b057565c944693c776a478252 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 5 May 2023 16:40:28 +0200 Subject: [PATCH] Record the stream and consumer info timestamps This records the server time when info for streams and consumers are created so that tools such as the nats cli can calculate time deltas for last ack, last delivered and so forth in the context of the server clock. This will help aleviate problems with client devices experiencing clock jitter that can show up in user interfaces as negative seconds since last ack etc Signed-off-by: R.I.Pienaar --- server/consumer.go | 3 ++ server/jetstream_api.go | 50 +++++++++++++++++----------- server/jetstream_cluster.go | 66 ++++++++++++++++++++++--------------- server/stream.go | 2 ++ 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 0393c50af5..7485191b70 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -55,6 +55,8 @@ type ConsumerInfo struct { NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` + // TimeStamp indicates when the info was gathered + TimeStamp time.Time `json:"ts"` } type ConsumerConfig struct { @@ -2394,6 +2396,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { NumRedelivered: len(o.rdc), NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, + TimeStamp: time.Now().UTC(), } // If we are replicated and we are not the leader we need to pull certain data from our store. diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 0786b97d79..99ba35c60e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + TimeStamp: time.Now().UTC(), } resp.DidCreate = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) @@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Domain: s.getOpts().JetStreamDomain, - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Domain: s.getOpts().JetStreamDomain, + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s for _, mset := range msets[offset:] { config := mset.config() resp.Streams = append(resp.Streams, &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: config, - Domain: s.getOpts().JetStreamDomain, - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: config, + Domain: s.getOpts().JetStreamDomain, + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), }) if len(resp.Streams) >= JSApiListLimit { break @@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), Alternates: js.streamAlternates(ci, config.Name), + TimeStamp: time.Now().UTC(), } if clusterWideConsCount > 0 { resp.StreamInfo.State.Consumers = clusterWideConsCount @@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC s.Warnf("Restore failed for %s for stream '%s > %s' in %v", friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) } else { - resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()} + resp.StreamInfo = &StreamInfo{ + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + TimeStamp: time.Now().UTC(), + } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) } @@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, // our config and defaults for state and no cluster info. if isMember { resp.ConsumerInfo = &ConsumerInfo{ - Stream: ca.Stream, - Name: ca.Name, - Created: ca.Created, - Config: ca.Config, + Stream: ca.Stream, + Name: ca.Name, + Created: ca.Created, + Config: ca.Config, + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 683b39401d..1b21b90711 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2932,12 +2932,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } resp.DidCreate = true s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) @@ -3339,12 +3340,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss // Send our response. var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Mirror: mset.mirrorInfo(), - Sources: mset.sourcesInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Mirror: mset.mirrorInfo(), + Sources: mset.sourcesInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) @@ -3398,12 +3400,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if !recovering { var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } @@ -6264,7 +6267,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt for _, sa := range streams { if s.allPeersOffline(sa.Group) { // Place offline onto our results by hand here. - si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} + si := &StreamInfo{ + Config: *sa.Config, + Created: sa.Created, + Cluster: js.offlineClusterInfo(sa.Group), + TimeStamp: time.Now().UTC(), + } resp.Streams = append(resp.Streams, si) missingNames = append(missingNames, sa.Config.Name) } else { @@ -6410,7 +6418,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of for _, ca := range consumers { if s.allPeersOffline(ca.Group) { // Place offline onto our results by hand here. - ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} + ci := &ConsumerInfo{ + Config: ca.Config, + Created: ca.Created, + Cluster: js.offlineClusterInfo(ca.Group), + TimeStamp: time.Now().UTC(), + } resp.Consumers = append(resp.Consumers, ci) missingNames = append(missingNames, ca.Name) } else { @@ -7993,12 +8006,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) { } si := &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: config, - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + Created: mset.createdTime(), + State: mset.state(), + Config: config, + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + TimeStamp: time.Now().UTC(), } // Check for out of band catchups. diff --git a/server/stream.go b/server/stream.go index baeac014d9..7d4cb8a7a4 100644 --- a/server/stream.go +++ b/server/stream.go @@ -136,6 +136,8 @@ type StreamInfo struct { Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` Alternates []StreamAlternate `json:"alternates,omitempty"` + // TimeStamp indicates when the info was gathered + TimeStamp time.Time `json:"ts"` } type StreamAlternate struct {