From 9d72c8e8c24bc9cd6b32c4d8a3ab941f88d09fa1 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 11:08:10 +0200 Subject: [PATCH 01/13] chore: Bump dependencies --- Cargo.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8b5c1b7..ef7ad04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,18 +22,18 @@ maintenance = { status = "experimental" } [dependencies] futures = "0.3" -tokio = { version = "1.22", features = ["net", "rt", "time"] } +tokio = { version = "1.37", features = ["net", "rt", "time"] } failure = "0.1" -byteorder = "1.2" -slog = "2.3.2" -async-trait = "0.1.58" -pin-project = "1.0.12" -once_cell = "1.17.0" +byteorder = "1.5" +slog = "2.7" +async-trait = "0.1" +pin-project = "1.1" +once_cell = "1.19" #slog = { version = "2.3.2", features = ['max_level_trace'] } [dev-dependencies] -tokio = { version = "1.22", features = ["macros"] } -slog-async = "2.3.0" -slog-term = "2.4.0" +tokio = { version = "1.37", features = ["macros"] } +slog-async = "2.8" +slog-term = "2.9" [features] From 3c8243093c8226fd0bcd619881f5cef70865d9ed Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 11:09:50 +0200 Subject: [PATCH 02/13] fix clippy lints --- src/proto/active_packetizer.rs | 5 +---- src/proto/response.rs | 8 ++++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index 7116f8b..5a57235 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -364,10 +364,7 @@ where || (opcode == request::OpCode::Exists && err == Some(ZkError::NoNode)) { trace!(logger, "pending watcher turned into real watcher"; "xid" => xid); - this.watchers - .entry(w.0) - .or_insert_with(Vec::new) - .push((w.1, w.2)); + this.watchers.entry(w.0).or_default().push((w.1, w.2)); } else { trace!(logger, "pending watcher not turned into real watcher: {:?}", diff --git a/src/proto/response.rs b/src/proto/response.rs index 82948ff..fbf2ae4 100644 --- a/src/proto/response.rs +++ b/src/proto/response.rs @@ -7,11 +7,11 @@ use std::io::{self, Read}; #[derive(Debug)] pub(crate) enum Response { Connect { - protocol_version: i32, + _protocol_version: i32, timeout: i32, session_id: i64, password: Vec, - read_only: bool, + _read_only: bool, }, Stat(Stat), GetData { @@ -151,11 +151,11 @@ impl Response { pub(super) fn parse(opcode: OpCode, reader: &mut &[u8]) -> Result { match opcode { OpCode::CreateSession => Ok(Response::Connect { - protocol_version: reader.read_i32::()?, + _protocol_version: reader.read_i32::()?, timeout: reader.read_i32::()?, session_id: reader.read_i64::()?, password: reader.read_buffer()?, - read_only: reader.read_u8()? != 0, + _read_only: reader.read_u8()? != 0, }), OpCode::Exists | OpCode::SetData | OpCode::SetACL => { Ok(Response::Stat(Stat::read_from(reader)?)) From 97bf5be45bfe662696d21a3245a96731d07b9a2a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 12:55:34 +0200 Subject: [PATCH 03/13] Fix tests --- .github/workflows/build.yml | 2 +- CHANGELOG.md | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d55dfba..4eed64e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,7 +17,7 @@ jobs: strategy: matrix: rust: [stable, beta, nightly] - zookeeper: [3.8.1, 3.7.1, 3.6.4, 3.5.10] + zookeeper: [3.9.2, 3.8.4, 3.7.2] steps: - name: Check out repository code uses: actions/checkout@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index d19f00f..d5f6423 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +## Changed + +- Remove tests for Zookeeper versions `3.5.x` and `3.6.x` as they are EOL ([#40]). +- Add tests for version `3.9.2` ([#40]). + +[#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 + ## [0.2.1] - 2023-02-13 ### Changed @@ -28,7 +35,7 @@ now compatible with Rust's async/await syntax! [`WithTokio01Executor`](https://github.com/stackabletech/zookeeper-operator/blob/a682dcc3c7dc841917e968ba0e9fa9d33a4fabf5/rust/operator-binary/src/utils.rs#L6-L38)). 2. Upgrade tokio-zookeeper to v0.2. 3. Migrate async calls that thread the `ZooKeeper` instance to instead borrow it (for example, - `zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes + `zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes `let stat = zk.exists(path).await?;`). 4. Remove Tokio 0.1 and the compatibility wrapper if they are no longer required. From f97a66ce68f7e77446ff94cf10f3883d0ac8e668 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 12:56:52 +0200 Subject: [PATCH 04/13] Fix tests part 2 :) --- .github/workflows/build.yml | 4 ++-- CHANGELOG.md | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4eed64e..58ef015 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,12 +17,12 @@ jobs: strategy: matrix: rust: [stable, beta, nightly] - zookeeper: [3.9.2, 3.8.4, 3.7.2] + zookeeper: [3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10] steps: - name: Check out repository code uses: actions/checkout@v3 - run: rustup default ${{ matrix.rust }} - - run: wget -O zookeeper.tar.gz https://dlcdn.apache.org/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz + - run: wget -O zookeeper.tar.gz https://archive.apache.org/dist/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz - run: mkdir zookeeper - run: tar -zxvf zookeeper.tar.gz -C zookeeper --strip-components 1 - run: ./scripts/ci-start-zookeeper diff --git a/CHANGELOG.md b/CHANGELOG.md index d5f6423..30b74a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ All notable changes to this project will be documented in this file. ## Changed -- Remove tests for Zookeeper versions `3.5.x` and `3.6.x` as they are EOL ([#40]). - Add tests for version `3.9.2` ([#40]). [#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 From c578c1a4b9b214ef04812c45763ba8e9d057afc0 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 12:58:15 +0200 Subject: [PATCH 05/13] fix last clippy lint --- src/proto/active_packetizer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index 5a57235..a52799d 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -101,6 +101,7 @@ where self.inbox.len() - self.instart } + #[allow(clippy::type_complexity)] fn enqueue_impl( outbox: &mut Vec, reply: &mut HashMap>)>, From 0818ceb612bcf8f44fb5dff86eb33752a6bb6f82 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 13:52:21 +0200 Subject: [PATCH 06/13] revert changes --- .github/workflows/build.yml | 2 +- CHANGELOG.md | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 58ef015..1ea836b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,7 +17,7 @@ jobs: strategy: matrix: rust: [stable, beta, nightly] - zookeeper: [3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10] + zookeeper: [3.8.1, 3.7.1, 3.6.4, 3.5.10] steps: - name: Check out repository code uses: actions/checkout@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 30b74a4..d8afdba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,6 @@ All notable changes to this project will be documented in this file. ## [Unreleased] -## Changed - -- Add tests for version `3.9.2` ([#40]). - -[#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 - ## [0.2.1] - 2023-02-13 ### Changed From 1d7c29618090432d3c471ce93f3984ab8c246076 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 13:52:51 +0200 Subject: [PATCH 07/13] revert link --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1ea836b..d55dfba 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,7 +22,7 @@ jobs: - name: Check out repository code uses: actions/checkout@v3 - run: rustup default ${{ matrix.rust }} - - run: wget -O zookeeper.tar.gz https://archive.apache.org/dist/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz + - run: wget -O zookeeper.tar.gz https://dlcdn.apache.org/zookeeper/zookeeper-${{ matrix.zookeeper }}/apache-zookeeper-${{ matrix.zookeeper }}-bin.tar.gz - run: mkdir zookeeper - run: tar -zxvf zookeeper.tar.gz -C zookeeper --strip-components 1 - run: ./scripts/ci-start-zookeeper From 17fbe78cefd1bdfd97d3c4eeb4ac1090337b31d0 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 14:46:27 +0200 Subject: [PATCH 08/13] chore!: Migrated from slog to tracing --- CHANGELOG.md | 5 +++ Cargo.toml | 20 +++++----- src/lib.rs | 73 +++++++++++++--------------------- src/proto/active_packetizer.rs | 68 +++++++++++++++---------------- src/proto/packetizer.rs | 54 ++++++++++--------------- 5 files changed, 96 insertions(+), 124 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8afdba..6546ab2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +## Changed +- [BREAKING] Migrated from `slog` to `tracing` ([#40]). + +[#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 + ## [0.2.1] - 2023-02-13 ### Changed diff --git a/Cargo.toml b/Cargo.toml index ef7ad04..3f0c65a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,19 +21,17 @@ edition = "2021" maintenance = { status = "experimental" } [dependencies] -futures = "0.3" -tokio = { version = "1.37", features = ["net", "rt", "time"] } -failure = "0.1" -byteorder = "1.5" -slog = "2.7" -async-trait = "0.1" -pin-project = "1.1" -once_cell = "1.19" -#slog = { version = "2.3.2", features = ['max_level_trace'] } +futures = "0.3.30" +tokio = { version = "1.37.0", features = ["net", "rt", "time"] } +failure = "0.1.8" +byteorder = "1.5.0" +async-trait = "0.1.80" +pin-project = "1.1.5" +once_cell = "1.19.0" +tracing = "0.1.40" [dev-dependencies] tokio = { version = "1.37", features = ["macros"] } -slog-async = "2.8" -slog-term = "2.9" +tracing-subscriber = "0.3.18" [features] diff --git a/src/lib.rs b/src/lib.rs index 84d7080..f282bd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -194,10 +194,10 @@ use failure::{bail, format_err}; use futures::{channel::oneshot, Stream, TryFutureExt}; -use slog::{debug, o, trace}; use std::borrow::Cow; use std::net::SocketAddr; use std::time; +use tracing::{debug, instrument, trace}; /// Per-operation ZooKeeper error types. pub mod error; @@ -237,24 +237,18 @@ pub use crate::types::{ pub struct ZooKeeper { #[allow(dead_code)] connection: proto::Enqueuer, - logger: slog::Logger, } /// Builder that allows customizing options for ZooKeeper connections. -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct ZooKeeperBuilder { session_timeout: time::Duration, - logger: slog::Logger, } impl Default for ZooKeeperBuilder { fn default() -> Self { - let drain = slog::Discard; - let root = slog::Logger::root(drain, o!()); - ZooKeeperBuilder { session_timeout: time::Duration::new(0, 0), - logger: root, } } } @@ -288,14 +282,6 @@ impl ZooKeeperBuilder { self.session_timeout = t; } - /// Set the logger that should be used internally in the ZooKeeper client. - /// - /// By default, all logging is disabled. See also [the `slog` - /// documentation](https://docs.rs/slog). - pub fn set_logger(&mut self, l: slog::Logger) { - self.logger = l; - } - async fn handshake( self, addr: SocketAddr, @@ -311,15 +297,13 @@ impl ZooKeeperBuilder { passwd: vec![], read_only: false, }; - debug!(self.logger, "about to perform handshake"); + debug!("about to perform handshake"); - let plog = self.logger.clone(); - let enqueuer = proto::Packetizer::new(addr, stream, plog, default_watcher); + let enqueuer = proto::Packetizer::new(addr, stream, default_watcher); enqueuer.enqueue(request).await.map(move |response| { - trace!(self.logger, "{:?}", response); + trace!(?response, "Got response"); ZooKeeper { connection: enqueuer, - logger: self.logger, } }) } @@ -364,6 +348,7 @@ impl ZooKeeper { /// calls. /// /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes). + #[instrument(skip(data, acl))] pub async fn create( &self, path: &str, @@ -376,7 +361,7 @@ impl ZooKeeper { A: Into>, { let data = data.into(); - trace!(self.logger, "create"; "path" => path, "mode" => ?mode, "dlen" => data.len()); + tracing::Span::current().record("dlen", data.len()); self.connection .enqueue(proto::Request::Create { path: path.to_string(), @@ -398,6 +383,7 @@ impl ZooKeeper { /// left by `get_data` calls. /// /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes). + #[instrument(skip(data))] pub async fn set_data( &self, path: &str, @@ -408,7 +394,7 @@ impl ZooKeeper { D: Into>, { let data = data.into(); - trace!(self.logger, "set_data"; "path" => path, "version" => ?version, "dlen" => data.len()); + tracing::Span::current().record("dlen", data.len()); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::SetData { @@ -428,12 +414,12 @@ impl ZooKeeper { /// This operation, if successful, will trigger all the watches on the node of the given `path` /// left by `exists` API calls, and the watches on the parent node left by `get_children` API /// calls. + #[instrument] pub async fn delete( &self, path: &str, version: Option, ) -> Result, failure::Error> { - trace!(self.logger, "delete"; "path" => path, "version" => ?version); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::Delete { @@ -449,11 +435,11 @@ impl ZooKeeper { /// /// If no node exists for the given path, the returned future resolves with an error of /// [`error::GetAcl::NoNode`]. + #[instrument] pub async fn get_acl( &self, path: &str, ) -> Result, Stat), error::GetAcl>, failure::Error> { - trace!(self.logger, "get_acl"; "path" => path); self.connection .enqueue(proto::Request::GetAcl { path: path.to_string(), @@ -471,6 +457,7 @@ impl ZooKeeper { /// If no node exists for the given path, the returned future resolves with an error of /// [`error::SetAcl::NoNode`]. If the given `version` does not match the ACL version, the /// returned future resolves with an error of [`error::SetAcl::BadVersion`]. + #[instrument(skip(acl))] pub async fn set_acl( &self, path: &str, @@ -480,7 +467,6 @@ impl ZooKeeper { where A: Into>, { - trace!(self.logger, "set_acl"; "path" => path, "version" => ?version); let version = version.unwrap_or(-1); self.connection .enqueue(proto::Request::SetAcl { @@ -505,8 +491,8 @@ impl ZooKeeper { WithWatcher(self) } + #[instrument] async fn exists_w(&self, path: &str, watch: Watch) -> Result, failure::Error> { - trace!(self.logger, "exists"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::Exists { path: path.to_string(), @@ -521,12 +507,12 @@ impl ZooKeeper { self.exists_w(path, Watch::None).await } + #[instrument] async fn get_children_w( &self, path: &str, watch: Watch, ) -> Result>, failure::Error> { - trace!(self.logger, "get_children"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetChildren { path: path.to_string(), @@ -545,12 +531,12 @@ impl ZooKeeper { self.get_children_w(path, Watch::None).await } + #[instrument] async fn get_data_w( &self, path: &str, watch: Watch, ) -> Result, Stat)>, failure::Error> { - trace!(self.logger, "get_data"; "path" => path, "watch" => ?watch); self.connection .enqueue(proto::Request::GetData { path: path.to_string(), @@ -768,15 +754,18 @@ mod tests { use super::*; use futures::StreamExt; - use slog::Drain; + use tracing::Level; + + fn init() { + let _ = tracing_subscriber::fmt() + .with_max_level(Level::DEBUG) + .try_init(); + } #[tokio::test] async fn it_works() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init(); + let builder = ZooKeeperBuilder::default(); let (zk, w) = builder .connect(&"127.0.0.1:2181".parse().unwrap()) @@ -973,11 +962,8 @@ mod tests { #[tokio::test] async fn acl_test() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init(); + let builder = ZooKeeperBuilder::default(); let (zk, _) = (builder.connect(&"127.0.0.1:2181".parse().unwrap())) .await @@ -1030,11 +1016,8 @@ mod tests { #[tokio::test] async fn multi_test() { - let mut builder = ZooKeeperBuilder::default(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - builder.set_logger(slog::Logger::root(drain, o!())); + init(); + let builder = ZooKeeperBuilder::default(); async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result, failure::Error> { let mut res = Vec::new(); diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index a52799d..1f1c170 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -7,7 +7,6 @@ use futures::{ ready, }; use pin_project::pin_project; -use slog::{debug, info, trace}; use std::collections::HashMap; use std::{ future::Future, @@ -17,6 +16,7 @@ use std::{ time, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tracing::{debug, info, instrument, trace}; #[pin_project] pub(super) struct ActivePacketizer { @@ -159,11 +159,11 @@ where Self::enqueue_impl(&mut self.outbox, &mut self.reply, xid, item, tx) } + #[instrument(skip(self, cx))] fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, ) -> Poll> where S: AsyncWrite, @@ -186,7 +186,7 @@ where let mut this = self.project(); if wrote { // heartbeat is since last write traffic! - trace!(logger, "resetting heartbeat timer"); + trace!("resetting heartbeat timer"); this.timer .reset(tokio::time::Instant::now() + *this.timeout); } @@ -198,18 +198,18 @@ where .map_err(failure::Error::from)?); if exiting { - debug!(logger, "shutting down writer"); + debug!("shutting down writer"); ready!(this.stream.poll_shutdown(cx)?); } Poll::Ready(Ok(())) } + #[instrument(skip(self, cx, default_watcher))] fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context, default_watcher: &mut mpsc::UnboundedSender, - logger: &mut slog::Logger, ) -> Poll> where S: AsyncRead, @@ -221,7 +221,7 @@ where } else { 4 }; - trace!(logger, "need {} bytes, have {}", need, self.inlen()); + trace!("need {need} bytes, have {inlen}", inlen = self.inlen()); while self.inlen() < need { let this = self.as_mut().project(); @@ -241,7 +241,7 @@ where ))); } else { // Server closed session with no bytes left in buffer - debug!(logger, "server closed connection"); + debug!("server closed connection"); return Poll::Ready(Ok(())); } } @@ -273,10 +273,8 @@ where let zxid = buf.read_i64::()?; if zxid > 0 { trace!( - logger, - "updated zxid from {} to {}", - *this.last_zxid_seen, - zxid + "updated zxid from {last_zxid_seen} to {zxid}", + last_zxid_seen = *this.last_zxid_seen ); assert!(zxid >= *this.last_zxid_seen); @@ -292,7 +290,7 @@ where if xid == 0 && !*this.first { // response to shutdown -- empty response // XXX: in theory, server should now shut down receive end - trace!(logger, "got response to CloseSession"); + trace!("got response to CloseSession"); if let Some(e) = err { return Poll::Ready(Err(format_err!("failed to close session: {:?}", e))); } @@ -300,15 +298,15 @@ where // watch event use super::response::ReadFrom; let e = WatchedEvent::read_from(&mut buf)?; - trace!(logger, "got watcher event {:?}", e); + trace!(event = ?e, "got watcher event"); let mut remove = false; if let Some(watchers) = this.watchers.get_mut(&e.path) { // custom watchers were set by the user -- notify them let mut i = (watchers.len() - 1) as isize; - trace!(logger, - "found potentially waiting custom watchers"; - "n" => watchers.len() + trace!( + watchers = watchers.len(), + "found potentially waiting custom watchers" ); while i >= 0 { @@ -347,7 +345,7 @@ where let _ = default_watcher.unbounded_send(e); } else if xid == -2 { // response to ping -- empty response - trace!(logger, "got response to heartbeat"); + trace!("got response to heartbeat"); if let Some(e) = err { return Poll::Ready(Err(format_err!("bad response to ping: {:?}", e))); } @@ -364,29 +362,25 @@ where if err.is_none() || (opcode == request::OpCode::Exists && err == Some(ZkError::NoNode)) { - trace!(logger, "pending watcher turned into real watcher"; "xid" => xid); + trace!(xid, "pending watcher turned into real watcher"); this.watchers.entry(w.0).or_default().push((w.1, w.2)); } else { - trace!(logger, - "pending watcher not turned into real watcher: {:?}", - err; - "xid" => xid + trace!( + xid, + error = ?err, + "pending watcher not turned into real watcher" ); } } if let Some(e) = err { - info!(logger, - "handling server error response: {:?}", e; - "xid" => xid, "opcode" => ?opcode); + info!(xid, ?opcode, error = ?e, "handling server error response"); let _ = tx.send(Err(e)); } else { let mut r = Response::parse(opcode, &mut buf)?; - debug!(logger, - "handling server response: {:?}", r; - "xid" => xid, "opcode" => ?opcode); + debug!(xid, ?opcode, "handling server response"); if let Response::Connect { timeout, @@ -396,9 +390,13 @@ where } = r { assert!(timeout >= 0); - trace!(logger, "negotiated session timeout: {}ms", timeout); *this.timeout = time::Duration::from_millis(2 * timeout as u64 / 3); + trace!( + timeout = ?this.timeout, + "negotiated session timeout", + ); + this.timer .as_mut() .reset(tokio::time::Instant::now() + *this.timeout); @@ -421,15 +419,14 @@ where } } + #[instrument(skip(self, cx, default_watcher))] pub(super) fn poll( mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, ) -> Poll> { - trace!(logger, "poll_read"); - let r = self.as_mut().poll_read(cx, default_watcher, logger)?; + let r = self.as_mut().poll_read(cx, default_watcher)?; let mut this = self.as_mut().project(); if let Poll::Ready(()) = this.timer.as_mut().poll(cx) { @@ -447,7 +444,7 @@ where this.outbox .write_i32::(request::OpCode::Ping as i32) .expect("Vec::write should never fail"); - trace!(logger, "sending heartbeat"); + trace!("sending heartbeat"); } else { // already request in flight, so no need to also send heartbeat } @@ -457,12 +454,11 @@ where .reset(tokio::time::Instant::now() + *this.timeout); } - trace!(logger, "poll_read"); - let w = self.poll_write(cx, exiting, logger)?; + let w = self.poll_write(cx, exiting)?; match (r, w) { (Poll::Ready(()), Poll::Ready(())) if exiting => { - debug!(logger, "packetizer done"); + debug!("packetizer done"); Poll::Ready(Ok(())) } (Poll::Ready(()), Poll::Ready(())) => Poll::Ready(Err(format_err!( diff --git a/src/proto/packetizer.rs b/src/proto/packetizer.rs index f9349dc..2dbd1a4 100644 --- a/src/proto/packetizer.rs +++ b/src/proto/packetizer.rs @@ -11,7 +11,6 @@ use futures::{ ready, FutureExt, StreamExt, TryFutureExt, }; use pin_project::pin_project; -use slog::{debug, error, info, trace}; use std::{ future::{self, Future}, mem, @@ -19,6 +18,7 @@ use std::{ task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, error, info, instrument, trace}; #[pin_project] pub(crate) struct Packetizer @@ -41,8 +41,6 @@ where /// Next xid to issue xid: i32, - logger: slog::Logger, - exiting: bool, } @@ -55,7 +53,6 @@ where pub(crate) fn new( addr: S::Addr, stream: S, - log: slog::Logger, default_watcher: mpsc::UnboundedSender, ) -> Enqueuer where @@ -63,7 +60,6 @@ where { let (tx, rx) = mpsc::unbounded(); - let exitlogger = log.clone(); tokio::spawn( Packetizer { addr, @@ -71,12 +67,11 @@ where xid: 0, default_watcher, rx, - logger: log, exiting: false, } - .map_err(move |e| { - error!(exitlogger, "packetizer exiting: {:?}", e); - drop(e); + .map_err(move |error| { + error!(%error, "packetizer exiting"); + drop(error); }), ); @@ -100,19 +95,18 @@ where mut self: Pin<&mut Self>, cx: &mut Context, exiting: bool, - logger: &mut slog::Logger, default_watcher: &mut mpsc::UnboundedSender, ) -> Poll> { let ap = match self.as_mut().project() { PacketizerStateProj::Connected(ref mut ap) => { - return ap.as_mut().poll(cx, exiting, logger, default_watcher) + return ap.as_mut().poll(cx, exiting, default_watcher) } PacketizerStateProj::Reconnecting(ref mut c) => ready!(c.as_mut().poll(cx)?), }; // we are now connected! self.set(PacketizerState::Connected(ap)); - self.poll(cx, exiting, logger, default_watcher) + self.poll(cx, exiting, default_watcher) } } @@ -120,6 +114,7 @@ impl Packetizer where S: ZooKeeperTransport, { + #[instrument(skip(self, cx))] fn poll_enqueue(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); while let PacketizerStateProj::Connected(ref mut ap) = this.state.as_mut().project() { @@ -127,7 +122,7 @@ where Some((request, response)) => (request, response), None => return Poll::Ready(Err(())), }; - debug!(this.logger, "enqueueing request {:?}", item; "xid" => *this.xid); + debug!(?item, xid = this.xid, "enqueueing request"); match item { Request::GetData { @@ -155,13 +150,7 @@ where Request::Exists { .. } => WatchType::Exist, _ => unreachable!(), }; - trace!( - this.logger, - "adding pending watcher"; - "xid" => *this.xid, - "path" => path, - "wtype" => ?wtype - ); + trace!(xid = this.xid, path, ?wtype, "adding pending watcher"); ap.as_mut() .pending_watchers() .insert(*this.xid, (path.to_string(), w, wtype)); @@ -186,10 +175,9 @@ where { type Output = Result<(), failure::Error>; + #[instrument(skip(self, cx))] fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - trace!(self.logger, "packetizer polled"); if !self.exiting { - trace!(self.logger, "poll_enqueue"); match self.as_mut().poll_enqueue(cx) { Poll::Ready(Ok(())) | Poll::Pending => {} Poll::Ready(Err(())) => { @@ -225,7 +213,7 @@ where match this .state .as_mut() - .poll(cx, *this.exiting, this.logger, this.default_watcher) + .poll(cx, *this.exiting, this.default_watcher) { Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), Poll::Pending => Poll::Pending, @@ -243,7 +231,7 @@ where }; if *this.exiting { - debug!(this.logger, "connection lost during exit; not reconnecting"); + debug!("connection lost during exit; not reconnecting"); Poll::Ready(Ok(())) } else if let PacketizerState::Connected(ActivePacketizer { last_zxid_seen, @@ -251,15 +239,15 @@ where .. }) = *this.state { - info!(this.logger, "connection lost; reconnecting"; - "session_id" => session_id, - "last_zxid" => last_zxid_seen + info!( + session_id, + last_zxid = last_zxid_seen, + "connection lost; reconnecting" ); let xid = *this.xid; *this.xid += 1; - let log = this.logger.clone(); let retry = S::connect(this.addr.clone()) .map_err(|e| e.into()) @@ -272,12 +260,14 @@ where passwd: password, read_only: false, }; - trace!(log, "about to handshake (again)"); + trace!("about to handshake (again)"); let (tx, rx) = oneshot::channel(); - tokio::spawn(rx.map(move |r| { - trace!(log, "re-connection response: {:?}", r); - })); + tokio::spawn( + rx.map( + move |r| trace!(response = ?r, "re-connection response"), + ), + ); let mut ap = ActivePacketizer::new(stream); ap.enqueue_unpin(xid, request, tx); From 3d28c9d22f343f7fd3b040e1d7dba246e0294b1a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 14:50:22 +0200 Subject: [PATCH 09/13] fix tokio version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3f0c65a..b6be6da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ once_cell = "1.19.0" tracing = "0.1.40" [dev-dependencies] -tokio = { version = "1.37", features = ["macros"] } +tokio = { version = "1.37.0", features = ["macros"] } tracing-subscriber = "0.3.18" [features] From 98e3f0fa64eb3832c30342518825c52fe7b37e81 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 29 Apr 2024 14:54:27 +0200 Subject: [PATCH 10/13] fix missing newline --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6546ab2..beea99a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file. ## [Unreleased] ## Changed + - [BREAKING] Migrated from `slog` to `tracing` ([#40]). [#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40 From a1a0707f5f7cc697100a7a6fc75f85463f38104a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 30 Apr 2024 10:07:08 +0200 Subject: [PATCH 11/13] Apply suggestions from code review Co-authored-by: Techassi --- src/lib.rs | 4 ++-- src/proto/active_packetizer.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 281d6f5..818078b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -497,7 +497,7 @@ impl ZooKeeper { WithWatcher(self) } - #[instrument] + #[instrument(name = "exists")] async fn exists_w(&self, path: &str, watch: Watch) -> Result, Whatever> { self.connection .enqueue(proto::Request::Exists { @@ -762,7 +762,7 @@ mod tests { use futures::StreamExt; use tracing::Level; - fn init() { + fn init_tracing_subscriber() { let _ = tracing_subscriber::fmt() .with_max_level(Level::DEBUG) .try_init(); diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index 9d999e1..e8815fe 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -325,7 +325,7 @@ where // custom watchers were set by the user -- notify them let mut i = (watchers.len() - 1) as isize; trace!( - watchers = watchers.len(), + watcher_count = watchers.len(), "found potentially waiting custom watchers" ); @@ -439,7 +439,7 @@ where } } - #[instrument(skip(self, cx, default_watcher))] + #[instrument(name = "poll_read", skip(self, cx, default_watcher))] pub(super) fn poll( mut self: Pin<&mut Self>, cx: &mut Context, From 3414d38bdea7aac241a18ddd4935c2b0e0cb528b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 30 Apr 2024 10:10:12 +0200 Subject: [PATCH 12/13] fix compilation --- src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 818078b..e8abf4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -770,7 +770,7 @@ mod tests { #[tokio::test] async fn it_works() { - init(); + init_tracing_subscriber(); let builder = ZooKeeperBuilder::default(); let (zk, w) = builder @@ -965,7 +965,7 @@ mod tests { #[tokio::test] async fn acl_test() { - init(); + init_tracing_subscriber(); let builder = ZooKeeperBuilder::default(); let (zk, _) = (builder.connect(&"127.0.0.1:2181".parse().unwrap())) @@ -1019,7 +1019,7 @@ mod tests { #[tokio::test] async fn multi_test() { - init(); + init_tracing_subscriber(); let builder = ZooKeeperBuilder::default(); async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result, Whatever> { From 421ac3d2e504272a26330e9b94cb924d915cf57b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 30 Apr 2024 10:24:40 +0200 Subject: [PATCH 13/13] Add response to trace --- src/proto/active_packetizer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/proto/active_packetizer.rs b/src/proto/active_packetizer.rs index e8815fe..63b39e0 100644 --- a/src/proto/active_packetizer.rs +++ b/src/proto/active_packetizer.rs @@ -398,16 +398,16 @@ where let _ = tx.send(Err(e)); } else { - let mut r = Response::parse(opcode, &mut buf)?; + let mut response = Response::parse(opcode, &mut buf)?; - debug!(xid, ?opcode, "handling server response"); + debug!(?response, xid, ?opcode, "handling server response"); if let Response::Connect { timeout, session_id, ref mut password, .. - } = r + } = response { assert!(timeout >= 0); @@ -426,7 +426,7 @@ where mem::swap(this.password, password); } - let _ = tx.send(Ok(r)); // if receiver doesn't care, we don't either + let _ = tx.send(Ok(response)); // if receiver doesn't care, we don't either } } }