Skip to content

Commit

Permalink
fix: improve NTP sync process
Browse files Browse the repository at this point in the history
Fixes #4425

* add more logging for responses and sync process
* adjust time sync constants
* change the way poll interval is chosen (increasing on good sync,
decreasing on variation)
* filter out spikes

Based on flow in https://github.com/systemd/systemd/blob/main/src/timesync/timesyncd-manager.c

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Nov 11, 2021
1 parent 7efc123 commit a2233bf
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 21 deletions.
15 changes: 11 additions & 4 deletions internal/pkg/ntp/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ package ntp
import "time"

const (
// MaxAllowablePoll is the 'recommended' interval for querying a time server.
MaxAllowablePoll = 1024 * time.Second
// MinAllowablePoll is the minimum time allowed for a client to query a time server.
MinAllowablePoll = 4 * time.Second
MinAllowablePoll = 32 * time.Second
// MaxAllowablePoll is the maximum allowed interval for querying a time server.
MaxAllowablePoll = 2048 * time.Second
// RetryPoll is the interval between retries if the error is not Kiss-o-Death.
RetryPoll = time.Second
// AdjustTimeLimit is a maximum time drift to compensate via adjtimex().
AdjustTimeLimit = 128 * time.Millisecond
//
// Deltas smaller than AdjustTimeLimit are gradually adjusted (slewed) to approach the network time.
// Deltas larger than AdjustTimeLimit are set by letting the system time jump.
AdjustTimeLimit = 400 * time.Millisecond
// EpochLimit is a minimum time difference to signal that change as epoch change.
EpochLimit = 15 * time.Minute
// ExpectedAccuracy is the expected time sync accuracy, used to adjust poll interval.
ExpectedAccuracy = 200 * time.Millisecond
)
156 changes: 142 additions & 14 deletions internal/pkg/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"bytes"
"context"
"fmt"
"math"
"math/bits"
"math/rand"
"net"
"reflect"
"sync"
Expand Down Expand Up @@ -41,14 +41,25 @@ type Syncer struct {

firstSync bool

MinPoll, MaxPoll time.Duration
packetCount int64
samples []sample
samplesIdx int
samplesJitter float64

MinPoll, MaxPoll, RetryPoll time.Duration

// these functions are overridden in tests for mocking support
CurrentTime CurrentTimeFunc
NTPQuery QueryFunc
AdjustTime AdjustTimeFunc
}

const sampleCount = 8

type sample struct {
offset, rtt float64 // in seconds
}

// NewSyncer creates new Syncer with default configuration.
func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {
syncer := &Syncer{
Expand All @@ -62,8 +73,11 @@ func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {

firstSync: true,

MinPoll: MinAllowablePoll,
MaxPoll: MaxAllowablePoll,
samples: make([]sample, sampleCount),

MinPoll: MinAllowablePoll,
MaxPoll: MaxAllowablePoll,
RetryPoll: RetryPoll,

CurrentTime: time.Now,
NTPQuery: ntp.Query,
Expand Down Expand Up @@ -126,12 +140,75 @@ func (syncer *Syncer) restartSync() {
}
}

func absDuration(d time.Duration) time.Duration {
if d < 0 {
return -d
}

return d
}

func (syncer *Syncer) spikeDetector(resp *ntp.Response) bool {
syncer.packetCount++

if syncer.packetCount == 1 {
// ignore first packet
return false
}

var currentIndex int

currentIndex, syncer.samplesIdx = syncer.samplesIdx, (syncer.samplesIdx+1)%sampleCount

syncer.samples[syncer.samplesIdx].offset = resp.ClockOffset.Seconds()
syncer.samples[syncer.samplesIdx].rtt = resp.RTT.Seconds()

jitter := syncer.samplesJitter

indexMin := currentIndex

for i := range syncer.samples {
if syncer.samples[i].rtt == 0 {
continue
}

if syncer.samples[i].rtt < syncer.samples[indexMin].rtt {
indexMin = i
}
}

var j float64

for i := range syncer.samples {
j += math.Pow(syncer.samples[i].offset-syncer.samples[indexMin].offset, 2)
}

syncer.samplesJitter = math.Sqrt(j / (sampleCount - 1))

if absDuration(resp.ClockOffset) > resp.RTT {
// always accept clock offset if that is larger than rtt
return false
}

if syncer.packetCount < 4 {
// need more samples to make a decision
return false
}

if absDuration(resp.ClockOffset).Seconds() > syncer.samples[indexMin].rtt {
// do not accept anything worse than the maximum possible error of the best sample
return true
}

return math.Abs(resp.ClockOffset.Seconds()-syncer.samples[currentIndex].offset) > 3*jitter
}

// Run runs the sync process.
//
// Run is usually run in a goroutine.
// When context is canceled, sync process aborts.
//
//nolint:gocyclo
//nolint:gocyclo,cyclop
func (syncer *Syncer) Run(ctx context.Context) {
RTCClockInitialize.Do(func() {
var err error
Expand All @@ -142,24 +219,57 @@ func (syncer *Syncer) Run(ctx context.Context) {
}
})

pollInterval := time.Duration(0)

for {
lastSyncServer, resp, err := syncer.query(ctx)
if err != nil {
return
}

// Set some variance with how frequently we poll ntp servers.
// This is based on rand(MaxPoll) + MinPoll so we wait at least
// MinPoll.
nextPollInterval := time.Duration(rand.Intn(int(syncer.MaxPoll.Seconds())))*time.Second + syncer.MinPoll
spike := false

if resp != nil && resp.Validate() == nil {
spike = syncer.spikeDetector(resp)
}

if resp == nil {
switch {
case resp == nil:
// if no response was ever received, consider doing short sleep to retry sooner as it's not Kiss-o-Death response
nextPollInterval = syncer.MinPoll / 2
pollInterval = syncer.RetryPoll
case pollInterval == 0:
// first sync
pollInterval = syncer.MinPoll
case err != nil:
// error encountered, don't change the poll interval
case !spike && absDuration(resp.ClockOffset) > ExpectedAccuracy:
// huge offset, retry sync with minimum interval
pollInterval = syncer.MinPoll
case absDuration(resp.ClockOffset) < ExpectedAccuracy*100/25: // *0.25
// clock offset is within 25% of expected accuracy, increase poll interval
if pollInterval < syncer.MaxPoll {
pollInterval *= 2
}
case spike || absDuration(resp.ClockOffset) > ExpectedAccuracy*100/75: // *0.75
// spike was detected or clock offset is too large, decrease poll interval
if pollInterval > syncer.MinPoll {
pollInterval /= 2
}
}

if resp != nil && resp.Validate() == nil {
err = syncer.adjustTime(resp.ClockOffset, resp.Leap, lastSyncServer, nextPollInterval)
if resp != nil && pollInterval < syncer.MinPoll {
// set poll interval to at least min poll if there was any response
pollInterval = syncer.MinPoll
}

syncer.logger.Debug("sample stats",
zap.Duration("jitter", time.Duration(syncer.samplesJitter*float64(time.Second))),
zap.Duration("poll_interval", pollInterval),
zap.Bool("spike", spike),
)

if resp != nil && resp.Validate() == nil && !spike {
err = syncer.adjustTime(resp.ClockOffset, resp.Leap, lastSyncServer, pollInterval)

if err == nil {
if !syncer.timeSyncNotified {
Expand All @@ -178,7 +288,7 @@ func (syncer *Syncer) Run(ctx context.Context) {
return
case <-syncer.restartSyncCh:
// time servers got changed, restart the loop immediately
case <-time.After(nextPollInterval):
case <-time.After(pollInterval):
}
}
}
Expand Down Expand Up @@ -265,6 +375,17 @@ func (syncer *Syncer) queryServer(server string) (*ntp.Response, error) {
return nil, err
}

syncer.logger.Debug("NTP response",
zap.Duration("clock_offset", resp.ClockOffset),
zap.Duration("rtt", resp.RTT),
zap.Uint8("leap", uint8(resp.Leap)),
zap.Uint8("stratum", resp.Stratum),
zap.Duration("precision", resp.Precision),
zap.Duration("root_delay", resp.RootDelay),
zap.Duration("root_dispersion", resp.RootDispersion),
zap.Duration("root_distance", resp.RootDistance),
)

if err = resp.Validate(); err != nil {
return resp, err
}
Expand Down Expand Up @@ -352,6 +473,13 @@ func (syncer *Syncer) adjustTime(offset time.Duration, leapSecond ntp.LeapIndica
ce.Write()
}

syncer.logger.Debug("adjtime state",
zap.Int64("constant", req.Constant),
zap.Duration("offset", time.Duration(req.Offset)),
zap.Int64("freq_offset", req.Freq),
zap.Int64("freq_offset_ppm", req.Freq/65536),
)

if err == nil {
if offset < -EpochLimit || offset > EpochLimit {
// notify about epoch change
Expand Down

0 comments on commit a2233bf

Please sign in to comment.