Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize sleep_until implementation #118480

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions library/std/src/sys/pal/hermit/thread.rs
Expand Up @@ -78,6 +78,14 @@ impl Thread {
}
}

pub fn sleep_until(deadline: Instant) {
let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
}

pub fn join(self) {
unsafe {
let _ = abi::join(self.tid);
Expand Down
8 changes: 8 additions & 0 deletions library/std/src/sys/pal/itron/thread.rs
Expand Up @@ -210,6 +210,14 @@ impl Thread {
}
}

pub fn sleep_until(deadline: Instant) {
let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
}

pub fn join(self) {
// Safety: `ThreadInner` is alive at this point
let inner = unsafe { self.p_inner.as_ref() };
Expand Down
8 changes: 8 additions & 0 deletions library/std/src/sys/pal/sgx/thread.rs
Expand Up @@ -137,6 +137,14 @@ impl Thread {
usercalls::wait_timeout(0, dur, || true);
}

pub fn sleep_until(deadline: Instant) {
let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
}

pub fn join(self) {
self.0.wait();
}
Expand Down
90 changes: 90 additions & 0 deletions library/std/src/sys/pal/unix/thread.rs
Expand Up @@ -264,6 +264,79 @@ impl Thread {
}
}

#[cfg(not(any(
target_os = "freebsd",
target_os = "netbsd",
target_os = "linux",
target_os = "android",
target_os = "solaris",
target_os = "illumos",
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos"
)))]
pub fn sleep_until(deadline: Instant) {
let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
}

// Note depends on clock_nanosleep (not supported on macos/ios/watchos/tvos)
#[cfg(any(
target_os = "freebsd",
target_os = "netbsd",
target_os = "linux",
target_os = "android",
target_os = "solaris",
target_os = "illumos",
))]
Comment on lines +288 to +295
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add target_os = "dragonfly", target_os = "hurd" and target_os = "fuchsia"

pub fn sleep_until(deadline: crate::time::Instant) {
let mut ts = deadline
.into_inner()
.into_timespec()
.to_timespec()
.expect("Timespec is narrower then libc::timespec thus conversion can't fail");
let ts_ptr = &mut ts as *mut _;

// If we're awoken with a signal and the return value is -1
// clock_nanosleep needs to be called again.
unsafe {
while libc::clock_nanosleep(libc::CLOCK_MONOTONIC, libc::TIMER_ABSTIME, ts_ptr, ts_ptr)
== -1
{
assert_eq!(
os::errno(),
libc::EINTR,
"clock nanosleep should only return an error if interrupted"
);
}
}
}

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

visionOS was added in #121419.

Suggested change
#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos"))]
#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos", target_os = "visionos"))]

Same goes for the other cfgs.

pub fn sleep_until(deadline: crate::time::Instant) {
use super::time::Timespec;
use core::mem::MaybeUninit;

let Timespec { tv_sec, tv_nsec } = deadline.into_inner().into_timespec();
let nanos = (tv_sec as u64).saturating_mul(1_000_000_000).saturating_add(tv_nsec.0 as u64);

let mut info = MaybeUninit::uninit();
unsafe {
let ret = mach_timebase_info(info.as_mut_ptr());
assert_eq!(ret, KERN_SUCCESS);

let info = info.assume_init();
let ticks = nanos * (info.denom as u64) / (info.numer as u64);

mach_wait_until(ticks);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually check the return type of mach_wait_until.

Suggested change
mach_wait_until(ticks);
let ret = mach_wait_until(ticks);

assert_eq!(ret, KERN_SUCCESS);
}
}

pub fn join(self) {
unsafe {
let ret = libc::pthread_join(self.id, ptr::null_mut());
Expand All @@ -283,6 +356,23 @@ impl Thread {
}
}

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos"))]
const KERN_SUCCESS: libc::c_int = 0;

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos"))]
#[repr(C)]
struct mach_timebase_info_type {
numer: u32,
denom: u32,
}

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos", target_os = "watchos"))]
extern "C" {
fn mach_wait_until(deadline: u64) -> libc::c_int;
fn mach_timebase_info(info: *mut mach_timebase_info_type) -> libc::c_int;

}

impl Drop for Thread {
fn drop(&mut self) {
let ret = unsafe { libc::pthread_detach(self.id) };
Expand Down
10 changes: 7 additions & 3 deletions library/std/src/sys/pal/unix/time.rs
Expand Up @@ -19,7 +19,7 @@ pub(in crate::sys) const TIMESPEC_MAX_CAPPED: libc::timespec = libc::timespec {
#[repr(transparent)]
#[rustc_layout_scalar_valid_range_start(0)]
#[rustc_layout_scalar_valid_range_end(999_999_999)]
struct Nanoseconds(u32);
pub(in crate::sys::unix) struct Nanoseconds(pub(in crate::sys::unix) u32);

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SystemTime {
Expand All @@ -28,8 +28,8 @@ pub struct SystemTime {

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct Timespec {
tv_sec: i64,
tv_nsec: Nanoseconds,
pub(in crate::sys::unix) tv_sec: i64,
pub(in crate::sys::unix) tv_nsec: Nanoseconds,
}

impl SystemTime {
Expand Down Expand Up @@ -318,6 +318,10 @@ impl Instant {
pub fn checked_sub_duration(&self, other: &Duration) -> Option<Instant> {
Some(Instant { t: self.t.checked_sub_duration(other)? })
}

pub(in crate::sys::unix) fn into_timespec(self) -> Timespec {
self.t
}
}

impl fmt::Debug for Instant {
Expand Down
71 changes: 40 additions & 31 deletions library/std/src/sys/pal/wasi/thread.rs
Expand Up @@ -3,7 +3,7 @@ use crate::io;
use crate::mem;
use crate::num::NonZero;
use crate::sys::unsupported;
use crate::time::Duration;
use crate::time::{Duration, Instant};

cfg_if::cfg_if! {
if #[cfg(target_feature = "atomics")] {
Expand Down Expand Up @@ -138,35 +138,18 @@ impl Thread {
let nanos = dur.as_nanos();
assert!(nanos <= u64::MAX as u128);

const USERDATA: wasi::Userdata = 0x0123_45678;

let clock = wasi::SubscriptionClock {
id: wasi::CLOCKID_MONOTONIC,
timeout: nanos as u64,
precision: 0,
flags: 0,
};

let in_ = wasi::Subscription {
userdata: USERDATA,
u: wasi::SubscriptionU { tag: 0, u: wasi::SubscriptionUU { clock } },
};
unsafe {
let mut event: wasi::Event = mem::zeroed();
let res = wasi::poll_oneoff(&in_, &mut event, 1);
match (res, event) {
(
Ok(1),
wasi::Event {
userdata: USERDATA,
error: wasi::ERRNO_SUCCESS,
type_: wasi::EVENTTYPE_CLOCK,
..
},
) => {}
_ => panic!("thread::sleep(): unexpected result of poll_oneoff"),
}
}
sleep_with(nanos as u64, wasi::CLOCKID_MONOTONIC, 0);
}

pub fn sleep_until(deadline: Instant) {
let nanos = deadline.into_inner().into_inner().as_nanos();
assert!(nanos <= u64::MAX as u128);

sleep_with(
nanos as u64,
wasi::CLOCKID_MONOTONIC,
wasi::SUBCLOCKFLAGS_SUBSCRIPTION_CLOCK_ABSTIME,
);
}

pub fn join(self) {
Expand All @@ -186,7 +169,33 @@ impl Thread {
}
}

pub fn available_parallelism() -> io::Result<NonZero<usize>> {
fn sleep_with(nanos: u64, clock_id: wasi::Clockid, flags: u16) {
let clock = wasi::SubscriptionClock { id: clock_id, timeout: nanos, precision: 0, flags };

const USERDATA: wasi::Userdata = 0x0123_45678;
let in_ = wasi::Subscription {
userdata: USERDATA,
u: wasi::SubscriptionU { tag: 0, u: wasi::SubscriptionUU { clock } },
};
unsafe {
let mut event: wasi::Event = mem::zeroed();
let res = wasi::poll_oneoff(&in_, &mut event, 1);
match (res, event) {
(
Ok(1),
wasi::Event {
userdata: USERDATA,
error: wasi::ERRNO_SUCCESS,
type_: wasi::EVENTTYPE_CLOCK,
..
},
) => {}
_ => panic!("thread::sleep(): unexpected result of poll_oneoff"),
}
}
}

pub fn available_parallelism() -> io::Result<NonZeroUsize> {
unsupported()
}

Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/pal/wasi/time.rs
Expand Up @@ -36,6 +36,10 @@ impl Instant {
pub fn checked_sub_duration(&self, other: &Duration) -> Option<Instant> {
Some(Instant(self.0.checked_sub(*other)?))
}

pub(crate) fn into_inner(self) -> Duration {
self.0
}
}

impl SystemTime {
Expand Down
21 changes: 20 additions & 1 deletion library/std/src/sys/pal/windows/thread.rs
Expand Up @@ -8,7 +8,7 @@ use crate::sys::c;
use crate::sys::handle::Handle;
use crate::sys::stack_overflow;
use crate::sys_common::FromInner;
use crate::time::Duration;
use crate::time::{Duration, Instant};

use core::ffi::c_void;

Expand Down Expand Up @@ -101,6 +101,25 @@ impl Thread {
}
}

pub fn sleep_until(deadline: Instant) {
fn high_precision_sleep(deadline: Instant) -> Result<(), ()> {
let timer = WaitableTimer::high_resolution()?;
timer.set_deadline(deadline.into_inner())?;
timer.wait()
}
// Attempt to use high-precision sleep (Windows 10, version 1803+).
// On error fallback to the standard `Sleep` function.
// Also preserves the zero duration behaviour of `Sleep`.
if high_precision_sleep(deadline).is_ok() {
return;
}

let now = Instant::now();
if let Some(dur) = deadline.checked_duration_since(now) {
unsafe { c::Sleep(super::dur2timeout(dur)) }
};
}

pub fn handle(&self) -> &Handle {
&self.handle
}
Expand Down
6 changes: 6 additions & 0 deletions library/std/src/sys/pal/windows/time.rs
Expand Up @@ -250,6 +250,12 @@ impl WaitableTimer {
let result = unsafe { c::SetWaitableTimer(self.handle, &time, 0, None, null(), c::FALSE) };
if result != 0 { Ok(()) } else { Err(()) }
}
pub fn set_deadline(&self, deadline: Instant) -> Result<(), ()> {
// Convert the Instant to a format similar to FILETIME.
let time = checked_dur2intervals(&deadline.t).ok_or(())?;
let result = unsafe { c::SetWaitableTimer(self.handle, &time, 0, None, null(), c::FALSE) };
if result != 0 { Ok(()) } else { Err(()) }
}
pub fn wait(&self) -> Result<(), ()> {
let result = unsafe { c::WaitForSingleObject(self.handle, c::INFINITE) };
if result != c::WAIT_FAILED { Ok(()) } else { Err(()) }
Expand Down
8 changes: 8 additions & 0 deletions library/std/src/sys/pal/xous/thread.rs
Expand Up @@ -127,6 +127,14 @@ impl Thread {
}
}

pub fn sleep_until(deadline: Instant) {
let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
}

pub fn join(self) {
join_thread(self.tid).unwrap();
}
Expand Down
34 changes: 28 additions & 6 deletions library/std/src/thread/mod.rs
Expand Up @@ -879,8 +879,34 @@ pub fn sleep(dur: Duration) {
///
/// # Platform-specific behavior
///
/// This function uses [`sleep`] internally, see its platform-specific behaviour.
/// In most cases this function will an call OS specific function. Where that
/// is not supported [`sleep`] is used. Those platforms are referred to as other
/// in the table below.
///
/// # Underlying System calls
///
/// The following system calls are [currently] being used:
///
///
/// | Platform | System call |
/// |-----------|----------------------------------------------------------------------|
/// | Linux | [clock_nanosleep] (Monotonic clock) |
/// | BSD except OpenBSD | [clock_nanosleep] (Monotonic Clock)] |
/// | Android | [clock_nanosleep] (Monotonic Clock)] |
/// | Solaris | [clock_nanosleep] (Monotonic Clock)] |
/// | Illumos | [clock_nanosleep] (Monotonic Clock)] |
/// | Darwin | [mach_wait_until] |
/// | WASI | [subscription_clock] |
/// | Windows | [SetWaitableTimer] |
/// | Other | `sleep_until` uses [`sleep`] and does not issue a syscall itself |
///
/// [currently]: crate::io#platform-specific-behavior
/// [clock_nanosleep]: https://linux.die.net/man/3/clock_nanosleep
/// [subscription_clock]: https://github.com/WebAssembly/WASI/blob/main/legacy/preview1/docs.md#-subscription_clock-record
/// [mach_wait_until]: https://developer.apple.com/library/archive/technotes/tn2169/_index.html
/// [SetWaitableTimer]: https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-setwaitabletimer
///
/// **Disclaimer:** These system calls might change over time.
///
/// # Examples
///
Expand Down Expand Up @@ -942,11 +968,7 @@ pub fn sleep(dur: Duration) {
/// ```
#[unstable(feature = "thread_sleep_until", issue = "113752")]
pub fn sleep_until(deadline: Instant) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The section on platform-specific behaviour still needs updating.

let now = Instant::now();

if let Some(delay) = deadline.checked_duration_since(now) {
sleep(delay);
}
imp::Thread::sleep_until(deadline)
}

/// Used to ensure that `park` and `park_timeout` do not unwind, as that can
Expand Down