From c4137783d02971addc02726bdfa9e19293591a14 Mon Sep 17 00:00:00 2001 From: Basti Ortiz <39114273+Some-Dood@users.noreply.github.com> Date: Wed, 25 Aug 2021 07:42:54 +0800 Subject: [PATCH] Chore: remove excess dependencies (#4) * Deps: use `futures_core` over `futures` to minimize dependencies This practically cuts the number of dependencies by half. * Deps: use `futures-util` in test dependencies * Chore: remove redundant use of `^` semver range Cargo uses the `^` semver range by default. * Chore: remove dependency on `tokio-test` macros --- Cargo.toml | 12 ++--- src/bounded.rs | 2 +- src/error.rs | 21 ++++---- src/unbounded.rs | 2 +- tests/loom_bounded.rs | 15 +++--- tests/tests.rs | 113 ++++++++++++++++++++---------------------- 6 files changed, 75 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 229dbc5..6b8e490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,16 +27,13 @@ maintenance = { status = "actively-developed" } [dependencies] tokio = { version = "1", features = ["sync", "time"] } -futures = "0.3" +futures-core = { version = "0.3", default-features = false } [dev-dependencies] +futures-util = { version = "0.3", default-features = false } tokio = { version = "1", features = ["test-util", "rt", "rt-multi-thread", "macros"] } -tokio-test = "^0.4" -loom = { version = "^0.4.0" } -criterion = { version="^0.3.4", features = ["async_tokio", "html_reports"] } - -[target.'cfg(loom)'.dependencies] -loom = { version = "^0.4.0", features = ["futures", "checkpoint"] } +loom = { version = "0.5", features = ["futures", "checkpoint"] } +criterion = { version = "0.3", features = ["async_tokio", "html_reports"] } [[test]] name = "tests" @@ -45,7 +42,6 @@ name = "tests" name = "bench_channel_async" harness = false - [[bench]] name = "bench_channel_sync" harness = false diff --git a/src/bounded.rs b/src/bounded.rs index e06893d..3fc489b 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -3,7 +3,7 @@ use crate::error::{ReceiveError, RequestError, RespondError, SendError}; use tokio::sync::{mpsc, oneshot}; use tokio::time::{timeout, Duration}; -use futures::Stream; +use futures_core::Stream; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/src/error.rs b/src/error.rs index d55e747..c96c49e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::fmt; -use tokio::sync::mpsc::error::{RecvError as MpscRecvError, SendError as MpscSendError}; +use tokio::sync::mpsc::error::SendError as MpscSendError; use tokio::sync::oneshot; /// Error thrown when a [`RequestSender::send()`](crate::RequestSender::send()) or [`UnboundedRequestSender::send()`](crate::unbounded::UnboundedRequestSender::send()) @@ -51,13 +51,6 @@ pub enum ReceiveError { TimeoutError, } -// Cannot test this due to private field in the tokio error implementation -#[cfg(not(tarpaulin_include))] -impl From for RequestError { - fn from(_err: MpscRecvError) -> RequestError { - RequestError::RecvError - } -} impl From> for RequestError { fn from(err: SendError) -> RequestError { RequestError::SendError(err.0) @@ -104,10 +97,14 @@ impl Error for RequestError where T: fmt::Debug {} impl fmt::Display for ReceiveError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "{}", match self { - ReceiveError::RecvError => "receive channel closed", - ReceiveError::TimeoutError => "request timed out" - }) + write!( + fmt, + "{}", + match self { + ReceiveError::RecvError => "receive channel closed", + ReceiveError::TimeoutError => "request timed out", + } + ) } } diff --git a/src/unbounded.rs b/src/unbounded.rs index 62727cb..b66794a 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -4,7 +4,7 @@ use crate::bounded::ResponseReceiver; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; -use futures::Stream; +use futures_core::Stream; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tests/loom_bounded.rs b/tests/loom_bounded.rs index ceacf06..58f6041 100644 --- a/tests/loom_bounded.rs +++ b/tests/loom_bounded.rs @@ -1,7 +1,5 @@ use loom::future::block_on; use loom::thread; -use tokio_test::assert_err; -use tokio_test::assert_ok; #[test] #[cfg(not(tarpaulin))] @@ -11,15 +9,15 @@ fn closing_tx() { thread::spawn(move || { let res = block_on(tx.send(4)); - assert_ok!(res); + assert!(res.is_ok()); drop(tx); }); let v = block_on(rx.recv()); - assert_ok!(v); + assert!(v.is_ok()); let v = block_on(rx.recv()); - assert_err!(v); + assert!(v.is_err()); }) } @@ -32,17 +30,16 @@ fn closing_tx_res() { thread::spawn(move || { let res = block_on(tx.send(5)); let repl = block_on(res.unwrap().recv()); - assert_ok!(repl); - assert_eq!(repl.unwrap(), 10); + assert_eq!(repl, Ok(10)); drop(tx); }); let v = block_on(rx.recv()); let (req, responder) = v.unwrap(); let v = responder.respond(req * 2); - assert_ok!(v); + assert!(v.is_ok()); let v = block_on(rx.recv()); - assert_err!(v); + assert!(v.is_err()); }) } diff --git a/tests/tests.rs b/tests/tests.rs index 18b79f5..d1fce26 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,23 +1,21 @@ use bmrng::unbounded::UnboundedRequestReceiverStream; use bmrng::{error::*, RequestReceiverStream}; -use futures::StreamExt; +use futures_util::stream::StreamExt; use tokio::time::{advance, pause, resume, sleep, Duration}; -use tokio_test::{assert_err, assert_ok}; #[tokio::test] async fn unbounded_send_receive() { let (tx, mut rx) = bmrng::unbounded_channel::(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; - assert_eq!(tx.is_closed(), true); - assert_eq!(response.is_ok(), true); - assert_eq!(response.unwrap(), 64); + assert!(tx.is_closed()); + assert_eq!(response, Ok(64)); } #[tokio::test] @@ -25,15 +23,14 @@ async fn bounded_send_receive() { let (tx, mut rx) = bmrng::channel::(1); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; - assert_eq!(tx.is_closed(), true); - assert_eq!(response.is_ok(), true); - assert_eq!(response.unwrap(), 64); + assert!(tx.is_closed()); + assert_eq!(response, Ok(64)); } #[tokio::test] @@ -46,13 +43,13 @@ async fn unbounded_request_sender_clone() { }); tokio::spawn(async move { while let Ok((input, responder)) = rx.recv().await { - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); } }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Ok(64)); } @@ -67,13 +64,13 @@ async fn bounded_request_sender_clone() { }); tokio::spawn(async move { while let Ok((input, responder)) = rx.recv().await { - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); } }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Ok(64)); } @@ -86,7 +83,7 @@ async fn unbounded_drop_while_waiting_for_response() { drop(responder); }); let response = tx.send_receive(8).await; - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); assert_eq!(response, Err(RequestError::RecvError)); } @@ -99,7 +96,7 @@ async fn unbounded_drop_while_waiting_for_request() { }; }); drop(tx); - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -112,7 +109,7 @@ async fn unbounded_drop_sender_while_sending_response() { }); let response_receiver = tx.send(21); drop(response_receiver); - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -123,7 +120,7 @@ async fn bounded_drop_while_waiting_for_response() { drop(responder); }); let response = tx.send_receive(8).await; - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); assert_eq!(response, Err(RequestError::RecvError)); } @@ -134,7 +131,7 @@ async fn bounded_drop_while_waiting_for_request() { rx.recv().await.expect_err("this should not be ok"); }); drop(tx); - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -147,7 +144,7 @@ async fn bounded_drop_sender_while_sending_response() { }); let response_receiver = tx.send(21).await; drop(response_receiver); - assert_ok!(tokio::join!(task).0); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -156,14 +153,14 @@ async fn bounded_close_request_receiver() { let task = tokio::spawn(async move { rx.close(); let (input, responder) = rx.recv().await.unwrap(); - assert_ok!(responder.respond(input * 2)); + assert!(responder.respond(input * 2).is_ok()); }); let mut response_receiver = tx.send(21).await.unwrap(); let response = response_receiver.recv().await; assert_eq!(response, Ok(42)); drop(response_receiver); - assert_err!(tx.send(1).await); - assert_ok!(tokio::join!(task).0); + assert!(tx.send(1).await.is_err()); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -172,14 +169,14 @@ async fn unbounded_close_request_receiver() { let task = tokio::spawn(async move { rx.close(); let (input, responder) = rx.recv().await.unwrap(); - assert_ok!(responder.respond(input * 2)); + assert!(responder.respond(input * 2).is_ok()); }); let mut response_receiver = tx.send(21).unwrap(); let response = response_receiver.recv().await; assert_eq!(response, Ok(42)); drop(response_receiver); - assert_err!(tx.send(1)); - assert_ok!(tokio::join!(task).0); + assert!(tx.send(1).is_err()); + assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] @@ -188,13 +185,13 @@ async fn bounded_timeout() { pause(); tokio::spawn(async move { let (_input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); advance(Duration::from_millis(200)).await; sleep(Duration::from_micros(1)).await; resume(); panic!("Should have timed out"); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Err(RequestError::::RecvTimeoutError)); } @@ -206,13 +203,13 @@ async fn unbounded_timeout() { pause(); tokio::spawn(async move { let (_input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); advance(Duration::from_millis(200)).await; sleep(Duration::from_micros(1)).await; resume(); panic!("Should have timed out"); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Err(RequestError::::RecvTimeoutError)); } @@ -225,16 +222,16 @@ async fn bounded_stream() { while let Some((input, responder)) = stream.next().await { assert_eq!(responder.is_closed(), false); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); } }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(8).await, Ok(64)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(3).await, Ok(9)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(1).await, Ok(1)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); } #[tokio::test] @@ -243,18 +240,18 @@ async fn unbounded_stream() { tokio::spawn(async move { let mut stream = rx.into_stream(); while let Some((input, responder)) = stream.next().await { - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); } }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(8).await, Ok(64)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(3).await, Ok(9)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); assert_eq!(tx.send_receive(1).await, Ok(1)); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); } #[tokio::test] @@ -264,15 +261,14 @@ async fn req_receiver_into_inner() { let mut rx = stream.into_inner(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; - assert_eq!(tx.is_closed(), true); - assert_eq!(response.is_ok(), true); - assert_eq!(response.unwrap(), 64); + assert!(tx.is_closed()); + assert_eq!(response, Ok(64)); } #[tokio::test] @@ -282,13 +278,12 @@ async fn req_unbounded_receiver_into_inner() { let mut rx = stream.into_inner(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); - assert_eq!(responder.is_closed(), false); + assert!(!responder.is_closed()); let res = responder.respond(input * input); - assert_ok!(res); + assert!(res.is_ok()); }); - assert_eq!(tx.is_closed(), false); + assert!(!tx.is_closed()); let response = tx.send_receive(8).await; - assert_eq!(tx.is_closed(), true); - assert_eq!(response.is_ok(), true); - assert_eq!(response.unwrap(), 64); + assert!(tx.is_closed()); + assert_eq!(response, Ok(64)); }