Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: move broadcast error types into broadcast::error module #2937

Merged
merged 4 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
157 changes: 89 additions & 68 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
//! [`Receiver`]: crate::sync::broadcast::Receiver
//! [`channel`]: crate::sync::broadcast::channel
//! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
//! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
//! [`recv`]: crate::sync::broadcast::Receiver::recv
//!
//! # Examples
Expand Down Expand Up @@ -197,53 +197,97 @@ pub struct Receiver<T> {
next: u64,
}

/// Error returned by [`Sender::send`][Sender::send].
///
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
#[derive(Debug)]
pub struct SendError<T>(pub T);
pub mod error {
//! Broadcast error types

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,
use std::fmt;

/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
/// Error returned by from the [`send`] function on a [`Sender`].
///
/// Includes the number of skipped messages.
Lagged(u64),
}
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
///
/// [`send`]: crate::sync::broadcast::Sender::send
/// [`Sender`]: crate::sync::broadcast::Sender
#[derive(Debug)]
pub struct SendError<T>(pub T);

/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`][Sender] handles, so data may yet become available.
Empty,

/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind and has been forcibly disconnected.
/// Attempting to receive again will return the oldest message still
/// retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel closed")
}
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
}

impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvError::Closed => write!(f, "channel closed"),
RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for RecvError {}

/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`] handles, so data may yet become available.
///
/// [`Sender`]: crate::sync::broadcast::Sender
Empty,

/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind and has been forcibly disconnected.
/// Attempting to receive again will return the oldest message still
/// retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
}

impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Closed => write!(f, "channel closed"),
TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for TryRecvError {}
}

use self::error::*;

/// Data shared between senders and receivers
struct Shared<T> {
/// slots in the channel
Expand Down Expand Up @@ -371,8 +415,8 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
/// [`Receiver`]: crate::sync::broadcast::Receiver
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`SendError`]: crate::sync::broadcast::SendError
/// [`RecvError`]: crate::sync::broadcast::RecvError
/// [`SendError`]: crate::sync::broadcast::error::SendError
/// [`RecvError`]: crate::sync::broadcast::error::RecvError
///
/// # Examples
///
Expand Down Expand Up @@ -1112,27 +1156,4 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
}
}

impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvError::Closed => write!(f, "channel closed"),
RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for RecvError {}

impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Closed => write!(f, "channel closed"),
TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for TryRecvError {}

fn is_unpin<T: Unpin>() {}
2 changes: 1 addition & 1 deletion tokio/src/sync/tests/loom_broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::sync::broadcast;
use crate::sync::broadcast::RecvError::{Closed, Lagged};
use crate::sync::broadcast::error::RecvError::{Closed, Lagged};

use loom::future::block_on;
use loom::sync::Arc;
Expand Down
10 changes: 5 additions & 5 deletions tokio/tests/sync_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ macro_rules! assert_empty {
($e:expr) => {
match $e.try_recv() {
Ok(value) => panic!("expected empty; got = {:?}", value),
Err(broadcast::TryRecvError::Empty) => {}
Err(broadcast::error::TryRecvError::Empty) => {}
Err(e) => panic!("expected empty; got = {:?}", e),
}
};
Expand All @@ -32,7 +32,7 @@ macro_rules! assert_empty {
macro_rules! assert_lagged {
($e:expr, $n:expr) => {
match assert_err!($e) {
broadcast::TryRecvError::Lagged(n) => {
broadcast::error::TryRecvError::Lagged(n) => {
assert_eq!(n, $n);
}
_ => panic!("did not lag"),
Expand All @@ -43,7 +43,7 @@ macro_rules! assert_lagged {
macro_rules! assert_closed {
($e:expr) => {
match assert_err!($e) {
broadcast::TryRecvError::Closed => {}
broadcast::error::TryRecvError::Closed => {}
_ => panic!("did not lag"),
}
};
Expand Down Expand Up @@ -491,6 +491,6 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}

fn is_closed(err: broadcast::RecvError) -> bool {
matches!(err, broadcast::RecvError::Closed)
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}