/
lib.rs
2479 lines (2200 loc) · 84.5 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! A small and fast async runtime.
//!
//! # Executors
//!
//! There are three executors that poll futures:
//!
//! 1. Thread-local executor for tasks created by [`Task::local()`].
//! 2. Work-stealing executor for tasks created by [`Task::spawn()`].
//! 3. Blocking executor for tasks created by [`Task::blocking()`], [`blocking!`], [`iter()`],
//! [`reader()`] and [`writer()`].
//!
//! Blocking executor is the only one that spawns threads.
//!
//! # Reactor
//!
//! To wait for the next I/O event, the reactor calls [epoll] on Linux/Android, [kqueue] on
//! macOS/iOS/BSD, and [WSAPoll] on Windows.
//!
//! The [`Async`] type registers I/O handles in the reactor and is able to convert their blocking
//! operations into async operations.
//!
//! The [`Timer`] type registers timers in the reactor that will fire at the chosen points in
//! time.
//!
//! # Running
//!
//! Function [`run()`] simultaneously runs the thread-local executor, runs the work-stealing
//! executor, and polls the reactor for I/O events and timers. At least one thread has to be
//! calling [`run()`] in order for futures waiting on I/O and timers to get notified.
//!
//! If you want a multithreaded runtime, just call [`run()`] from multiple threads - see the
//! [examples] directory for a thread-pool example.
//!
//! There is also [`block_on()`], which blocks the thread until a future completes, but it doesn't
//! do anything else besides that.
//!
//! Blocking tasks run in the background on a dedicated thread pool.
//!
//! # Examples
//!
//! Connect to an HTTP website, make a GET request, and pipe the response to the standard output:
//!
//! ```
//! use futures::prelude::*;
//! use smol::Async;
//! use std::net::TcpStream;
//!
//! fn main() -> std::io::Result<()> {
//! smol::run(async {
//! let mut stream = Async::<TcpStream>::connect("example.com:80").await?;
//! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
//! stream.write_all(req).await?;
//!
//! let mut stdout = smol::writer(std::io::stdout());
//! futures::io::copy(&stream, &mut stdout).await?;
//! Ok(())
//! })
//! }
//! ```
//!
//! Look inside the [examples] directory for more.
//!
//! The examples show how to read a [file][read-file] or a [directory][read-directory],
//! [spawn][process-run] a process and read its [output][process-output], use timers to
//! [sleep][timer-sleep] or set a [timeout][timer-timeout], or catch the [Ctrl-C][ctrl-c] signal
//! for graceful shutdown.
//!
//! They also include a [web crawler][web-crawler], a simple TCP
//! [client][tcp-client]/[server][tcp-server], a TCP chat
//! [client][chat-client]/[server][chat-server], a simple
//! TLS [client][tls-client]/[server][tls-server], a simple
//! HTTP+TLS [client][simple-client]/[server][simple-server], a [hyper]
//! [client][hyper-client]/[server][hyper-server], an [async-h1]
//! [client][async-h1-client]/[server][async-h1-server], and a WebSocket+TLS
//! [client][websocket-client]/[server][websocket-server].
//!
//! Many non-async libraries can be plugged into the runtime: see how to use [inotify],
//! [timerfd], [signal-hook], and [uds_windows].
//!
//! Finally, you can mix this runtime with [async-std](https://docs.rs/async-std) and
//! [tokio](https://docs.rs/tokio), or use runtime-specific libraries like
//! [reqwest](https://docs.rs/reqwest).
//!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [WSAPoll]: https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll
//!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [www.rust-lang.org]: https://www.rust-lang.org/
//! [cat]: https://en.wikipedia.org/wiki/Cat_(Unix)
//!
//! [stdin-to-stdout]: https://github.com/stjepang/smol/blob/master/examples/stdin-to-stdout.rs
//! [ctrl-c]: https://github.com/stjepang/smol/blob/master/examples/ctrl-c.rs
//! [read-file]: https://github.com/stjepang/smol/blob/master/examples/read-file.rs
//! [read-directory]: https://github.com/stjepang/smol/blob/master/examples/read-directory.rs
//! [timer-sleep]: https://github.com/stjepang/smol/blob/master/examples/timer-sleep.rs
//! [timer-timeout]: https://github.com/stjepang/smol/blob/master/examples/timer-timeout.rs
//! [process-run]: https://github.com/stjepang/smol/blob/master/examples/process-run.rs
//! [process-output]: https://github.com/stjepang/smol/blob/master/examples/process-output.rs
//! [tcp-client]: https://github.com/stjepang/smol/blob/master/examples/tcp-client.rs
//! [tcp-server]: https://github.com/stjepang/smol/blob/master/examples/tcp-server.rs
//! [tls-client]: https://github.com/stjepang/smol/blob/master/examples/tls-client.rs
//! [tls-server]: https://github.com/stjepang/smol/blob/master/examples/tls-server.rs
//! [simple-client]: https://github.com/stjepang/smol/blob/master/examples/simple-client.rs
//! [simple-server]: https://github.com/stjepang/smol/blob/master/examples/simple-server.rs
//! [async-h1-client]: https://github.com/stjepang/smol/blob/master/examples/async-h1-client.rs
//! [async-h1-server]: https://github.com/stjepang/smol/blob/master/examples/async-h1-server.rs
//! [hyper-client]: https://github.com/stjepang/smol/blob/master/examples/hyper-client.rs
//! [hyper-server]: https://github.com/stjepang/smol/blob/master/examples/hyper-server.rs
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
//! [chat-client]: https://github.com/stjepang/smol/blob/master/examples/chat-client.rs
//! [chat-server]: https://github.com/stjepang/smol/blob/master/examples/chat-server.rs
//! [web-crawler]: https://github.com/stjepang/smol/blob/master/examples/web-crawler.rs
//! [inotify]: https://github.com/stjepang/smol/blob/master/examples/linux-inotify.rs
//! [timerfd]: https://github.com/stjepang/smol/blob/master/examples/linux-timerfd.rs
//! [signal-hook]: https://github.com/stjepang/smol/blob/master/examples/unix-signal.rs
//! [uds_windows]: https://github.com/stjepang/smol/blob/master/examples/windows-uds.rs
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
// Refuse to compile on platforms that have no supported I/O backend: the reactor needs
// epoll (Linux/Android), kqueue (macOS/iOS/BSDs), or WSAPoll (Windows).
#[cfg(not(any(
    target_os = "linux",     // epoll
    target_os = "android",   // epoll
    target_os = "macos",     // kqueue
    target_os = "ios",       // kqueue
    target_os = "freebsd",   // kqueue
    target_os = "netbsd",    // kqueue
    target_os = "openbsd",   // kqueue
    target_os = "dragonfly", // kqueue
    target_os = "windows",   // WSAPoll
)))]
compile_error!("smol does not support this target OS");
use std::cell::{Cell, RefCell};
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::num::Wrapping;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::thread::{self, ThreadId};
use std::time::{Duration, Instant};
#[cfg(unix)]
use std::{
os::unix::io::{AsRawFd, RawFd},
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
path::Path,
};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use crossbeam::deque;
use crossbeam::queue::SegQueue;
use crossbeam::sync::{Parker, ShardedLock};
use futures::future::Either;
use futures::io::AllowStdIo;
use futures::prelude::*;
use once_cell::sync::Lazy;
use scoped_tls::scoped_thread_local;
use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type};
use std::sync::{Condvar, Mutex, MutexGuard};
// TODO: explain the implementation and major components:
// - the Task struct
// - thread-local executor
// - work-stealing executor
// - reactor
// - Timer
// - Async
// - the IoEvent
// - blocking executor
// - sys module
// ---------- The task system ----------
/// A runnable future, ready for execution.
///
/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`,
/// we get back two values:
///
/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable`
/// 2. an `async_task::JoinHandle<T, ()>`, which is wrapped inside a `Task<T>`
///
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. Waking
/// invokes the schedule function passed at spawn time, which pushes the `Runnable` back into one
/// of the executors' task queues.
type Runnable = async_task::Task<()>;
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// If the future panics, the panic will be unwound into the [`run()`] invocation that polled it.
/// However, this does not apply to the blocking executor - it will simply ignore panics and
/// continue running.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
// The inner `Option` holds the task's join handle. It is `Some` for a live task; `detach()` and
// `cancel()` take the handle out so that the `Drop` impl (which cancels) becomes a no-op.
pub struct Task<T>(Option<async_task::JoinHandle<T, ()>>);
impl<T: 'static> Task<T> {
    /// Spawns a future onto the thread-local executor.
    ///
    /// The future does not have to be `Send` since it will only ever be polled on the thread that
    /// spawned it. Panics if the current thread is not inside an invocation of [`run()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::Task;
    ///
    /// # smol::run(async {
    /// let task = Task::local(async { 1 + 2 });
    /// assert_eq!(task.await, 3);
    /// # })
    /// ```
    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
        THREAD_LOCAL_EXECUTOR.with(|executor| executor.spawn(future))
    }
}
impl<T: Send + 'static> Task<T> {
    /// Spawns a future onto the work-stealing executor.
    ///
    /// The future may be stolen and polled by any thread calling [`run()`], which is why it must
    /// be `Send`.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::Task;
    ///
    /// # smol::run(async {
    /// let task = Task::spawn(async { 1 + 2 });
    /// assert_eq!(task.await, 3);
    /// # });
    /// ```
    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
        let executor = WorkStealingExecutor::get();
        executor.spawn(future)
    }

    /// Spawns a future onto the blocking executor.
    ///
    /// The future is allowed to block for an indefinite length of time.
    ///
    /// For convenience, there is also the [`blocking!`] macro that spawns a blocking task and
    /// immediately awaits it.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use smol::Task;
    /// use std::io::stdin;
    ///
    /// # smol::block_on(async {
    /// let line = Task::blocking(async {
    ///     let mut line = String::new();
    ///     std::io::stdin().read_line(&mut line).unwrap();
    ///     line
    /// })
    /// .await;
    /// # });
    /// ```
    pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
        let executor = BlockingExecutor::get();
        executor.spawn(future)
    }
}
impl<T, E> Task<Result<T, E>>
where
    T: Send + 'static,
    E: Debug + Send + 'static,
{
    /// Spawns a new task that awaits and unwraps the result.
    ///
    /// The new task panics if the original task resolves to an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Async, Task};
    /// use std::net::TcpStream;
    ///
    /// # smol::run(async {
    /// let stream = Task::spawn(async {
    ///     Async::<TcpStream>::connect("example.com:80").await
    /// })
    /// .unwrap()
    /// .await;
    /// # })
    /// ```
    pub fn unwrap(self) -> Task<T> {
        Task::spawn(async move { self.await.unwrap() })
    }

    /// Spawns a new task that awaits and unwraps the result.
    ///
    /// The new task panics with the provided message if the original task resolves to an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Async, Task};
    /// use std::net::TcpStream;
    ///
    /// # smol::run(async {
    /// let stream = Task::spawn(async {
    ///     Async::<TcpStream>::connect("example.com:80").await
    /// })
    /// .expect("cannot connect")
    /// .await;
    /// # })
    /// ```
    pub fn expect(self, msg: &str) -> Task<T> {
        // Copy the message into an owned `String` so the spawned future can be `'static`.
        let msg = String::from(msg);
        Task::spawn(async move { self.await.expect(&msg) })
    }
}
impl<T> Task<T> {
    /// Detaches the task to let it keep running in the background.
    ///
    /// The task's output is discarded when it completes. This used to be restricted to
    /// `Task<()>`, but nothing in the implementation depends on the output type, so it is now
    /// available for every `Task<T>` (a backward-compatible generalization).
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use smol::{Task, Timer};
    /// use std::time::Duration;
    ///
    /// # smol::run(async {
    /// Task::spawn(async {
    ///     loop {
    ///         println!("I'm a daemon task looping forever.");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// })
    /// .detach();
    /// # })
    /// ```
    pub fn detach(mut self) {
        // Take the handle out so the `Drop` impl sees `None` and doesn't cancel the task.
        // The handle is always `Some` until `detach()` or `cancel()` consumes the task.
        self.0.take().unwrap();
    }
}
impl<T> Task<T> {
    /// Cancels the task and waits for it to stop running.
    ///
    /// Returns the task's output if it was completed just before it got canceled, or `None` if it
    /// didn't complete.
    ///
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
    /// canceling because it also waits for the task to stop running.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Task, Timer};
    /// use std::time::Duration;
    ///
    /// # smol::run(async {
    /// let task = Task::spawn(async {
    ///     loop {
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// });
    ///
    /// Timer::after(Duration::from_secs(3)).await;
    /// task.cancel().await;
    /// # })
    /// ```
    pub async fn cancel(self) -> Option<T> {
        // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
        // do `{ self }` here to avoid marking `self` as mutable.
        //
        // Taking the handle out also disarms the `Drop` impl, which only cancels when the handle
        // is still present.
        let handle = { self }.0.take().unwrap();
        // Request cancellation, then await the handle to wait until the task stops running. The
        // handle yields `Some(output)` only if the future completed before cancellation took
        // effect.
        handle.cancel();
        handle.await
    }
}
impl<T> Drop for Task<T> {
    fn drop(&mut self) {
        // A handle that is still present means the task was neither detached nor explicitly
        // canceled, so dropping the `Task` cancels it.
        if let Some(handle) = self.0.as_ref() {
            handle.cancel();
        }
    }
}
impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the inner join handle, yielding the task's output.
    ///
    /// Panics if the task was canceled or its future panicked (the handle resolves to `None` in
    /// those cases).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The handle is always `Some` here: only `detach()` and `cancel()` remove it, and both
        // consume the `Task`.
        //
        // Poll the handle directly instead of taking a redundant `&mut` reference to it (the
        // original `Pin::new(&mut ...)` polled through the `impl Future for &mut F` blanket
        // impl for no benefit).
        Pin::new(self.0.as_mut().unwrap())
            .poll(cx)
            .map(|output| output.expect("task has failed"))
    }
}
/// Blocks on a single future.
///
/// This function polls the future in a loop, parking the current thread after each step to wait
/// until its waker is woken.
///
/// Unlike [`run()`], it does not run executors or poll the reactor!
///
/// You can think of it as the easiest and most efficient way of turning an async operation into a
/// blocking operation.
///
/// # Panics
///
/// Panics if called recursively on the same thread.
///
/// # Examples
///
/// ```
/// use futures::future;
/// use smol::{Async, Timer};
/// use std::thread;
/// use std::time::Duration;
///
/// // Run executors and the reactor on a separate thread, forever.
/// thread::spawn(|| smol::run(future::pending::<()>()));
///
/// smol::block_on(async {
///     // Sleep for a second.
///     // This timer only works because there's a thread calling `run()`.
///     Timer::after(Duration::from_secs(1)).await;
/// })
/// ```
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    // The implementation of this function is explained in the following blog post:
    // https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
    thread_local! {
        // Parker and waker associated with the current thread, cached so repeated calls to
        // `block_on()` don't have to recreate them.
        static CACHE: RefCell<(Parker, Waker)> = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            RefCell::new((parker, waker))
        };
    }

    CACHE.with(|cache| {
        // If the cache is already borrowed, this is a recursive `block_on()` call, which would
        // deadlock waiting on itself - panic instead. `BorrowMutError` implements `Debug`, so
        // `.expect()` works directly; the previous `.ok().expect(...)` detour was redundant.
        let (parker, waker) = &mut *cache.try_borrow_mut().expect("recursive `block_on()`");

        // If enabled, set up tokio before execution begins.
        enter(|| {
            futures::pin_mut!(future);
            // `waker` is `&mut Waker`, which coerces to the `&Waker` that `from_waker` expects -
            // no extra reference needed.
            let cx = &mut Context::from_waker(waker);
            loop {
                match future.as_mut().poll(cx) {
                    // The future is complete.
                    Poll::Ready(output) => return output,
                    // Park the thread until the waker unparks it, then poll again.
                    Poll::Pending => parker.park(),
                }
            }
        })
    })
}
/// Runs executors and polls the reactor.
///
/// This function simultaneously runs the thread-local executor, runs the work-stealing
/// executor, and polls the reactor for I/O events and timers. At least one thread has to be
/// calling [`run()`] in order for futures waiting on I/O and timers to get notified.
///
/// # Panics
///
/// Panics if called recursively, i.e. from a task that is itself being polled inside `run()`.
///
/// # Examples
///
/// A multi-threaded runtime can be built simply by calling `run()` from several threads:
///
/// ```no_run
/// use futures::future;
/// use std::thread;
///
/// // Spawn a few background threads that run executors and the reactor forever.
/// for _ in 0..4 {
///     thread::spawn(|| smol::run(future::pending::<()>()));
/// }
///
/// // Run the main future on the current thread as well.
/// smol::run(async {
///     println!("Hello from the smol runtime!");
/// });
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
    // If this thread is already inside an executor, panic.
    if WORKER.is_set() {
        panic!("recursive `run()`");
    }

    // Create a thread-local executor and a worker in the work-stealing executor.
    let local = ThreadLocalExecutor::new();
    let worker = WorkStealingExecutor::get().worker();

    // Create a waker that triggers an I/O event in the thread-local scheduler.
    let ev = local.event.clone();
    let waker = async_task::waker_fn(move || ev.set());
    let cx = &mut Context::from_waker(&waker);
    futures::pin_mut!(future);

    // Set up the thread-locals (and tokio, if enabled) before execution begins.
    let enter = |f| THREAD_LOCAL_EXECUTOR.set(&local, || WORKER.set(&worker, || enter(f)));

    enter(|| {
        // We run four components at the same time, treating them all fairly and making sure none
        // of them get starved:
        //
        // 1. `future` - the main future.
        // 2. `local` - the thread-local executor.
        // 3. `worker` - the work-stealing executor.
        // 4. `Reactor::get()` - the reactor.
        //
        // When all four components are out of work, we block the current thread on
        // epoll/kevent/WSAPoll. If new work comes in that isn't naturally triggered by an I/O
        // event registered with `Async` handles, we use `IoEvent`s to simulate an I/O event that
        // will unblock the thread:
        //
        // - When the main future is woken, `local.event` is triggered.
        // - When thread-local executor gets new work, `local.event` is triggered.
        // - When work-stealing executor gets new work, `worker.executor.event` is triggered.
        // - When a new earliest timer is registered, `Reactor::get().event` is triggered.
        //
        // This way we make sure that any change that might give us new work will unblock
        // epoll/kevent/WSAPoll and let us continue the loop.
        loop {
            // 1. Poll the main future.
            if let Poll::Ready(val) = use_throttle(|| future.as_mut().poll(cx)) {
                return val;
            }
            // 2. Run a batch of tasks in the thread-local executor.
            let more_local = local.execute();
            // 3. Run a batch of tasks in the work-stealing executor.
            let more_worker = worker.execute();
            // 4. Poll the reactor.
            Reactor::get().poll().expect("failure while polling I/O");

            // If there is more work in the thread-local or the work-stealing executor, continue
            // the loop.
            if more_local || more_worker {
                continue;
            }

            // Prepare for blocking until the reactor is locked or `local.event` is triggered.
            //
            // Note that there is no need to wait for `worker.executor.event`. If the reactor is
            // locked immediately, we'll check for the I/O event right after that anyway.
            //
            // If some other worker is holding the reactor locked, it will be unblocked as soon as
            // the I/O event is triggered. Then, another worker will be allowed to lock the
            // reactor, and will be unblocked if there is more work to do. Every worker triggers
            // `worker.executor.event` each time it finds a runnable task.
            let lock = Reactor::get().lock();
            let ready = local.event.ready();
            futures::pin_mut!(lock);
            futures::pin_mut!(ready);

            // Block until either the reactor is locked or `local.event` is triggered.
            if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
                // Clear the two I/O events.
                let local_ev = local.event.clear();
                let worker_ev = worker.executor.event.clear();

                // If any of the two I/O events has been triggered, continue the loop.
                if local_ev || worker_ev {
                    continue;
                }

                // Block until an I/O event occurs.
                reactor.wait().expect("failure while waiting on I/O");
            }
        }
    })
}
/// Enters the "runtime context" before polling a future.
///
/// Without the `tokio` feature enabled there is no extra context to set up, so this simply calls
/// the closure.
#[cfg(not(feature = "tokio"))]
fn enter<T>(f: impl FnOnce() -> T) -> T {
    f()
}
/// Enters the "runtime context" before polling a future.
///
/// With the `tokio` feature enabled, this runs the closure inside a lazily-initialized global
/// tokio runtime so that tokio-based libraries work when driven by smol.
#[cfg(feature = "tokio")]
fn enter<T>(f: impl FnOnce() -> T) -> T {
    use tokio::runtime::Runtime;
    // One shared tokio runtime for all smol threads; initialization failure is fatal.
    static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
    RT.enter(f)
}
// Number of times the current task is allowed to poll I/O operations.
//
// When this budget is used up, I/O operations will wake the current task and return
// `Poll::Pending`.
//
// This thread-local is set before running any task.
scoped_thread_local!(static BUDGET: Cell<u32>);

/// Sets an I/O budget for polling a future.
///
/// Once this budget is exceeded, polled I/O operations will always wake the current task and
/// return `Poll::Pending`.
///
/// We throttle I/O this way in order to prevent futures from running for
/// too long and thus starving other futures.
fn use_throttle<T>(poll: impl FnOnce() -> T) -> T {
    // This is a fairly arbitrary number that seems to work well in practice. The budget is
    // decremented by `poll_throttle()` each time an I/O operation is polled.
    BUDGET.set(&Cell::new(200), poll)
}
/// Returns `Poll::Pending` if the I/O budget has been used up.
fn poll_throttle(cx: &mut Context<'_>) -> Poll<()> {
    // Outside of `use_throttle()` there is no budget, so never throttle.
    if !BUDGET.is_set() {
        return Poll::Ready(());
    }

    // Decrement the budget, remembering its previous value.
    let previous = BUDGET.with(|budget| {
        let old = budget.get();
        budget.set(old.saturating_sub(1));
        old
    });

    if previous == 0 {
        // The budget was already exhausted. Make sure to wake the current task - it is not
        // *really* pending, we're just artificially throttling it to let other tasks run.
        cx.waker().wake_by_ref();
        return Poll::Pending;
    }
    Poll::Ready(())
}
// ---------- Thread-local executor ----------

// The thread-local executor.
//
// This thread-local is only set while inside `run()`.
scoped_thread_local!(static THREAD_LOCAL_EXECUTOR: ThreadLocalExecutor);

/// An executor for thread-local tasks.
///
/// Thread-local tasks are spawned by calling `Task::local()` and their futures do not have to
/// implement `Send`. They can only be run by the same thread that created them.
struct ThreadLocalExecutor {
    /// The main task queue, only ever accessed by the thread owning this executor.
    queue: RefCell<VecDeque<Runnable>>,

    /// When another thread wakes a task belonging to this executor, it goes into this queue.
    injector: Arc<SegQueue<Runnable>>,

    /// An I/O event that is triggered when another thread wakes a task belonging to this executor.
    event: IoEvent,
}
impl ThreadLocalExecutor {
    /// Creates a new thread-local executor.
    fn new() -> ThreadLocalExecutor {
        ThreadLocalExecutor {
            queue: RefCell::new(VecDeque::new()),
            injector: Arc::new(SegQueue::new()),
            event: IoEvent::new().expect("cannot create an `IoEvent`"),
        }
    }

    /// Spawns a future onto this executor.
    ///
    /// Returns a `Task` handle for the spawned task.
    fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        // Remember which thread this executor belongs to, so the schedule function can tell
        // whether it's being called from the original thread or from some other thread.
        let id = thread_id();
        let injector = self.injector.clone();
        let event = self.event.clone();

        // The function that schedules a runnable task.
        let schedule = move |runnable| {
            if thread_id() == id {
                // If scheduling from the original thread, push into the main queue.
                THREAD_LOCAL_EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
            } else {
                // If scheduling from a different thread, push into the injector queue.
                injector.push(runnable);

                // Trigger an I/O event to let the original thread know that a task has been
                // scheduled. If that thread is inside epoll/kqueue/WSAPoll, an I/O event will wake
                // it up.
                event.set();
            }
        };

        // Create a task, schedule it, and return its `Task` handle.
        let (runnable, handle) = async_task::spawn_local(future, schedule, ());
        runnable.schedule();
        Task(Some(handle))
    }

    /// Executes a batch of tasks and returns `true` if there are more tasks to run.
    fn execute(&self) -> bool {
        // Execute 4 series of 50 tasks.
        for _ in 0..4 {
            for _ in 0..50 {
                // Find the next task to run.
                match self.pop() {
                    None => {
                        // There are no more tasks to run.
                        return false;
                    }
                    Some(r) => {
                        // Run the task under an I/O budget so it can't starve other tasks.
                        use_throttle(|| r.run());
                    }
                }
            }

            // Poll the reactor and drain the injector queue. We do this occasionally to make
            // execution more fair to all tasks involved.
            self.fetch();
        }

        // There are likely more tasks to run.
        true
    }

    /// Finds the next task to run.
    fn pop(&self) -> Option<Runnable> {
        // Check if there is a task in the main queue.
        if let Some(r) = self.queue.borrow_mut().pop_front() {
            return Some(r);
        }
        // If not, fetch more tasks from the reactor or the injector queue.
        self.fetch();
        // Check the main queue again.
        self.queue.borrow_mut().pop_front()
    }

    /// Polls the reactor and moves all tasks from the injector queue into the main queue.
    fn fetch(&self) {
        // The reactor might wake tasks belonging to this executor.
        Reactor::get().poll().expect("failure while polling I/O");

        // Move tasks from the injector queue into the main queue.
        let mut queue = self.queue.borrow_mut();
        while let Ok(r) = self.injector.pop() {
            queue.push_back(r);
        }
    }
}
/// Same as `std::thread::current().id()`, but more efficient.
fn thread_id() -> ThreadId {
    thread_local! {
        // Cache the ID so we don't have to go through `thread::current()` on every call.
        static ID: ThreadId = thread::current().id();
    }

    // During thread teardown the thread-local may already be destroyed; fall back to the slower
    // lookup in that case.
    match ID.try_with(|id| *id) {
        Ok(id) => id,
        Err(_) => thread::current().id(),
    }
}
// ---------- Work-stealing executor ----------

// The current thread's worker.
//
// Other threads may steal tasks from this worker through its associated stealer that was
// registered in the work-stealing executor.
//
// This thread-local is only set while inside `run()`.
scoped_thread_local!(static WORKER: Worker);

/// The global work-stealing executor.
///
/// Tasks created by `Task::spawn()` go into this executor. Any thread calling `run()` initializes
/// a `Worker` that participates in work stealing, which is allowed to run any task in this
/// executor or in other workers.
///
/// Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
/// implement `Send`.
///
/// There is only one global instance of this type, accessible by `WorkStealingExecutor::get()`.
///
/// Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
/// invocations of `run()` used the same global task queue all the time, they would constantly
/// "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
/// retrying queue operations in compare-and-swap loops.
///
/// The solution is to have a separate queue in each invocation of `run()`, called a "worker".
/// Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
/// either grab a batch of tasks from the main global queue, or steal tasks from other workers.
/// Of course, work-stealing still causes contention in some cases, but much less often.
///
/// More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
struct WorkStealingExecutor {
    /// When a thread that is not inside `run()` spawns or wakes a task, it goes into this queue.
    injector: deque::Injector<Runnable>,

    /// Registered handles for stealing tasks from workers.
    stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,

    /// An I/O event that is triggered whenever there might be available tasks to run.
    event: IoEvent,
}
impl WorkStealingExecutor {
    /// Returns a reference to the global work-stealing executor.
    fn get() -> &'static WorkStealingExecutor {
        static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
            injector: deque::Injector::new(),
            stealers: ShardedLock::new(Slab::new()),
            event: IoEvent::new().expect("cannot create an `IoEvent`"),
        });
        // Coerce the `Lazy` wrapper into a plain `&'static WorkStealingExecutor`.
        &EXECUTOR
    }

    /// Spawns a future onto this executor.
    ///
    /// Returns a `Task` handle for the spawned task.
    fn spawn<T: Send + 'static>(
        &'static self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        // The function that schedules a runnable task.
        let schedule = move |runnable| {
            if WORKER.is_set() {
                // If scheduling from a worker thread, push into the worker's queue.
                WORKER.with(|w| w.push(runnable));
            } else {
                // If scheduling from a non-worker thread, push into the injector queue.
                self.injector.push(runnable);

                // Trigger an I/O event to let workers know that a task has been scheduled.
                self.event.set();
            }
        };

        // Create a task, schedule it, and return its `Task` handle.
        let (runnable, handle) = async_task::spawn(future, schedule, ());
        runnable.schedule();
        Task(Some(handle))
    }

    /// Registers a new worker.
    ///
    /// The worker will automatically deregister itself when dropped.
    fn worker(&'static self) -> Worker {
        // Reserve a slot in the stealer registry for this worker.
        let mut stealers = self.stealers.write().unwrap();
        let vacant = stealers.vacant_entry();

        // Create a worker and put its stealer handle into the executor.
        let worker = Worker {
            key: vacant.key(),
            slot: Cell::new(None),
            queue: deque::Worker::new_fifo(),
            executor: self,
        };
        vacant.insert(worker.queue.stealer());

        worker
    }
}
/// A worker that participates in the work-stealing executor.
///
/// Each invocation of `run()` creates its own worker.
struct Worker {
    /// The ID of this worker obtained during registration.
    ///
    /// This is the key of this worker's stealer handle inside
    /// `WorkStealingExecutor::stealers`; `fetch()` uses it to avoid stealing from itself.
    key: usize,
    /// A slot into which tasks go before entering the actual queue.
    ///
    /// `push()` fills this slot first, and `pop()` checks it before the queue, so the most
    /// recently pushed task is the next one to run.
    slot: Cell<Option<Runnable>>,
    /// A queue of tasks.
    ///
    /// Other workers are able to steal tasks from this queue.
    queue: deque::Worker<Runnable>,
    /// The parent work-stealing executor.
    ///
    /// This is the same thing as `WorkStealingExecutor::get()`, but we keep a reference here for
    /// convenience.
    executor: &'static WorkStealingExecutor,
}
impl Worker {
    /// Executes a batch of tasks and returns `true` if there are more tasks to run.
    ///
    /// Between each series of tasks the slot is flushed and `fetch()` is called, which keeps
    /// execution fair to the injector queue and the reactor.
    fn execute(&self) -> bool {
        // Execute 4 series of 50 tasks.
        for _ in 0..4 {
            for _ in 0..50 {
                // Find the next task to run.
                match self.pop() {
                    None => {
                        // There are no more tasks to run.
                        return false;
                    }
                    Some(r) => {
                        // Notify other workers that there may be more tasks.
                        //
                        // NOTE(review): the event is set *before* running the task, presumably so
                        // other workers can wake and steal from this queue while `r` runs —
                        // confirm against `IoEvent` semantics.
                        self.executor.event.set();
                        // Run the task.
                        if use_throttle(|| r.run()) {
                            // The task was scheduled while it was running, which means it got
                            // pushed into this worker. It is now inside the slot and would be the
                            // next task to run.
                            //
                            // Let's flush the slot in order to give other tasks a chance to run.
                            self.push(None);
                        }
                    }
                }
            }
            // Flush the slot, grab some tasks from the global queue, and poll the reactor. We do
            // this occasionally to make execution more fair to all tasks involved.
            self.push(None);
            self.fetch();
        }
        // There are likely more tasks to run.
        true
    }
    /// Pushes a task into this worker.
    ///
    /// The task goes into the slot; whatever task previously occupied the slot (if any) is
    /// moved into the queue.
    ///
    /// If the given task is `None`, the slot's contents gets replaced with `None`, moving its
    /// previously held task into the queue (if there was one) — i.e. the slot is flushed.
    fn push(&self, runnable: impl Into<Option<Runnable>>) {
        // Put the task into the slot.
        if let Some(r) = self.slot.replace(runnable.into()) {
            // If the slot had a task, push it into the queue.
            self.queue.push(r);
        }
    }
    /// Finds the next task to run.
    ///
    /// The slot is checked before the queue, so a freshly pushed task runs first.
    fn pop(&self) -> Option<Runnable> {
        // Check if there is a task in the slot or in the queue.
        if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
            return Some(r);
        }
        // If not, fetch more tasks from the injector queue, the reactor, or other workers.
        self.fetch();
        // Check the slot and the queue again.
        self.slot.take().or_else(|| self.queue.pop())
    }
    /// Steals from the injector and polls the reactor, or steals from other workers if that fails.
    ///
    /// Stealing from other workers is attempted only when the injector steal and the reactor
    /// poll have both produced nothing.
    fn fetch(&self) {
        // Try stealing from the global queue.
        if let Some(r) = retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue)) {
            // Push the task, but don't return -- let's not forget to poll the reactor.
            self.push(r);
        }
        // Poll the reactor.
        Reactor::get().poll().expect("failure while polling I/O");
        // If there is at least one task in the slot, return.
        //
        // A `Cell` cannot be inspected without moving its contents out, so take the task and
        // immediately put it back.
        if let Some(r) = self.slot.take() {
            self.slot.set(Some(r));
            return;
        }
        // If there is at least one task in the queue, return.
        if !self.queue.is_empty() {
            return;
        }
        // Still no tasks found - our last hope is to steal from other workers.
        let stealers = self.executor.stealers.read().unwrap();
        if let Some(r) = retry_steal(|| {
            // Pick a random starting point in the iterator list and rotate the list.
            // Randomizing the start spreads contention across workers instead of always
            // hammering the same victim first.
            let n = stealers.len();
            let start = fast_random(n);
            let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
            // Remove this worker's stealer handle.
            let iter = iter.filter(|(k, _)| *k != self.key);
            // Try stealing from each worker in the list. Collecting stops as soon as we get a
            // `Steal::Success`. Otherwise, if any steal attempt resulted in a `Steal::Retry`,
            // that's the collected result and we'll retry from the beginning.
            iter.map(|(_, s)| s.steal_batch_and_pop(&self.queue))
                .collect()
        }) {
            // Push the stolen task.
            self.push(r);
        }
    }
}
/// Returns a pseudorandom number in the interval `0..n`.
///
/// Uses a per-thread 32-bit Xorshift generator, so the sequence is deterministic within a
/// thread but independent across threads.
fn fast_random(n: usize) -> usize {
    thread_local! {
        // Per-thread generator state; Xorshift requires a nonzero seed.
        static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1));
    }
    RNG.with(|state| {
        // Advance the 32-bit Xorshift generator: https://en.wikipedia.org/wiki/Xorshift
        let mut x = state.get().0;
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        state.set(Wrapping(x));
        // Reduce into `0..n` with a multiply-and-shift, a fast alternative to `x % n`:
        // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
        (u64::from(x).wrapping_mul(n as u64) >> 32) as usize
    })
}