diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d55dfba..8cbcffc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,12 +4,10 @@ on: push: branches: - main - - staging - - trying - - "renovate/**" tags: - "*" pull_request: + merge_group: jobs: build: @@ -17,12 +15,12 @@ jobs: strategy: matrix: rust: [stable, beta, nightly] - zookeeper: [3.8.1, 3.7.1, 3.6.4, 3.5.10] + zookeeper: [3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10] steps: - name: Check out repository code uses: actions/checkout@v3 - run: rustup default ${{ matrix.rust }} - - run: wget -O zookeeper.tar.gz https://dlcdn.apache.org/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz + - run: wget -O zookeeper.tar.gz https://archive.apache.org/dist/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz - run: mkdir zookeeper - run: tar -zxvf zookeeper.tar.gz -C zookeeper --strip-components 1 - run: ./scripts/ci-start-zookeeper diff --git a/CHANGELOG.md b/CHANGELOG.md index d19f00f..1224bcf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- [BREAKING] Migrated errors from Failure to SNAFU ([#39]). +- Updated ZooKeeper versions we test against (now 3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10) ([#39]). + +[#39]: https://github.com/stackabletech/tokio-zookeeper/pull/39 + ## [0.2.1] - 2023-02-13 ### Changed diff --git a/Cargo.toml b/Cargo.toml index 8b5c1b7..7f511ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,12 +23,12 @@ maintenance = { status = "experimental" } [dependencies] futures = "0.3" tokio = { version = "1.22", features = ["net", "rt", "time"] } -failure = "0.1" byteorder = "1.2" slog = "2.3.2" async-trait = "0.1.58" pin-project = "1.0.12" once_cell = "1.17.0" +snafu = "0.8.2" #slog = { version = "2.3.2", features = ['max_level_trace'] } [dev-dependencies] diff --git a/src/error.rs b/src/error.rs index 446d6e1..fa65490 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,39 +1,35 @@ -use failure::Fail; +use snafu::Snafu; /// Errors that may cause a delete request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum Delete { /// No node exists with the given `path`. - #[fail(display = "target node does not exist")] + #[snafu(display("target node does not exist"))] NoNode, /// The target node has a different version than was specified by the call to delete. - #[fail( - display = "target node has different version than expected ({})", - expected - )] + #[snafu(display("target node has different version than expected ({expected})"))] BadVersion { /// The expected node version. expected: i32, }, /// The target node has child nodes, and therefore cannot be deleted. - #[fail(display = "target node has children, and cannot be deleted")] + #[snafu(display("target node has children, and cannot be deleted"))] NotEmpty, } /// Errors that may cause a `set_data` request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum SetData { /// No node exists with the given `path`. - #[fail(display = "target node does not exist")] + #[snafu(display("target node does not exist"))] NoNode, /// The target node has a different version than was specified by the call to `set_data`. - #[fail( - display = "target node has different version than expected ({})", - expected - )] + #[snafu(display("target node has different version than expected ({expected})"))] BadVersion { /// The expected node version. expected: i32, @@ -41,77 +37,75 @@ pub enum SetData { /// The target node's permission does not accept data modification or requires different /// authentication to be altered. - #[fail(display = "insuficient authentication")] + #[snafu(display("insuficient authentication"))] NoAuth, } /// Errors that may cause a create request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum Create { /// A node with the given `path` already exists. - #[fail(display = "target node already exists")] + #[snafu(display("target node already exists"))] NodeExists, /// The parent node of the given `path` does not exist. - #[fail(display = "parent node of target does not exist")] + #[snafu(display("parent node of target does not exist"))] NoNode, /// The parent node of the given `path` is ephemeral, and cannot have children. - #[fail(display = "parent node is ephemeral, and cannot have children")] + #[snafu(display("parent node is ephemeral, and cannot have children"))] NoChildrenForEphemerals, /// The given ACL is invalid. - #[fail(display = "the given ACL is invalid")] + #[snafu(display("the given ACL is invalid"))] InvalidAcl, } /// Errors that may cause a `get_acl` request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum GetAcl { /// No node exists with the given `path`. - #[fail(display = "target node does not exist")] + #[snafu(display("target node does not exist"))] NoNode, } /// Errors that may cause a `set_acl` request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum SetAcl { /// No node exists with the given `path`. - #[fail(display = "target node does not exist")] + #[snafu(display("target node does not exist"))] NoNode, /// The target node has a different version than was specified by the call to `set_acl`. - #[fail( - display = "target node has different version than expected ({})", - expected - )] + #[snafu(display("target node has different version than expected ({expected})"))] BadVersion { /// The expected node version. expected: i32, }, /// The given ACL is invalid. - #[fail(display = "the given ACL is invalid")] + #[snafu(display("the given ACL is invalid"))] InvalidAcl, /// The target node's permission does not accept acl modification or requires different /// authentication to be altered. - #[fail(display = "insufficient authentication")] + #[snafu(display("insufficient authentication"))] NoAuth, } /// Errors that may cause a `check` request to fail. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] +#[snafu(module)] pub enum Check { /// No node exists with the given `path`. - #[fail(display = "target node does not exist")] + #[snafu(display("target node does not exist"))] NoNode, /// The target node has a different version than was specified by the call to `check`. - #[fail( - display = "target node has different version than expected ({})", - expected - )] + #[snafu(display("target node has different version than expected ({expected})"))] BadVersion { /// The expected node version. expected: i32, @@ -119,55 +113,43 @@ pub enum Check { } /// The result of a failed `multi` request. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Fail)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)] pub enum Multi { /// A failed `delete` request. - #[fail(display = "delete failed: {}", 0)] - Delete(Delete), + #[snafu(display("delete failed"), context(false))] + Delete { + /// The source error. + source: Delete, + }, /// A failed `set_data` request. - #[fail(display = "set_data failed: {}", 0)] - SetData(SetData), + #[snafu(display("set_data failed"), context(false))] + SetData { + /// The source error. + source: SetData, + }, /// A failed `create` request. - #[fail(display = "create failed: {}", 0)] - Create(Create), + #[snafu(display("create failed"), context(false))] + Create { + /// The source error. + source: Create, + }, /// A failed `check` request. - #[fail(display = "check failed")] - Check(Check), + #[snafu(display("check failed"), context(false))] + Check { + /// The source error. + source: Check, + }, /// The request would have succeeded, but a later request in the `multi` /// batch failed and caused this request to get rolled back. - #[fail(display = "request rolled back due to later failed request")] + #[snafu(display("request rolled back due to later failed request"))] RolledBack, /// The request was skipped because an earlier request in the `multi` batch /// failed. It is unknown whether this request would have succeeded. - #[fail(display = "request failed due to earlier failed request")] + #[snafu(display("request failed due to earlier failed request"))] Skipped, } - -impl From for Multi { - fn from(err: Delete) -> Self { - Multi::Delete(err) - } -} - -impl From for Multi { - fn from(err: SetData) -> Self { - Multi::SetData(err) - } -} - -impl From for Multi { - fn from(err: Create) -> Self { - Multi::Create(err) - } -} - -impl From for Multi { - fn from(err: Check) -> Self { - Multi::Check(err) - } -} diff --git a/src/lib.rs b/src/lib.rs index 84d7080..4d1a8c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,9 +192,9 @@ #![deny(missing_debug_implementations)] #![deny(missing_copy_implementations)] -use failure::{bail, format_err}; -use futures::{channel::oneshot, Stream, TryFutureExt}; +use futures::{channel::oneshot, Stream}; use slog::{debug, o, trace}; +use snafu::{whatever as bail, ResultExt as _, Whatever}; use std::borrow::Cow; use std::net::SocketAddr; use std::time; @@ -210,6 +210,13 @@ pub use crate::types::{ Acl, CreateMode, KeeperState, MultiResponse, Permission, Stat, WatchedEvent, WatchedEventType, }; +macro_rules! format_err { + ($($x:tt)*) => { + ::without_source(format!($($x)*)) + }; +} +pub(crate) use format_err; + /// A connection to ZooKeeper. /// /// All interactions with ZooKeeper are performed by calling the methods of a `ZooKeeper` instance. @@ -271,13 +278,12 @@ impl ZooKeeperBuilder { pub async fn connect( self, addr: &SocketAddr, - ) -> Result<(ZooKeeper, impl Stream), failure::Error> { + ) -> Result<(ZooKeeper, impl Stream), Whatever> { let (tx, rx) = futures::channel::mpsc::unbounded(); - tokio::net::TcpStream::connect(addr) - .map_err(failure::Error::from) - .and_then(move |stream| self.handshake(*addr, stream, tx)) - .map_ok(move |zk| (zk, rx)) + let stream = tokio::net::TcpStream::connect(addr) .await + .whatever_context("connect failed")?; + Ok((self.handshake(*addr, stream, tx).await?, rx)) } /// Set the ZooKeeper [session expiry @@ -301,7 +307,7 @@ impl ZooKeeperBuilder { addr: SocketAddr, stream: tokio::net::TcpStream, default_watcher: futures::channel::mpsc::UnboundedSender, - ) -> Result { + ) -> Result { let request = proto::Request::Connect { protocol_version: 0, last_zxid_seen: 0, @@ -331,7 +337,7 @@ impl ZooKeeper { /// See [`ZooKeeperBuilder::connect`]. pub async fn connect( addr: &SocketAddr, - ) -> Result<(Self, impl Stream), failure::Error> { + ) -> Result<(Self, impl Stream), Whatever> { ZooKeeperBuilder::default().connect(addr).await } @@ -370,7 +376,7 @@ impl ZooKeeper { data: D, acl: A, mode: CreateMode, - ) -> Result, failure::Error> + ) -> Result, Whatever> where D: Into>, A: Into>, @@ -403,7 +409,7 @@ impl ZooKeeper { path: &str, version: Option, data: D, - ) -> Result, failure::Error> + ) -> Result, Whatever> where D: Into>, { @@ -432,7 +438,7 @@ impl ZooKeeper { &self, path: &str, version: Option, - ) -> Result, failure::Error> { + ) -> Result, Whatever> { trace!(self.logger, "delete"; "path" => path, "version" => ?version); let version = version.unwrap_or(-1); self.connection @@ -452,7 +458,7 @@ impl ZooKeeper { pub async fn get_acl( &self, path: &str, - ) -> Result, Stat), error::GetAcl>, failure::Error> { + ) -> Result, Stat), error::GetAcl>, Whatever> { trace!(self.logger, "get_acl"; "path" => path); self.connection .enqueue(proto::Request::GetAcl { @@ -476,7 +482,7 @@ impl ZooKeeper { path: &str, acl: A, version: Option, - ) -> Result, failure::Error> + ) -> Result, Whatever> where A: Into>, { @@ -505,7 +511,7 @@ impl ZooKeeper { WithWatcher(self) } - async fn exists_w(&self, path: &str, watch: Watch) -> Result, failure::Error> { + async fn exists_w(&self, path: &str, watch: Watch) -> Result, Whatever> { trace!(self.logger, "exists"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::Exists { @@ -517,7 +523,7 @@ impl ZooKeeper { } /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist. - pub async fn exists(&self, path: &str) -> Result, failure::Error> { + pub async fn exists(&self, path: &str) -> Result, Whatever> { self.exists_w(path, Watch::None).await } @@ -525,7 +531,7 @@ impl ZooKeeper { &self, path: &str, watch: Watch, - ) -> Result>, failure::Error> { + ) -> Result>, Whatever> { trace!(self.logger, "get_children"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetChildren { @@ -541,7 +547,7 @@ impl ZooKeeper { /// /// The returned list of children is not sorted and no guarantee is provided as to its natural /// or lexical order. - pub async fn get_children(&self, path: &str) -> Result>, failure::Error> { + pub async fn get_children(&self, path: &str) -> Result>, Whatever> { self.get_children_w(path, Watch::None).await } @@ -549,7 +555,7 @@ impl ZooKeeper { &self, path: &str, watch: Watch, - ) -> Result, Stat)>, failure::Error> { + ) -> Result, Stat)>, Whatever> { trace!(self.logger, "get_data"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetData { @@ -562,7 +568,7 @@ impl ZooKeeper { /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not /// exist. - pub async fn get_data(&self, path: &str) -> Result, Stat)>, failure::Error> { + pub async fn get_data(&self, path: &str) -> Result, Stat)>, Whatever> { self.get_data_w(path, Watch::None).await } @@ -588,7 +594,7 @@ impl<'a> WatchGlobally<'a> { /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered /// by any successful operation that creates or deletes the node, or sets the node's data. When /// the watch triggers, an event is sent to the global watcher stream. - pub async fn exists(&self, path: &str) -> Result, failure::Error> { + pub async fn exists(&self, path: &str) -> Result, Whatever> { self.0.exists_w(path, Watch::Global).await } @@ -602,7 +608,7 @@ impl<'a> WatchGlobally<'a> { /// by any successful operation that deletes the node at the given `path`, or creates or /// deletes a child of that node. When the watch triggers, an event is sent to the global /// watcher stream. - pub async fn get_children(&self, path: &str) -> Result>, failure::Error> { + pub async fn get_children(&self, path: &str) -> Result>, Whatever> { self.0.get_children_w(path, Watch::Global).await } @@ -612,7 +618,7 @@ impl<'a> WatchGlobally<'a> { /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered /// by any successful operation that sets the node's data, or deletes it. When the watch /// triggers, an event is sent to the global watcher stream. - pub async fn get_data(&self, path: &str) -> Result, Stat)>, failure::Error> { + pub async fn get_data(&self, path: &str) -> Result, Stat)>, Whatever> { self.0.get_data_w(path, Watch::Global).await } } @@ -633,7 +639,7 @@ impl<'a> WithWatcher<'a> { pub async fn exists( &self, path: &str, - ) -> Result<(oneshot::Receiver, Option), failure::Error> { + ) -> Result<(oneshot::Receiver, Option), Whatever> { let (tx, rx) = oneshot::channel(); self.0 .exists_w(path, Watch::Custom(tx)) @@ -654,7 +660,7 @@ impl<'a> WithWatcher<'a> { pub async fn get_children( &self, path: &str, - ) -> Result, Vec)>, failure::Error> { + ) -> Result, Vec)>, Whatever> { let (tx, rx) = oneshot::channel(); self.0 .get_children_w(path, Watch::Custom(tx)) @@ -671,7 +677,7 @@ impl<'a> WithWatcher<'a> { pub async fn get_data( &self, path: &str, - ) -> Result, Vec, Stat)>, failure::Error> { + ) -> Result, Vec, Stat)>, Whatever> { let (tx, rx) = oneshot::channel(); self.0 .get_data_w(path, Watch::Custom(tx)) @@ -744,7 +750,7 @@ impl<'a> MultiBuilder<'a> { } /// Run executes the attached requests in one atomic unit. - pub async fn run(self) -> Result>, failure::Error> { + pub async fn run(self) -> Result>, Whatever> { let (zk, requests) = (self.zk, self.requests); let reqs_lite: Vec = requests.iter().map(|r| r.into()).collect(); zk.connection @@ -769,6 +775,7 @@ mod tests { use futures::StreamExt; use slog::Drain; + use snafu::Whatever; #[tokio::test] async fn it_works() { @@ -796,10 +803,7 @@ mod tests { .await .unwrap(); assert_eq!(path.as_ref().map(String::as_str), Ok("/foo")); - let event = exists_w - .map_err(|e| format_err!("exists_w failed: {:?}", e)) - .await - .unwrap(); + let event = exists_w.await.expect("exists_w failed"); assert_eq!( event, WatchedEvent { @@ -1036,13 +1040,13 @@ mod tests { let drain = slog_async::Async::new(drain).build().fuse(); builder.set_logger(slog::Logger::root(drain, o!())); - async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result, failure::Error> { + async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result, Whatever> { let mut res = Vec::new(); for p in paths { let exists = zk.exists(p).await?; res.push(exists.is_some()); } - Result::<_, failure::Error>::Ok(res) + Result::<_, Whatever>::Ok(res) } let (zk, _) = builder @@ -1081,7 +1085,9 @@ mod tests { res, &[ Err(error::Multi::RolledBack), - Err(error::Multi::Create(error::Create::NodeExists)), + Err(error::Multi::Create { + source: error::Create::NodeExists + }), Err(error::Multi::Skipped), Err(error::Multi::Skipped), ] @@ -1113,9 +1119,9 @@ mod tests { assert_eq!( res, [ - Err(error::Multi::Check(error::Check::BadVersion { - expected: 0 - })), + Err(error::Multi::Check { + source: error::Check::BadVersion { expected: 0 } + }), Err(error::Multi::Skipped), ] ); @@ -1123,7 +1129,12 @@ mod tests { let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap(); assert_eq!(res, &[false, true, true, false]); let res = zk.multi().check("/a", 0).run().await.unwrap(); - assert_eq!(res, &[Err(error::Multi::Check(error::Check::NoNode)),]); + assert_eq!( + res, + &[Err(error::Multi::Check { + source: error::Check::NoNode + }),] + ); let res = zk .multi() diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index 7116f8b..dde2891 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -1,13 +1,13 @@ use super::{request, watch::WatchType, Request, Response}; use crate::{WatchedEvent, WatchedEventType, ZkError}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use failure::format_err; use futures::{ channel::{mpsc, oneshot}, ready, }; use pin_project::pin_project; use slog::{debug, info, trace}; +use snafu::{Snafu, Whatever}; use std::collections::HashMap; use std::{ future::Future, @@ -18,6 +18,27 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(transparent)] + Io { source: std::io::Error }, + #[snafu(transparent)] + Whatever { source: Whatever }, + + #[snafu(display("connection closed with {len} bytes left in buffer: {buf:x?}", len = buf.len()))] + ConnectionClosed { buf: Vec }, + + #[snafu(display("Not exiting, but server closed connection"))] + ServerClosedConnection, + #[snafu(display("outstanding requests, but response channel closed"))] + ResponseChannelClosedPrematurely, + + #[snafu(display("bad response to ping: {error_code:?}"))] + BadPing { error_code: ZkError }, + #[snafu(display("failed to close session: {error_code:?}"))] + CloseSession { error_code: ZkError }, +} + #[pin_project] pub(super) struct ActivePacketizer { #[pin] @@ -57,6 +78,8 @@ pub(super) struct ActivePacketizer { pub(super) password: Vec, } +type ReplySender = oneshot::Sender>; + impl ActivePacketizer where S: AsyncRead + AsyncWrite, @@ -103,7 +126,7 @@ where fn enqueue_impl( outbox: &mut Vec, - reply: &mut HashMap>)>, + reply: &mut HashMap, xid: i32, item: Request, tx: oneshot::Sender>, @@ -163,7 +186,7 @@ where cx: &mut Context, exiting: bool, logger: &mut slog::Logger, - ) -> Poll> + ) -> Poll> where S: AsyncWrite, { @@ -190,11 +213,7 @@ where .reset(tokio::time::Instant::now() + *this.timeout); } - ready!(this - .stream - .as_mut() - .poll_flush(cx) - .map_err(failure::Error::from)?); + ready!(this.stream.as_mut().poll_flush(cx)?); if exiting { debug!(logger, "shutting down writer"); @@ -209,7 +228,7 @@ where cx: &mut Context, default_watcher: &mut mpsc::UnboundedSender, logger: &mut slog::Logger, - ) -> Poll> + ) -> Poll> where S: AsyncRead, { @@ -233,11 +252,12 @@ where this.inbox.truncate(read_from + n); if n == 0 { if self.inlen() != 0 { - return Poll::Ready(Err(format_err!( - "connection closed with {} bytes left in buffer: {:x?}", - self.inlen(), - &self.inbox[self.instart..] - ))); + return Poll::Ready( + ConnectionClosedSnafu { + buf: &self.inbox[self.instart..], + } + .fail(), + ); } else { // Server closed session with no bytes left in buffer debug!(logger, "server closed connection"); @@ -293,7 +313,7 @@ where // XXX: in theory, server should now shut down receive end trace!(logger, "got response to CloseSession"); if let Some(e) = err { - return Poll::Ready(Err(format_err!("failed to close session: {:?}", e))); + return Poll::Ready(CloseSessionSnafu { error_code: e }.fail()); } } else if xid == -1 { // watch event @@ -348,7 +368,7 @@ where // response to ping -- empty response trace!(logger, "got response to heartbeat"); if let Some(e) = err { - return Poll::Ready(Err(format_err!("bad response to ping: {:?}", e))); + return Poll::Ready(BadPingSnafu { error_code: e }.fail()); } } else { // response to user request @@ -364,10 +384,7 @@ where || (opcode == request::OpCode::Exists && err == Some(ZkError::NoNode)) { trace!(logger, "pending watcher turned into real watcher"; "xid" => xid); - this.watchers - .entry(w.0) - .or_insert_with(Vec::new) - .push((w.1, w.2)); + this.watchers.entry(w.0).or_default().push((w.1, w.2)); } else { trace!(logger, "pending watcher not turned into real watcher: {:?}", @@ -429,7 +446,7 @@ where exiting: bool, logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, - ) -> Poll> { + ) -> Poll> { trace!(logger, "poll_read"); let r = self.as_mut().poll_read(cx, default_watcher, logger)?; @@ -467,12 +484,8 @@ where debug!(logger, "packetizer done"); Poll::Ready(Ok(())) } - (Poll::Ready(()), Poll::Ready(())) => Poll::Ready(Err(format_err!( - "Not exiting, but server closed connection" - ))), - (Poll::Ready(()), _) => Poll::Ready(Err(format_err!( - "outstanding requests, but response channel closed" - ))), + (Poll::Ready(()), Poll::Ready(())) => Poll::Ready(ServerClosedConnectionSnafu.fail()), + (Poll::Ready(()), _) => Poll::Ready(ResponseChannelClosedPrematurelySnafu.fail()), _ => Poll::Pending, } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 657bc30..966e752 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use std::error::Error; use std::net::SocketAddr; use tokio::io::{AsyncRead, AsyncWrite}; @@ -18,7 +19,7 @@ pub(crate) use self::watch::Watch; #[async_trait] pub trait ZooKeeperTransport: AsyncRead + AsyncWrite + Sized + Send + 'static { type Addr: Send + Clone; - type ConnectError: Into + 'static; + type ConnectError: Error + 'static; async fn connect(addr: Self::Addr) -> Result; } diff --git a/src/proto/packetizer.rs b/src/proto/packetizer.rs index f9349dc..47f85cb 100644 --- a/src/proto/packetizer.rs +++ b/src/proto/packetizer.rs @@ -2,9 +2,8 @@ use super::{ active_packetizer::ActivePacketizer, request, watch::WatchType, Request, Response, ZooKeeperTransport, }; -use crate::{Watch, WatchedEvent, ZkError}; +use crate::{format_err, Watch, WatchedEvent, ZkError}; use byteorder::{BigEndian, WriteBytesExt}; -use failure::format_err; use futures::{ channel::{mpsc, oneshot}, future::Either, @@ -12,6 +11,7 @@ use futures::{ }; use pin_project::pin_project; use slog::{debug, error, info, trace}; +use snafu::{ResultExt, Whatever}; use std::{ future::{self, Future}, mem, @@ -88,7 +88,7 @@ where enum PacketizerState { Connected(#[pin] ActivePacketizer), Reconnecting( - Pin, failure::Error>> + Send + 'static>>, + Pin, Whatever>> + Send + 'static>>, ), } @@ -102,10 +102,13 @@ where exiting: bool, logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, - ) -> Poll> { + ) -> Poll> { let ap = match self.as_mut().project() { PacketizerStateProj::Connected(ref mut ap) => { - return ap.as_mut().poll(cx, exiting, logger, default_watcher) + return ap + .as_mut() + .poll(cx, exiting, logger, default_watcher) + .map(|res| res.whatever_context("active packetizer failed")) } PacketizerStateProj::Reconnecting(ref mut c) => ready!(c.as_mut().poll(cx)?), }; @@ -184,7 +187,7 @@ impl Future for Packetizer where S: ZooKeeperTransport, { - type Output = Result<(), failure::Error>; + type Output = Result<(), Whatever>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { trace!(self.logger, "packetizer polled"); @@ -260,29 +263,28 @@ where *this.xid += 1; let log = this.logger.clone(); - let retry = - S::connect(this.addr.clone()) - .map_err(|e| e.into()) - .map_ok(move |stream| { - let request = Request::Connect { - protocol_version: 0, - last_zxid_seen, - timeout: 0, - session_id, - passwd: password, - read_only: false, - }; - trace!(log, "about to handshake (again)"); + let retry = S::connect(this.addr.clone()) + .map(|res| res.whatever_context("failed to connect")) + .map_ok(move |stream| { + let request = Request::Connect { + protocol_version: 0, + last_zxid_seen, + timeout: 0, + session_id, + passwd: password, + read_only: false, + }; + trace!(log, "about to handshake (again)"); - let (tx, rx) = oneshot::channel(); - tokio::spawn(rx.map(move |r| { - trace!(log, "re-connection response: {:?}", r); - })); + let (tx, rx) = oneshot::channel(); + tokio::spawn(rx.map(move |r| { + trace!(log, "re-connection response: {:?}", r); + })); - let mut ap = ActivePacketizer::new(stream); - ap.enqueue_unpin(xid, request, tx); - ap - }); + let mut ap = ActivePacketizer::new(stream); + ap.enqueue_unpin(xid, request, tx); + ap + }); // dropping the old state will also cancel in-flight requests this.state @@ -305,7 +307,7 @@ impl Enqueuer { pub(crate) fn enqueue( &self, request: Request, - ) -> impl Future, failure::Error>> { + ) -> impl Future, Whatever>> { let (tx, rx) = oneshot::channel(); match self.0.unbounded_send((request, tx)) { Ok(()) => { diff --git a/src/proto/request.rs b/src/proto/request.rs index 5aa86c3..355764b 100644 --- a/src/proto/request.rs +++ b/src/proto/request.rs @@ -1,9 +1,9 @@ use super::Watch; use super::ZkError; +use crate::{Acl, CreateMode}; use byteorder::{BigEndian, WriteBytesExt}; use std::borrow::Cow; use std::io::{self, Write}; -use crate::{Acl, CreateMode}; #[derive(Debug)] pub(crate) enum Request { diff --git a/src/proto/response.rs b/src/proto/response.rs index 82948ff..f4d44a4 100644 --- a/src/proto/response.rs +++ b/src/proto/response.rs @@ -148,7 +148,7 @@ impl StringReader for R { } impl Response { - pub(super) fn parse(opcode: OpCode, reader: &mut &[u8]) -> Result { + pub(super) fn parse(opcode: OpCode, reader: &mut &[u8]) -> io::Result { match opcode { OpCode::CreateSession => Ok(Response::Connect { protocol_version: reader.read_i32::()?, diff --git a/src/transform.rs b/src/transform.rs index ae2708c..31c4024 100644 --- a/src/transform.rs +++ b/src/transform.rs @@ -1,11 +1,11 @@ -use failure::{bail, format_err}; +use snafu::{whatever as bail, Whatever}; use crate::proto::{Request, Response, ZkError}; use crate::{error, Acl, MultiResponse, Stat}; pub(crate) fn create( res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { match res { Ok(Response::String(s)) => Ok(Ok(s)), Ok(r) => bail!("got non-string response to create: {:?}", r), @@ -13,14 +13,14 @@ pub(crate) fn create( Err(ZkError::NodeExists) => Ok(Err(error::Create::NodeExists)), Err(ZkError::InvalidACL) => Ok(Err(error::Create::InvalidAcl)), Err(ZkError::NoChildrenForEphemerals) => Ok(Err(error::Create::NoChildrenForEphemerals)), - Err(e) => Err(format_err!("create call failed: {:?}", e)), + Err(e) => bail!("create call failed: {:?}", e), } } pub(crate) fn set_data( version: i32, res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { match res { Ok(Response::Stat(stat)) => Ok(Ok(stat)), Ok(r) => bail!("got a non-stat response to a set_data request: {:?}", r), @@ -34,32 +34,32 @@ pub(crate) fn set_data( pub(crate) fn delete( version: i32, res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { match res { Ok(Response::Empty) => Ok(Ok(())), Ok(r) => bail!("got non-empty response to delete: {:?}", r), Err(ZkError::NoNode) => Ok(Err(error::Delete::NoNode)), Err(ZkError::NotEmpty) => Ok(Err(error::Delete::NotEmpty)), Err(ZkError::BadVersion) => Ok(Err(error::Delete::BadVersion { expected: version })), - Err(e) => Err(format_err!("delete call failed: {:?}", e)), + Err(e) => bail!("delete call failed: {:?}", e), } } pub(crate) fn get_acl( res: Result, -) -> Result, Stat), error::GetAcl>, failure::Error> { +) -> Result, Stat), error::GetAcl>, Whatever> { match res { Ok(Response::GetAcl { acl, stat }) => Ok(Ok((acl, stat))), Ok(r) => bail!("got non-acl response to a get_acl request: {:?}", r), Err(ZkError::NoNode) => Ok(Err(error::GetAcl::NoNode)), - Err(e) => Err(format_err!("get_acl call failed: {:?}", e)), + Err(e) => bail!("get_acl call failed: {:?}", e), } } pub(crate) fn set_acl( version: i32, res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { match res { Ok(Response::Stat(stat)) => Ok(Ok(stat)), Ok(r) => bail!("got non-stat response to a set_acl request: {:?}", r), @@ -67,11 +67,11 @@ pub(crate) fn set_acl( Err(ZkError::BadVersion) => Ok(Err(error::SetAcl::BadVersion { expected: version })), Err(ZkError::InvalidACL) => Ok(Err(error::SetAcl::InvalidAcl)), Err(ZkError::NoAuth) => Ok(Err(error::SetAcl::NoAuth)), - Err(e) => Err(format_err!("set_acl call failed: {:?}", e)), + Err(e) => bail!("set_acl call failed: {:?}", e), } } -pub(crate) fn exists(res: Result) -> Result, failure::Error> { +pub(crate) fn exists(res: Result) -> Result, Whatever> { match res { Ok(Response::Stat(stat)) => Ok(Some(stat)), Ok(r) => bail!("got a non-create response to a create request: {:?}", r), @@ -82,30 +82,30 @@ pub(crate) fn exists(res: Result) -> Result, fai pub(crate) fn get_children( res: Result, -) -> Result>, failure::Error> { +) -> Result>, Whatever> { match res { Ok(Response::Strings(children)) => Ok(Some(children)), Ok(r) => bail!("got non-strings response to get-children: {:?}", r), Err(ZkError::NoNode) => Ok(None), - Err(e) => Err(format_err!("get-children call failed: {:?}", e)), + Err(e) => bail!("get-children call failed: {:?}", e), } } pub(crate) fn get_data( res: Result, -) -> Result, Stat)>, failure::Error> { +) -> Result, Stat)>, Whatever> { match res { Ok(Response::GetData { bytes, stat }) => Ok(Some((bytes, stat))), Ok(r) => bail!("got non-data response to get-data: {:?}", r), Err(ZkError::NoNode) => Ok(None), - Err(e) => Err(format_err!("get-data call failed: {:?}", e)), + Err(e) => bail!("get-data call failed: {:?}", e), } } pub(crate) fn check( version: i32, res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { match res { Ok(Response::Empty) => Ok(Ok(())), Ok(r) => bail!("got a non-check response to a check request: {:?}", r), @@ -146,7 +146,7 @@ impl From<&Request> for RequestMarker { pub(crate) fn multi( req: &RequestMarker, res: Result, -) -> Result, failure::Error> { +) -> Result, Whatever> { // Handle multi-specific errors. match res { Err(ZkError::Ok) => return Ok(Err(error::Multi::RolledBack)),