Skip to content

Commit

Permalink
refac(subscriber): remove futures dependency #460)
Browse files Browse the repository at this point in the history
I noticed that `console-subscriber` was my only dependency pulling in
the `futures` top-level library so I decided to try my hand at removing
that dependency. This still relies on `futures-task` for an optimized
`NoopWaker`
  • Loading branch information
asonix committed Aug 24, 2023
1 parent 9c18c4f commit d176404
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tonic = { version = "0.9", features = ["transport"] }
tracing-core = "0.1.24"
tracing = "0.1.26"
tracing-subscriber = { version = "0.3.11", default-features = false, features = ["fmt", "registry"] }
futures = { version = "0.3", default-features = false }
futures-task = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
# The parking_lot dependency is renamed, because we want our `parking_lot`
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
Expand Down
13 changes: 11 additions & 2 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};

use futures::FutureExt;
use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Expand Down Expand Up @@ -221,7 +220,7 @@ impl Aggregator {
// to be woken when the flush interval has elapsed, or when the
// channel is almost full.
let mut drained = false;
while let Some(event) = self.events.recv().now_or_never() {
while let Some(event) = recv_now_or_never(&mut self.events) {
match event {
Some(event) => {
self.update_state(event);
Expand Down Expand Up @@ -500,6 +499,16 @@ impl Aggregator {
}
}

fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
let waker = futures_task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);

match receiver.poll_recv(&mut cx) {
std::task::Poll::Ready(opt) => Some(opt),
std::task::Poll::Pending => None,
}
}

// ==== impl Flush ===

impl Flush {
Expand Down

0 comments on commit d176404

Please sign in to comment.