Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash when Instant::now() returns the same value twice #830

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ bytes = "1.0.1"
futures = "0.3.6"
histogram = "0.6.9"
num_enum = "0.6"
tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros"] }
tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros", "test-util"] }
snap = "1.0"
uuid = { version = "1.0", features = ["v4"] }
rand = "0.8.3"
Expand Down
61 changes: 53 additions & 8 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,8 @@ mod latency_awareness {
use futures::{future::RemoteHandle, FutureExt};
use itertools::Either;
use scylla_cql::errors::{DbError, QueryError};
use tracing::{instrument::WithSubscriber, trace};
use tokio::time::{Duration, Instant};
use tracing::{instrument::WithSubscriber, trace, warn};
use uuid::Uuid;

use crate::{load_balancing::NodeRef, transport::node::Node};
Expand All @@ -2143,7 +2144,6 @@ mod latency_awareness {
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::{Duration, Instant},
};

#[derive(Debug)]
Expand All @@ -2168,7 +2168,7 @@ mod latency_awareness {
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct TimestampedAverage {
pub(super) timestamp: Instant,
pub(super) average: Duration,
Expand All @@ -2190,15 +2190,41 @@ mod latency_awareness {
timestamp: now,
}),
Some(prev_avg) => Some({
let delay = (now - prev_avg.timestamp).as_secs_f64();
let delay = now
.saturating_duration_since(prev_avg.timestamp)
.as_secs_f64();
let scaled_delay = delay / scale_secs;
let prev_weight = (scaled_delay + 1.).ln() / scaled_delay;
let prev_weight = if scaled_delay <= 0. {
1.
} else {
(scaled_delay + 1.).ln() / scaled_delay
};

let last_latency_secs = last_latency.as_secs_f64();
let prev_avg_secs = prev_avg.average.as_secs_f64();
let average = Duration::from_secs_f64(
let average = match Duration::try_from_secs_f64(
(1. - prev_weight) * last_latency_secs + prev_weight * prev_avg_secs,
);
) {
Ok(ts) => ts,
Err(e) => {
warn!(
"Error while calculating average: {e}. \
prev_avg_secs: {prev_avg_secs}, \
last_latency_secs: {last_latency_secs}, \
prev_weight: {prev_weight}, \
scaled_delay: {scaled_delay}, \
delay: {delay}, \
prev_avg.timestamp: {:?}, \
now: {now:?}",
prev_avg.timestamp
);

// Not sure when we could enter this branch,
// so I have no idea what would be a sensible value to return here,
// this does not seem like a very bad choice.
prev_avg.average
}
};
Self {
num_measures: prev_avg.num_measures + 1,
timestamp: now,
Expand Down Expand Up @@ -2733,7 +2759,7 @@ mod latency_awareness {
},
ExecutionProfile,
};
use std::time::Instant;
use tokio::time::Instant;

trait DefaultPolicyTestExt {
fn set_nodes_latency_stats(
Expand Down Expand Up @@ -3453,5 +3479,24 @@ mod latency_awareness {

session.query("whatever", ()).await.unwrap_err();
}

#[tokio::test]
async fn timestamped_average_works_when_clock_stops() {
tokio::time::pause();
let avg = Some(TimestampedAverage {
timestamp: Instant::now(),
average: Duration::from_secs(123),
num_measures: 1,
});
let new_avg = TimestampedAverage::compute_next(avg, Duration::from_secs(456), 10.0);
assert_eq!(
new_avg,
Some(TimestampedAverage {
timestamp: Instant::now(),
average: Duration::from_secs(123),
num_measures: 2,
}),
);
}
}
}