Skip to content

Commit

Permalink
Made poll interval limits and initial value configurable. (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Oct 20, 2022
1 parent cc575ba commit af9c17f
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 125 deletions.
10 changes: 5 additions & 5 deletions ntp-daemon/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mod tests {

use ntp_proto::{
NtpDuration, NtpInstant, NtpLeapIndicator, NtpTimestamp, PeerSnapshot, PeerStatistics,
PollInterval, Reach, ReferenceId,
PollInterval, PollIntervalLimits, Reach, ReferenceId,
};
use tokio::{io::AsyncReadExt, net::UnixStream};

Expand Down Expand Up @@ -142,7 +142,7 @@ mod tests {
time: NtpInstant::now(),
stratum: 2,
peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
poll_interval: PollInterval::MAX,
poll_interval: PollIntervalLimits::default().max,
reference_id: ReferenceId::from_ip("127.0.0.3".parse().unwrap()),
our_id: ReferenceId::from_ip("127.0.0.2".parse().unwrap()),
reach: Reach::default(),
Expand Down Expand Up @@ -171,7 +171,7 @@ mod tests {
)));

let system_reader = Arc::new(tokio::sync::RwLock::new(SystemSnapshot {
poll_interval: PollInterval::MIN,
poll_interval: PollIntervalLimits::default().min,
stratum: 1,
precision: NtpDuration::from_seconds(1e-3),
root_delay: NtpDuration::ZERO,
Expand Down Expand Up @@ -229,7 +229,7 @@ mod tests {
time: NtpInstant::now(),
stratum: 2,
peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
poll_interval: PollInterval::MAX,
poll_interval: PollIntervalLimits::default().max,
reference_id: ReferenceId::from_ip("127.0.0.3".parse().unwrap()),
our_id: ReferenceId::from_ip("127.0.0.2".parse().unwrap()),
reach: Reach::default(),
Expand Down Expand Up @@ -260,7 +260,7 @@ mod tests {
let peers_writer = peers_reader.clone();

let system_reader = Arc::new(tokio::sync::RwLock::new(SystemSnapshot {
poll_interval: PollInterval::MIN,
poll_interval: PollIntervalLimits::default().min,
stratum: 1,
precision: NtpDuration::from_seconds(1e-3),
reference_id: ReferenceId::NONE,
Expand Down
22 changes: 16 additions & 6 deletions ntp-daemon/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ where

async fn handle_poll(&mut self, poll_wait: &mut Pin<&mut T>) -> PollResult {
let system_snapshot = *self.channels.system_snapshots.read().await;
let packet = self.peer.generate_poll_message(system_snapshot);
let config_snapshot = *self.channels.system_config.read().await;
let packet = self
.peer
.generate_poll_message(system_snapshot, &config_snapshot);

// Sent a poll, so update waiting to match deadline of next
self.last_poll_sent = Instant::now();
Expand Down Expand Up @@ -196,11 +199,12 @@ where
let ntp_instant = NtpInstant::now();

let system_snapshot = *self.channels.system_snapshots.read().await;
let system_config = *self.channels.system_config.read().await;
let result = self.peer.handle_incoming(
system_snapshot,
&system_config,
packet,
ntp_instant,
self.channels.system_config.read().await.frequency_tolerance,
send_timestamp,
recv_timestamp,
);
Expand Down Expand Up @@ -326,7 +330,8 @@ where
let peer_id = ReferenceId::from_ip(socket.as_ref().peer_addr().unwrap().ip());

let local_clock_time = NtpInstant::now();
let peer = Peer::new(our_id, peer_id, local_clock_time);
let config_snapshot = *channels.system_config.read().await;
let peer = Peer::new(our_id, peer_id, local_clock_time, &config_snapshot);

let poll_wait = tokio::time::sleep(std::time::Duration::default());
tokio::pin!(poll_wait);
Expand Down Expand Up @@ -552,14 +557,19 @@ mod tests {
let our_id = ReferenceId::from_ip(socket.as_ref().local_addr().unwrap().ip());
let peer_id = ReferenceId::from_ip(socket.as_ref().peer_addr().unwrap().ip());

let local_clock_time = NtpInstant::now();
let peer = Peer::new(our_id, peer_id, local_clock_time);

let system_snapshots = Arc::new(RwLock::new(SystemSnapshot::default()));
let system_config = Arc::new(RwLock::new(SystemConfig::default()));
let (msg_for_system_sender, msg_for_system_receiver) = mpsc::channel(1);
let (reset_send, reset) = watch::channel(ResetEpoch::default());

let local_clock_time = NtpInstant::now();
let peer = Peer::new(
our_id,
peer_id,
local_clock_time,
&*system_config.read().await,
);

let process = PeerTask {
_wait: PhantomData,
index: PeerIndex::from_inner(0),
Expand Down
22 changes: 11 additions & 11 deletions ntp-daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<T: std::hash::Hash + Eq> TimestampedCache<T> {
mod tests {
use std::time::Duration;

use ntp_proto::{NtpDuration, NtpLeapIndicator, PollInterval, ReferenceId};
use ntp_proto::{NtpDuration, NtpLeapIndicator, PollInterval, PollIntervalLimits, ReferenceId};

use crate::ipfilter::IpFilter;

Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {
)
.await
.unwrap();
let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -392,7 +392,7 @@ mod tests {
)
.await
.unwrap();
let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -430,7 +430,7 @@ mod tests {
)
.await
.unwrap();
let (packet, _) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, _) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -462,7 +462,7 @@ mod tests {
)
.await
.unwrap();
let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -499,7 +499,7 @@ mod tests {
)
.await
.unwrap();
let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -537,7 +537,7 @@ mod tests {
)
.await
.unwrap();
let (packet, _) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, _) = NtpPacket::poll_message(PollIntervalLimits::default().min);

socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
Expand Down Expand Up @@ -570,7 +570,7 @@ mod tests {
.await
.unwrap();

let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);
socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
tokio::time::timeout(Duration::from_millis(10), socket.recv(&mut buf))
Expand All @@ -583,7 +583,7 @@ mod tests {

tokio::time::sleep(std::time::Duration::from_millis(120)).await;

let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);
socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
tokio::time::timeout(Duration::from_millis(10), socket.recv(&mut buf))
Expand All @@ -594,7 +594,7 @@ mod tests {
assert_ne!(packet.stratum(), 0);
assert!(packet.valid_server_response(id));

let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);
socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
tokio::time::timeout(Duration::from_millis(10), socket.recv(&mut buf))
Expand Down Expand Up @@ -632,7 +632,7 @@ mod tests {
.await
.unwrap();

let (packet, id) = NtpPacket::poll_message(PollInterval::MIN);
let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min);
socket.send(&packet.serialize()).await.unwrap();
let mut buf = [0; 48];
tokio::time::timeout(Duration::from_millis(10), socket.recv(&mut buf))
Expand Down
23 changes: 15 additions & 8 deletions ntp-daemon/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn spawn(
};

// Clock controller
let controller = ClockController::new(UnixNtpClock::new(), &system_snapshot);
let controller = ClockController::new(UnixNtpClock::new(), &system_snapshot, &config);

// Daemon channels
let system = Arc::new(tokio::sync::RwLock::new(system_snapshot));
Expand Down Expand Up @@ -227,7 +227,10 @@ fn requires_clock_recalculation(

#[cfg(test)]
mod tests {
use ntp_proto::{peer_snapshot, NtpDuration, NtpLeapIndicator, NtpTimestamp, PeerStatistics};
use ntp_proto::{
peer_snapshot, NtpDuration, NtpLeapIndicator, NtpTimestamp, PeerStatistics,
PollIntervalLimits,
};

use crate::{
config::{NormalizedAddress, StandardPeerConfig},
Expand Down Expand Up @@ -293,7 +296,7 @@ mod tests {
epoch,
base,
config,
PollInterval::MIN,
PollIntervalLimits::default().min,
));

assert!(!requires_clock_recalculation(
Expand All @@ -315,7 +318,7 @@ mod tests {
epoch,
base,
config,
PollInterval::MIN,
PollIntervalLimits::default().min,
));

assert!(requires_clock_recalculation(
Expand All @@ -337,7 +340,7 @@ mod tests {
epoch,
base,
config,
PollInterval::MIN,
PollIntervalLimits::default().min,
));

assert!(!requires_clock_recalculation(
Expand All @@ -359,15 +362,15 @@ mod tests {
epoch,
base,
config,
PollInterval::MIN,
PollIntervalLimits::default().min,
));

assert!(!requires_clock_recalculation(
MsgForSystem::MustDemobilize(PeerIndex::from_inner(1)),
epoch,
base,
config,
PollInterval::MIN,
PollIntervalLimits::default().min,
));
}

Expand Down Expand Up @@ -416,7 +419,11 @@ mod tests {
reset_tx,

reset_epoch,
controller: ClockController::new(TestClock {}, &SystemSnapshot::default()),
controller: ClockController::new(
TestClock {},
&SystemSnapshot::default(),
&SystemConfig::default(),
),
};

system.run().await
Expand Down

0 comments on commit af9c17f

Please sign in to comment.