diff --git a/CHANGELOG.md b/CHANGELOG.md index 1224bcf..6546018 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,11 @@ All notable changes to this project will be documented in this file. ### Changed - [BREAKING] Migrated errors from Failure to SNAFU ([#39]). +- [BREAKING] Migrated from `slog` to `tracing` ([#40]). - Updated ZooKeeper versions we test against (now 3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10) ([#39]). [#39]: https://github.com/stackabletech/tokio-zookeeper/pull/39 +[#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 ## [0.2.1] - 2023-02-13 @@ -35,7 +37,7 @@ now compatible with Rust's async/await syntax! [`WithTokio01Executor`](https://github.com/stackabletech/zookeeper-operator/blob/a682dcc3c7dc841917e968ba0e9fa9d33a4fabf5/rust/operator-binary/src/utils.rs#L6-L38)). 2. Upgrade tokio-zookeeper to v0.2. 3. Migrate async calls that thread the `ZooKeeper` instance to instead borrow it (for example, - `zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes + `zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes `let stat = zk.exists(path).await?;`). 4. Remove Tokio 0.1 and the compatibility wrapper if they are no longer required. diff --git a/Cargo.toml b/Cargo.toml index 7f511ca..7f77dee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,19 +21,17 @@ edition = "2021" maintenance = { status = "experimental" } [dependencies] -futures = "0.3" -tokio = { version = "1.22", features = ["net", "rt", "time"] } -byteorder = "1.2" -slog = "2.3.2" -async-trait = "0.1.58" -pin-project = "1.0.12" -once_cell = "1.17.0" +async-trait = "0.1.80" +byteorder = "1.5.0" +futures = "0.3.30" +once_cell = "1.19.0" +pin-project = "1.1.5" snafu = "0.8.2" -#slog = { version = "2.3.2", features = ['max_level_trace'] } +tokio = { version = "1.37.0", features = ["net", "rt", "time"] } +tracing = "0.1.40" [dev-dependencies] -tokio = { version = "1.22", features = ["macros"] } -slog-async = "2.3.0" -slog-term = "2.4.0" +tokio = { version = "1.37.0", features = ["macros"] } +tracing-subscriber = "0.3.18" [features] diff --git a/src/lib.rs b/src/lib.rs index 4d1a8c7..e8abf4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -193,11 +193,11 @@ #![deny(missing_copy_implementations)] use futures::{channel::oneshot, Stream}; -use slog::{debug, o, trace}; -use snafu::{whatever as bail, ResultExt as _, Whatever}; +use snafu::{whatever as bail, ResultExt, Whatever}; use std::borrow::Cow; use std::net::SocketAddr; use std::time; +use tracing::{debug, instrument, trace}; /// Per-operation ZooKeeper error types. pub mod error; @@ -244,24 +244,18 @@ pub(crate) use format_err; pub struct ZooKeeper { #[allow(dead_code)] connection: proto::Enqueuer, - logger: slog::Logger, } /// Builder that allows customizing options for ZooKeeper connections. -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct ZooKeeperBuilder { session_timeout: time::Duration, - logger: slog::Logger, } impl Default for ZooKeeperBuilder { fn default() -> Self { - let drain = slog::Discard; - let root = slog::Logger::root(drain, o!()); - ZooKeeperBuilder { session_timeout: time::Duration::new(0, 0), - logger: root, } } } @@ -294,14 +288,6 @@ impl ZooKeeperBuilder { self.session_timeout = t; } - /// Set the logger that should be used internally in the ZooKeeper client. - /// - /// By default, all logging is disabled. See also [the `slog` - /// documentation](https://docs.rs/slog). - pub fn set_logger(&mut self, l: slog::Logger) { - self.logger = l; - } - async fn handshake( self, addr: SocketAddr, @@ -317,15 +303,13 @@ impl ZooKeeperBuilder { passwd: vec![], read_only: false, }; - debug!(self.logger, "about to perform handshake"); + debug!("about to perform handshake"); - let plog = self.logger.clone(); - let enqueuer = proto::Packetizer::new(addr, stream, plog, default_watcher); + let enqueuer = proto::Packetizer::new(addr, stream, default_watcher); enqueuer.enqueue(request).await.map(move |response| { - trace!(self.logger, "{:?}", response); + trace!(?response, "Got response"); ZooKeeper { connection: enqueuer, - logger: self.logger, } }) } @@ -370,6 +354,7 @@ impl ZooKeeper { /// calls. /// /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes). + #[instrument(skip(data, acl))] pub async fn create( &self, path: &str, @@ -382,7 +367,7 @@ impl ZooKeeper { A: Into>, { let data = data.into(); - trace!(self.logger, "create"; "path" => path, "mode" => ?mode, "dlen" => data.len()); + tracing::Span::current().record("dlen", data.len()); self.connection .enqueue(proto::Request::Create { path: path.to_string(), @@ -404,6 +389,7 @@ impl ZooKeeper { /// left by `get_data` calls. /// /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes). + #[instrument(skip(data))] pub async fn set_data( &self, path: &str, @@ -414,7 +400,7 @@ impl ZooKeeper { D: Into>, { let data = data.into(); - trace!(self.logger, "set_data"; "path" => path, "version" => ?version, "dlen" => data.len()); + tracing::Span::current().record("dlen", data.len()); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::SetData { @@ -434,12 +420,12 @@ impl ZooKeeper { /// This operation, if successful, will trigger all the watches on the node of the given `path` /// left by `exists` API calls, and the watches on the parent node left by `get_children` API /// calls. + #[instrument] pub async fn delete( &self, path: &str, version: Option, ) -> Result, Whatever> { - trace!(self.logger, "delete"; "path" => path, "version" => ?version); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::Delete { @@ -455,11 +441,11 @@ impl ZooKeeper { /// /// If no node exists for the given path, the returned future resolves with an error of /// [`error::GetAcl::NoNode`]. + #[instrument] pub async fn get_acl( &self, path: &str, ) -> Result, Stat), error::GetAcl>, Whatever> { - trace!(self.logger, "get_acl"; "path" => path); self.connection .enqueue(proto::Request::GetAcl { path: path.to_string(), @@ -477,6 +463,7 @@ impl ZooKeeper { /// If no node exists for the given path, the returned future resolves with an error of /// [`error::SetAcl::NoNode`]. If the given `version` does not match the ACL version, the /// returned future resolves with an error of [`error::SetAcl::BadVersion`]. + #[instrument(skip(acl))] pub async fn set_acl( &self, path: &str, @@ -486,7 +473,6 @@ impl ZooKeeper { where A: Into>, { - trace!(self.logger, "set_acl"; "path" => path, "version" => ?version); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::SetAcl { @@ -511,8 +497,8 @@ impl ZooKeeper { WithWatcher(self) } + #[instrument(name = "exists")] async fn exists_w(&self, path: &str, watch: Watch) -> Result, Whatever> { - trace!(self.logger, "exists"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::Exists { path: path.to_string(), @@ -527,12 +513,12 @@ impl ZooKeeper { self.exists_w(path, Watch::None).await } + #[instrument] async fn get_children_w( &self, path: &str, watch: Watch, ) -> Result>, Whatever> { - trace!(self.logger, "get_children"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetChildren { path: path.to_string(), @@ -551,12 +537,12 @@ impl ZooKeeper { self.get_children_w(path, Watch::None).await } + #[instrument] async fn get_data_w( &self, path: &str, watch: Watch, ) -> Result, Stat)>, Whatever> { - trace!(self.logger, "get_data"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetData { path: path.to_string(), @@ -774,16 +760,18 @@ mod tests { use super::*; use futures::StreamExt; - use slog::Drain; - use snafu::Whatever; + use tracing::Level; + + fn init_tracing_subscriber() { + let _ = tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .try_init(); + } #[tokio::test] async fn it_works() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init_tracing_subscriber(); + let builder = ZooKeeperBuilder::default(); let (zk, w) = builder .connect(&"127.0.0.1:2181".parse().unwrap()) @@ -977,11 +965,8 @@ mod tests { #[tokio::test] async fn acl_test() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init_tracing_subscriber(); + let builder = ZooKeeperBuilder::default(); let (zk, _) = (builder.connect(&"127.0.0.1:2181".parse().unwrap())) .await @@ -1034,11 +1019,8 @@ mod tests { #[tokio::test] async fn multi_test() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init_tracing_subscriber(); + let builder = ZooKeeperBuilder::default(); async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result, Whatever> { let mut res = Vec::new(); diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index dde2891..63b39e0 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -6,7 +6,6 @@ use futures::{ ready, }; use pin_project::pin_project; -use slog::{debug, info, trace}; use snafu::{Snafu, Whatever}; use std::collections::HashMap; use std::{ @@ -17,6 +16,7 @@ use std::{ time, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tracing::{debug, info, instrument, trace}; #[derive(Debug, Snafu)] pub enum Error { @@ -124,6 +124,7 @@ where self.inbox.len() - self.instart } + #[allow(clippy::type_complexity)] fn enqueue_impl( outbox: &mut Vec, reply: &mut HashMap, @@ -181,11 +182,11 @@ where Self::enqueue_impl(&mut self.outbox, &mut self.reply, xid, item, tx) } + #[instrument(skip(self, cx))] fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, ) -> Poll> where S: AsyncWrite, @@ -208,7 +209,7 @@ where let mut this = self.project(); if wrote { // heartbeat is since last write traffic! - trace!(logger, "resetting heartbeat timer"); + trace!("resetting heartbeat timer"); this.timer .reset(tokio::time::Instant::now() + *this.timeout); } @@ -216,18 +217,18 @@ where ready!(this.stream.as_mut().poll_flush(cx)?); if exiting { - debug!(logger, "shutting down writer"); + debug!("shutting down writer"); ready!(this.stream.poll_shutdown(cx)?); } Poll::Ready(Ok(())) } + #[instrument(skip(self, cx, default_watcher))] fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context, default_watcher: &mut mpsc::UnboundedSender, - logger: &mut slog::Logger, ) -> Poll> where S: AsyncRead, @@ -239,7 +240,7 @@ where } else { 4 }; - trace!(logger, "need {} bytes, have {}", need, self.inlen()); + trace!("need {need} bytes, have {inlen}", inlen = self.inlen()); while self.inlen() < need { let this = self.as_mut().project(); @@ -260,7 +261,7 @@ where ); } else { // Server closed session with no bytes left in buffer - debug!(logger, "server closed connection"); + debug!("server closed connection"); return Poll::Ready(Ok(())); } } @@ -292,10 +293,8 @@ where let zxid = buf.read_i64::()?; if zxid > 0 { trace!( - logger, - "updated zxid from {} to {}", - *this.last_zxid_seen, - zxid + "updated zxid from {last_zxid_seen} to {zxid}", + last_zxid_seen = *this.last_zxid_seen ); assert!(zxid >= *this.last_zxid_seen); @@ -311,7 +310,7 @@ where if xid == 0 && !*this.first { // response to shutdown -- empty response // XXX: in theory, server should now shut down receive end - trace!(logger, "got response to CloseSession"); + trace!("got response to CloseSession"); if let Some(e) = err { return Poll::Ready(CloseSessionSnafu { error_code: e }.fail()); } @@ -319,15 +318,15 @@ where // watch event use super::response::ReadFrom; let e = WatchedEvent::read_from(&mut buf)?; - trace!(logger, "got watcher event {:?}", e); + trace!(event = ?e, "got watcher event"); let mut remove = false; if let Some(watchers) = this.watchers.get_mut(&e.path) { // custom watchers were set by the user -- notify them let mut i = (watchers.len() - 1) as isize; - trace!(logger, - "found potentially waiting custom watchers"; - "n" => watchers.len() + trace!( + watcher_count = watchers.len(), + "found potentially waiting custom watchers" ); while i >= 0 { @@ -366,7 +365,7 @@ where let _ = default_watcher.unbounded_send(e); } else if xid == -2 { // response to ping -- empty response - trace!(logger, "got response to heartbeat"); + trace!("got response to heartbeat"); if let Some(e) = err { return Poll::Ready(BadPingSnafu { error_code: e }.fail()); } @@ -383,41 +382,41 @@ where if err.is_none() || (opcode == request::OpCode::Exists && err == Some(ZkError::NoNode)) { - trace!(logger, "pending watcher turned into real watcher"; "xid" => xid); + trace!(xid, "pending watcher turned into real watcher"); this.watchers.entry(w.0).or_default().push((w.1, w.2)); } else { - trace!(logger, - "pending watcher not turned into real watcher: {:?}", - err; - "xid" => xid + trace!( + xid, + error = ?err, + "pending watcher not turned into real watcher" ); } } if let Some(e) = err { - info!(logger, - "handling server error response: {:?}", e; - "xid" => xid, "opcode" => ?opcode); + info!(xid, ?opcode, error = ?e, "handling server error response"); let _ = tx.send(Err(e)); } else { - let mut r = Response::parse(opcode, &mut buf)?; + let mut response = Response::parse(opcode, &mut buf)?; - debug!(logger, - "handling server response: {:?}", r; - "xid" => xid, "opcode" => ?opcode); + debug!(?response, xid, ?opcode, "handling server response"); if let Response::Connect { timeout, session_id, ref mut password, .. - } = r + } = response { assert!(timeout >= 0); - trace!(logger, "negotiated session timeout: {}ms", timeout); *this.timeout = time::Duration::from_millis(2 * timeout as u64 / 3); + trace!( + timeout = ?this.timeout, + "negotiated session timeout", + ); + this.timer .as_mut() .reset(tokio::time::Instant::now() + *this.timeout); @@ -427,7 +426,7 @@ where mem::swap(this.password, password); } - let _ = tx.send(Ok(r)); // if receiver doesn't care, we don't either + let _ = tx.send(Ok(response)); // if receiver doesn't care, we don't either } } } @@ -440,15 +439,14 @@ where } } + #[instrument(name = "poll_read", skip(self, cx, default_watcher))] pub(super) fn poll( mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, ) -> Poll> { - trace!(logger, "poll_read"); - let r = self.as_mut().poll_read(cx, default_watcher, logger)?; + let r = self.as_mut().poll_read(cx, default_watcher)?; let mut this = self.as_mut().project(); if let Poll::Ready(()) = this.timer.as_mut().poll(cx) { @@ -466,7 +464,7 @@ where this.outbox .write_i32::(request::OpCode::Ping as i32) .expect("Vec::write should never fail"); - trace!(logger, "sending heartbeat"); + trace!("sending heartbeat"); } else { // already request in flight, so no need to also send heartbeat } @@ -476,12 +474,11 @@ where .reset(tokio::time::Instant::now() + *this.timeout); } - trace!(logger, "poll_read"); - let w = self.poll_write(cx, exiting, logger)?; + let w = self.poll_write(cx, exiting)?; match (r, w) { (Poll::Ready(()), Poll::Ready(())) if exiting => { - debug!(logger, "packetizer done"); + debug!("packetizer done"); Poll::Ready(Ok(())) } (Poll::Ready(()), Poll::Ready(())) => Poll::Ready(ServerClosedConnectionSnafu.fail()), diff --git a/src/proto/packetizer.rs b/src/proto/packetizer.rs index 47f85cb..0f66ab6 100644 --- a/src/proto/packetizer.rs +++ b/src/proto/packetizer.rs @@ -10,7 +10,6 @@ use futures::{ ready, FutureExt, StreamExt, TryFutureExt, }; use pin_project::pin_project; -use slog::{debug, error, info, trace}; use snafu::{ResultExt, Whatever}; use std::{ future::{self, Future}, @@ -19,6 +18,7 @@ use std::{ task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, error, info, instrument, trace}; #[pin_project] pub(crate) struct Packetizer @@ -41,8 +41,6 @@ where /// Next xid to issue xid: i32, - logger: slog::Logger, - exiting: bool, } @@ -55,7 +53,6 @@ where pub(crate) fn new( addr: S::Addr, stream: S, - log: slog::Logger, default_watcher: mpsc::UnboundedSender, ) -> Enqueuer where @@ -63,7 +60,6 @@ where { let (tx, rx) = mpsc::unbounded(); - let exitlogger = log.clone(); tokio::spawn( Packetizer { addr, @@ -71,12 +67,11 @@ where xid: 0, default_watcher, rx, - logger: log, exiting: false, } - .map_err(move |e| { - error!(exitlogger, "packetizer exiting: {:?}", e); - drop(e); + .map_err(move |error| { + error!(%error, "packetizer exiting"); + drop(error); }), ); @@ -100,14 +95,13 @@ where mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, ) -> Poll> { let ap = match self.as_mut().project() { PacketizerStateProj::Connected(ref mut ap) => { return ap .as_mut() - .poll(cx, exiting, logger, default_watcher) + .poll(cx, exiting, default_watcher) .map(|res| res.whatever_context("active packetizer failed")) } PacketizerStateProj::Reconnecting(ref mut c) => ready!(c.as_mut().poll(cx)?), @@ -115,7 +109,7 @@ where // we are now connected! self.set(PacketizerState::Connected(ap)); - self.poll(cx, exiting, logger, default_watcher) + self.poll(cx, exiting, default_watcher) } } @@ -123,6 +117,7 @@ impl Packetizer where S: ZooKeeperTransport, { + #[instrument(skip(self, cx))] fn poll_enqueue(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); while let PacketizerStateProj::Connected(ref mut ap) = this.state.as_mut().project() { @@ -130,7 +125,7 @@ where Some((request, response)) => (request, response), None => return Poll::Ready(Err(())), }; - debug!(this.logger, "enqueueing request {:?}", item; "xid" => *this.xid); + debug!(?item, xid = this.xid, "enqueueing request"); match item { Request::GetData { @@ -158,13 +153,7 @@ where Request::Exists { .. } => WatchType::Exist, _ => unreachable!(), }; - trace!( - this.logger, - "adding pending watcher"; - "xid" => *this.xid, - "path" => path, - "wtype" => ?wtype - ); + trace!(xid = this.xid, path, ?wtype, "adding pending watcher"); ap.as_mut() .pending_watchers() .insert(*this.xid, (path.to_string(), w, wtype)); @@ -189,10 +178,9 @@ where { type Output = Result<(), Whatever>; + #[instrument(skip(self, cx))] fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - trace!(self.logger, "packetizer polled"); if !self.exiting { - trace!(self.logger, "poll_enqueue"); match self.as_mut().poll_enqueue(cx) { Poll::Ready(Ok(())) | Poll::Pending => {} Poll::Ready(Err(())) => { @@ -228,7 +216,7 @@ where match this .state .as_mut() - .poll(cx, *this.exiting, this.logger, this.default_watcher) + .poll(cx, *this.exiting, this.default_watcher) { Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), Poll::Pending => Poll::Pending, @@ -246,7 +234,7 @@ where }; if *this.exiting { - debug!(this.logger, "connection lost during exit; not reconnecting"); + debug!("connection lost during exit; not reconnecting"); Poll::Ready(Ok(())) } else if let PacketizerState::Connected(ActivePacketizer { last_zxid_seen, @@ -254,15 +242,15 @@ where .. }) = *this.state { - info!(this.logger, "connection lost; reconnecting"; - "session_id" => session_id, - "last_zxid" => last_zxid_seen + info!( + session_id, + last_zxid = last_zxid_seen, + "connection lost; reconnecting" ); let xid = *this.xid; *this.xid += 1; - let log = this.logger.clone(); let retry = S::connect(this.addr.clone()) .map(|res| res.whatever_context("failed to connect")) .map_ok(move |stream| { @@ -274,11 +262,11 @@ where passwd: password, read_only: false, }; - trace!(log, "about to handshake (again)"); + trace!("about to handshake (again)"); let (tx, rx) = oneshot::channel(); tokio::spawn(rx.map(move |r| { - trace!(log, "re-connection response: {:?}", r); + trace!(response = ?r, "re-connection response"); })); let mut ap = ActivePacketizer::new(stream); diff --git a/src/proto/response.rs b/src/proto/response.rs index f4d44a4..cf3a761 100644 --- a/src/proto/response.rs +++ b/src/proto/response.rs @@ -7,11 +7,11 @@ use std::io::{self, Read}; #[derive(Debug)] pub(crate) enum Response { Connect { - protocol_version: i32, + _protocol_version: i32, timeout: i32, session_id: i64, password: Vec, - read_only: bool, + _read_only: bool, }, Stat(Stat), GetData { @@ -151,11 +151,11 @@ impl Response { pub(super) fn parse(opcode: OpCode, reader: &mut &[u8]) -> io::Result { match opcode { OpCode::CreateSession => Ok(Response::Connect { - protocol_version: reader.read_i32::()?, + _protocol_version: reader.read_i32::()?, timeout: reader.read_i32::()?, session_id: reader.read_i64::()?, password: reader.read_buffer()?, - read_only: reader.read_u8()? != 0, + _read_only: reader.read_u8()? != 0, }), OpCode::Exists | OpCode::SetData | OpCode::SetACL => { Ok(Response::Stat(Stat::read_from(reader)?))