Utc time #1943

Merged: 1 commit, merged on Mar 3, 2021
server/accounts.go (2 changes: 1 addition & 1 deletion)

@@ -3153,7 +3153,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.jsLimits = nil
}

- a.updated = time.Now()
+ a.updated = time.Now().UTC()
a.mu.Unlock()

clients := gatherClients()
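The same change repeats throughout this PR: timestamps that get stored on structs or serialized are now taken with time.Now().UTC() instead of time.Now(). Both calls capture the same instant; .UTC() only changes the Location attached to the value, which controls how the time is rendered when formatted or marshaled. A minimal standalone sketch (not code from this PR) of the difference:

package main

import (
    "fmt"
    "time"
)

func main() {
    local := time.Now() // instant plus the server's local Location
    utc := local.UTC()  // same instant, Location set to UTC

    fmt.Println(local.Equal(utc))           // true: they represent the same moment
    fmt.Println(local.Format(time.RFC3339)) // e.g. 2021-03-03T10:00:00-08:00
    fmt.Println(utc.Format(time.RFC3339))   // e.g. 2021-03-03T18:00:00Z
}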
server/client.go (6 changes: 3 additions & 3 deletions)

@@ -975,7 +975,7 @@ func (c *client) writeLoop() {
// will normally be called in the readLoop of the client who sent the
// message that now is being delivered.
func (c *client) flushClients(budget time.Duration) time.Time {
- last := time.Now()
+ last := time.Now().UTC()

// Check pending clients for flush.
for cp := range c.pcd {
@@ -1615,7 +1615,7 @@ func (c *client) processConnect(arg []byte) error {
c.mu.Unlock()
return nil
}
- c.last = time.Now()
+ c.last = time.Now().UTC()
// Estimate RTT to start.
if c.kind == CLIENT {
c.rtt = computeRTT(c.start)
@@ -2013,7 +2013,7 @@ func (c *client) sendRTTPingLocked() bool {

// Assume the lock is held upon entry.
func (c *client) sendPing() {
- c.rttStart = time.Now()
+ c.rttStart = time.Now().UTC()
c.ping.out++
if c.trace {
c.traceOutOp("PING", nil)
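One side effect worth noting for hunks like the rttStart one above: per the Go time package documentation, the UTC, Local, and In methods strip the monotonic clock reading from the returned Time, so durations later derived from these fields via time.Since are computed from the wall clock rather than the monotonic clock. A small standalone illustration (not code from the PR):

package main

import (
    "fmt"
    "time"
)

func main() {
    t1 := time.Now()       // carries a monotonic reading ("m=+..." in its String output)
    t2 := time.Now().UTC() // .UTC() keeps the wall time but drops the monotonic reading

    fmt.Println(t1)
    fmt.Println(t2)

    // Elapsed-time math works either way, but for t2 it is based on the
    // wall clock, which can move if the system clock is adjusted.
    fmt.Println(time.Since(t1), time.Since(t2))
}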
server/consumer.go (3 changes: 1 addition & 2 deletions)

@@ -946,7 +946,6 @@ func (o *consumer) config() ConsumerConfig {
// Force expiration of all pending.
// Lock should be held.
func (o *consumer) forceExpirePending() {
- now := time.Now().UnixNano()
var expired []uint64
for seq := range o.pending {
if !o.onRedeliverQueue(seq) {
@@ -958,7 +957,7 @@ func (o *consumer) forceExpirePending() {
o.rdq = append(o.rdq, expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
- off := now - o.pending[expired[0]].Timestamp
+ off := time.Now().UnixNano() - o.pending[expired[0]].Timestamp
for _, seq := range expired {
if p, ok := o.pending[seq]; ok && p != nil {
p.Timestamp += off
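The consumer.go hunk drops the cached now variable and recomputes the offset inline, but the rebasing idea is unchanged: every force-expired pending entry is shifted by one common offset computed from the first expired entry, so the relative order and spacing of the pending timestamps is preserved for redelivery. A simplified standalone sketch of that idea (types are illustrative, not the server's own):

package main

import (
    "fmt"
    "time"
)

// pending mimics the shape of an o.pending entry for illustration only.
type pending struct{ Timestamp int64 } // nanoseconds

func main() {
    now := time.Now().UnixNano()
    pendingMsgs := map[uint64]*pending{
        1: {Timestamp: now - int64(5*time.Second)}, // delivered 5s ago
        2: {Timestamp: now - int64(3*time.Second)}, // delivered 3s ago
    }
    expired := []uint64{1, 2}

    // Shift every expired entry by the same offset: the reference entry is
    // rebased to roughly "now" and the gaps between entries stay intact.
    off := now - pendingMsgs[expired[0]].Timestamp
    for _, seq := range expired {
        if p, ok := pendingMsgs[seq]; ok && p != nil {
            p.Timestamp += off
        }
    }
    fmt.Println(pendingMsgs[1].Timestamp < pendingMsgs[2].Timestamp) // true: order preserved
}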
server/events.go (8 changes: 4 additions & 4 deletions)

@@ -294,7 +294,7 @@ RESET:
pm.si.ID = id
pm.si.Seq = atomic.AddUint64(seqp, 1)
pm.si.Version = VERSION
- pm.si.Time = time.Now()
+ pm.si.Time = time.Now().UTC()
pm.si.JetStream = js
}
var b []byte
@@ -1434,7 +1434,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
TypedEvent: TypedEvent{
Type: DisconnectEventMsgType,
ID: eid,
- Time: now.UTC(),
+ Time: now,
},
Client: ClientInfo{
Start: &c.start,
@@ -1477,13 +1477,13 @@ func (s *Server) sendAuthErrorEvent(c *client) {
}
eid := s.nextEventID()
s.mu.Unlock()
- now := time.Now()
+ now := time.Now().UTC()
c.mu.Lock()
m := DisconnectEventMsg{
TypedEvent: TypedEvent{
Type: DisconnectEventMsgType,
ID: eid,
- Time: now.UTC(),
+ Time: now,
},
Client: ClientInfo{
Start: &c.start,
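These event structs are marshaled to JSON and published on system subjects, which is where the UTC normalization becomes visible: a time.Time in the local zone serializes with a numeric offset that depends on the host, while a UTC value always ends in Z. A standalone sketch with a hypothetical event type (the server's DisconnectEventMsg/TypedEvent carry their Time field the same way):

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// event is a stand-in for illustration; only the Time field matters here.
type event struct {
    Time time.Time `json:"time"`
}

func main() {
    local, _ := json.Marshal(event{Time: time.Now()})
    utc, _ := json.Marshal(event{Time: time.Now().UTC()})

    fmt.Println(string(local)) // offset depends on the host zone, e.g. ...T10:00:00-08:00
    fmt.Println(string(utc))   // uniform across servers, e.g. ...T18:00:00Z
}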
server/events_test.go (4 changes: 2 additions & 2 deletions)

@@ -1635,7 +1635,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
}
func TestServerEventsStatsZ(t *testing.T) {
serverStatsReqSubj := "$SYS.REQ.SERVER.%s.STATSZ"
- preStart := time.Now()
+ preStart := time.Now().UTC()
// Add little bit of delay to make sure that time check
// between pre-start and actual start does not fail.
time.Sleep(5 * time.Millisecond)
@@ -1644,7 +1644,7 @@ func TestServerEventsStatsZ(t *testing.T) {
defer sb.Shutdown()
// Same between actual start and post start.
time.Sleep(5 * time.Millisecond)
- postStart := time.Now()
+ postStart := time.Now().UTC()

url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
ncs, err := nats.Connect(url, createUserCreds(t, sa, akp))
server/filestore.go (6 changes: 2 additions & 4 deletions)

@@ -195,7 +195,7 @@ const (
)

func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
- return newFileStoreWithCreated(fcfg, cfg, time.Now())
+ return newFileStoreWithCreated(fcfg, cfg, time.Now().UTC())
}

func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time) (*fileStore, error) {
@@ -3440,8 +3440,6 @@ func encodeConsumerState(state *ConsumerState) []byte {
buf = make([]byte, maxSize)
}

- now := time.Now()
-
// Write header
buf[0] = magic
buf[1] = 2
@@ -3459,7 +3457,7 @@ func encodeConsumerState(state *ConsumerState) []byte {
// These are optional, but always write len. This is to avoid a truncate inline.
if len(state.Pending) > 0 {
// To save space we will use now rounded to seconds to be base timestamp.
- mints := now.Round(time.Second).Unix()
+ mints := time.Now().Round(time.Second).Unix()
// Write minimum timestamp we found from above.
n += binary.PutVarint(buf[n:], mints)

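For context on the mints line above: encodeConsumerState writes one base timestamp (now rounded to seconds) and then stores per-message timestamps as varint deltas from that base, which keeps the encoding compact because small deltas need only a few bytes. A simplified standalone sketch of that encoding idea (not the server's exact wire format):

package main

import (
    "encoding/binary"
    "fmt"
    "time"
)

func main() {
    // One base timestamp, rounded to seconds.
    base := time.Now().Round(time.Second).Unix()

    // Per-message timestamps in nanoseconds, as they are held in memory.
    timestamps := []int64{
        time.Now().UnixNano(),
        time.Now().Add(-2 * time.Second).UnixNano(),
    }

    buf := make([]byte, binary.MaxVarintLen64*(len(timestamps)+1))
    n := binary.PutVarint(buf, base)
    for _, ts := range timestamps {
        // Store only the second-granularity delta from the base.
        delta := ts/int64(time.Second) - base
        n += binary.PutVarint(buf[n:], delta)
    }
    fmt.Printf("encoded %d timestamps in %d bytes\n", len(timestamps), n)
}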
server/gateway.go (2 changes: 1 addition & 1 deletion)

@@ -718,7 +718,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
// Snapshot server options.
opts := s.getOpts()

- now := time.Now()
+ now := time.Now().UTC()
c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY}

// Are we creating the gateway based on the configuration
server/jetstream_api.go (24 changes: 12 additions & 12 deletions)

@@ -2203,7 +2203,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
Sequence: req.Seq,
Header: hdr,
Data: msg,
- Time: time.Unix(0, ts),
+ Time: time.Unix(0, ts).UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
@@ -2368,13 +2368,13 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
streamName := cfg.Name
s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)

- start := time.Now()
+ start := time.Now().UTC()

s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
TypedEvent: TypedEvent{
Type: JSRestoreCreateAdvisoryType,
ID: nuid.Next(),
- Time: time.Now().UTC(),
+ Time: start,
},
Stream: streamName,
Client: ci,
@@ -2481,18 +2481,18 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf(errStr)
}

- end := time.Now()
+ end := time.Now().UTC()

// TODO(rip) - Should this have the error code in it??
s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
TypedEvent: TypedEvent{
Type: JSRestoreCompleteAdvisoryType,
ID: nuid.Next(),
- Time: time.Now().UTC(),
+ Time: end,
},
Stream: streamName,
- Start: start.UTC(),
- End: end.UTC(),
+ Start: start,
+ End: end,
Bytes: int64(total),
Client: ci,
})
@@ -2587,7 +2587,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
}

- start := time.Now()
+ start := time.Now().UTC()

sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
if err != nil {
@@ -2617,17 +2617,17 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
// Now do the real streaming.
s.streamSnapshot(ci, acc, mset, sr, &req)

- end := time.Now()
+ end := time.Now().UTC()

s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
TypedEvent: TypedEvent{
Type: JSSnapshotCompleteAdvisoryType,
ID: nuid.Next(),
- Time: time.Now().UTC(),
+ Time: end,
},
Stream: mset.name(),
- Start: start.UTC(),
- End: end.UTC(),
+ Start: start,
+ End: end,
Client: ci,
})

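The first hunk in this file touches the stored-message path: JetStream keeps message timestamps as int64 nanoseconds, and time.Unix(0, ts) rebuilds a time.Time in the local zone, so the added .UTC() call normalizes it before the API response is serialized. A standalone sketch (not code from the PR):

package main

import (
    "fmt"
    "time"
)

func main() {
    ts := time.Now().UnixNano() // stored nanosecond timestamp, like the ts in jsMsgGetRequest

    local := time.Unix(0, ts)     // same instant, local Location
    utc := time.Unix(0, ts).UTC() // same instant, UTC Location

    fmt.Println(local.Equal(utc)) // true
    fmt.Println(local.Format(time.RFC3339Nano))
    fmt.Println(utc.Format(time.RFC3339Nano)) // ends in "Z"
}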
server/jetstream_cluster.go (4 changes: 2 additions & 2 deletions)

@@ -3079,7 +3079,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
// Pick a preferred leader.
rg.setPreferred()
// Sync subject for post snapshot sync.
- sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
+ sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
cc.meta.Propose(encodeAddStreamAssignment(sa))
}

@@ -3218,7 +3218,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
}
// Pick a preferred leader.
rg.setPreferred()
- sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
+ sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
// Now add in our restore state and pre-select a peer to handle the actual receipt of the snapshot.
sa.Restore = &req.State
cc.meta.Propose(encodeAddStreamAssignment(sa))
server/jetstream_events.go (13 changes: 13 additions & 0 deletions)

@@ -1,3 +1,16 @@
+ // Copyright 2020-2021 The NATS Authors
+ // Licensed under the Apache License, Version 2.0 (the "License");
+ // you may not use this file except in compliance with the License.
+ // You may obtain a copy of the License at
+ //
+ // http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing, software
+ // distributed under the License is distributed on an "AS IS" BASIS,
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ // See the License for the specific language governing permissions and
+ // limitations under the License.
+
package server

import (
server/jetstream_test.go (6 changes: 3 additions & 3 deletions)

@@ -438,7 +438,7 @@ func TestJetStreamConsumerWithStartTime(t *testing.T) {
}

time.Sleep(10 * time.Millisecond)
- startTime := time.Now()
+ startTime := time.Now().UTC()

for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1))
@@ -1181,7 +1181,7 @@ func TestJetStreamCreateConsumer(t *testing.T) {
}

// StartPosition conflicts
- now := time.Now()
+ now := time.Now().UTC()
if _, err := mset.addConsumer(&ConsumerConfig{
DeliverSubject: "A",
OptStartSeq: 1,
@@ -10534,7 +10534,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
})

// Make sure setting time works ok.
- start := time.Now().Add(-2 * time.Hour)
+ start := time.Now().UTC().Add(-2 * time.Hour)
cfg = &StreamConfig{
Name: "M4",
Storage: FileStorage,
server/jwt_test.go (4 changes: 2 additions & 2 deletions)

@@ -610,7 +610,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) {
addAccountToMemResolver(s, apub, ajwt)
// Make sure the too quick update suppression does not bite us.
acc.mu.Lock()
- acc.updated = time.Now().Add(-1 * time.Hour)
+ acc.updated = time.Now().UTC().Add(-1 * time.Hour)
acc.mu.Unlock()

// Do not update the account directly. The resolver should
@@ -2964,7 +2964,7 @@ func TestJWTAccountLimitsMaxConnsAfterExpired(t *testing.T) {
acc, _ := s.LookupAccount(fooPub)
acc.mu.Lock()
acc.expired = true
- acc.updated = time.Now().Add(-2 * time.Second) // work around updating to quickly
+ acc.updated = time.Now().UTC().Add(-2 * time.Second) // work around updating to quickly
acc.mu.Unlock()

// Now update with new expiration and max connections lowered to 2
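Both jwt_test.go hunks backdate acc.updated before forcing a refresh; the surrounding comments explain why: account updates that arrive too soon after the previous one are suppressed, so the tests push the timestamp into the past to get around that. A hypothetical sketch of that suppression pattern (the real check lives in the server's account code and may differ):

package main

import (
    "fmt"
    "time"
)

// account is a stand-in for illustration; updated mirrors acc.updated.
type account struct{ updated time.Time }

// shouldRefresh is hypothetical: skip a refresh if the last update is too recent.
func (a *account) shouldRefresh(minInterval time.Duration) bool {
    return time.Since(a.updated) >= minInterval
}

func main() {
    a := &account{updated: time.Now().UTC()}
    fmt.Println(a.shouldRefresh(time.Second)) // false: too soon after the last update

    // What the tests do: backdate the timestamp so the refresh is not suppressed.
    a.updated = time.Now().UTC().Add(-1 * time.Hour)
    fmt.Println(a.shouldRefresh(time.Second)) // true
}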
server/leafnode.go (2 changes: 1 addition & 1 deletion)

@@ -662,7 +662,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
if maxSubs == 0 {
maxSubs = -1
}
- now := time.Now()
+ now := time.Now().UTC()

c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
server/monitor.go (16 changes: 8 additions & 8 deletions)

@@ -223,7 +223,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
c := &Connz{
Offset: offset,
Limit: limit,
- Now: time.Now(),
+ Now: time.Now().UTC(),
}

// Open clients
@@ -678,7 +678,7 @@ type RouteInfo struct {
// Routez returns a Routez struct containing information about routes.
func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
rs := &Routez{Routes: []*RouteInfo{}}
- rs.Now = time.Now()
+ rs.Now = time.Now().UTC()

if routezOpts == nil {
routezOpts = &RoutezOptions{}
@@ -859,7 +859,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
slStats := &SublistStats{}

// FIXME(dlc) - Make account aware.
- sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil}
+ sz := &Subsz{s.info.ID, time.Now().UTC(), slStats, 0, offset, limit, nil}

if subdetail {
var raw [4096]*subscription
@@ -1347,7 +1347,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
// is done.
// Server lock is held on entry.
func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64, rss int64) {
- v.Now = time.Now()
+ v.Now = time.Now().UTC()
v.Uptime = myUptime(time.Since(s.start))
v.Mem = rss
v.CPU = pcpu
@@ -1491,7 +1491,7 @@ type AccountGatewayz struct {
// Gatewayz returns a Gatewayz struct containing information about gateways.
func (s *Server) Gatewayz(opts *GatewayzOptions) (*Gatewayz, error) {
srvID := s.ID()
- now := time.Now()
+ now := time.Now().UTC()
gw := s.gateway
gw.RLock()
if !gw.enabled {
@@ -1837,7 +1837,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
}
return &Leafz{
ID: s.ID(),
- Now: time.Now(),
+ Now: time.Now().UTC(),
NumLeafs: len(leafnodes),
Leafs: leafnodes,
}, nil
@@ -2045,7 +2045,7 @@ func (s *Server) HandleAccountz(w http.ResponseWriter, r *http.Request) {
func (s *Server) Accountz(optz *AccountzOptions) (*Accountz, error) {
a := &Accountz{
ID: s.ID(),
- Now: time.Now(),
+ Now: time.Now().UTC(),
}
if sacc := s.SystemAccount(); sacc != nil {
a.SystemAccount = sacc.GetName()
@@ -2389,7 +2389,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
}
jsi := &JSInfo{
ID: s.ID(),
- Now: time.Now(),
+ Now: time.Now().UTC(),
}
if !s.JetStreamEnabled() {
jsi.Disabled = true
server/monitor_test.go (2 changes: 1 addition & 1 deletion)

@@ -1068,7 +1068,7 @@ func TestConnzSortedByStopTimeClosedConn(t *testing.T) {

//Now adjust the Stop times for these with some random values.
s.mu.Lock()
- now := time.Now()
+ now := time.Now().UTC()
ccs := s.closed.closedClients()
for _, cc := range ccs {
newStop := now.Add(time.Duration(rand.Int()%120) * -time.Minute)
server/mqtt.go (2 changes: 1 addition & 1 deletion)

@@ -374,7 +374,7 @@ func (s *Server) createMQTTClient(conn net.Conn) *client {
if maxSubs == 0 {
maxSubs = -1
}
- now := time.Now()
+ now := time.Now().UTC()

c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}}
c.headers = true