Skip to content
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ All notable changes to this project will be documented in this file.
### Changed

- [BREAKING] Migrated errors from Failure to SNAFU ([#39]).
- [BREAKING] Migrated from `slog` to `tracing` ([#40]).
- Updated ZooKeeper versions we test against (now 3.9.2, 3.8.4, 3.7.2, 3.6.4, 3.5.10) ([#39]).

[#39]: https://github.com/stackabletech/tokio-zookeeper/pull/39
[#40]: https://github.com/stackabletech/tokio-zookeeper/pull/40

## [0.2.1] - 2023-02-13

Expand All @@ -35,7 +37,7 @@ now compatible with Rust's async/await syntax!
[`WithTokio01Executor`](https://github.com/stackabletech/zookeeper-operator/blob/a682dcc3c7dc841917e968ba0e9fa9d33a4fabf5/rust/operator-binary/src/utils.rs#L6-L38)).
2. Upgrade tokio-zookeeper to v0.2.
3. Migrate async calls that thread the `ZooKeeper` instance to instead borrow it (for example,
`zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes
`zk.exists(path).and_then(|(zk, stat)| /* do stuff */);` becomes
`let stat = zk.exists(path).await?;`).
4. Remove Tokio 0.1 and the compatibility wrapper if they are no longer required.

Expand Down
20 changes: 9 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ edition = "2021"
maintenance = { status = "experimental" }

[dependencies]
futures = "0.3"
tokio = { version = "1.22", features = ["net", "rt", "time"] }
byteorder = "1.2"
slog = "2.3.2"
async-trait = "0.1.58"
pin-project = "1.0.12"
once_cell = "1.17.0"
async-trait = "0.1.80"
byteorder = "1.5.0"
futures = "0.3.30"
once_cell = "1.19.0"
pin-project = "1.1.5"
snafu = "0.8.2"
#slog = { version = "2.3.2", features = ['max_level_trace'] }
tokio = { version = "1.37.0", features = ["net", "rt", "time"] }
tracing = "0.1.40"

[dev-dependencies]
tokio = { version = "1.22", features = ["macros"] }
slog-async = "2.3.0"
slog-term = "2.4.0"
tokio = { version = "1.37.0", features = ["macros"] }
tracing-subscriber = "0.3.18"

[features]
76 changes: 29 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@
#![deny(missing_copy_implementations)]

use futures::{channel::oneshot, Stream};
use slog::{debug, o, trace};
use snafu::{whatever as bail, ResultExt as _, Whatever};
use snafu::{whatever as bail, ResultExt, Whatever};
use std::borrow::Cow;
use std::net::SocketAddr;
use std::time;
use tracing::{debug, instrument, trace};

/// Per-operation ZooKeeper error types.
pub mod error;
Expand Down Expand Up @@ -244,24 +244,18 @@ pub(crate) use format_err;
pub struct ZooKeeper {
#[allow(dead_code)]
connection: proto::Enqueuer,
logger: slog::Logger,
}

/// Builder that allows customizing options for ZooKeeper connections.
#[derive(Debug, Clone)]
#[derive(Debug, Copy, Clone)]
pub struct ZooKeeperBuilder {
session_timeout: time::Duration,
logger: slog::Logger,
}

impl Default for ZooKeeperBuilder {
fn default() -> Self {
let drain = slog::Discard;
let root = slog::Logger::root(drain, o!());

ZooKeeperBuilder {
session_timeout: time::Duration::new(0, 0),
logger: root,
}
}
}
Expand Down Expand Up @@ -294,14 +288,6 @@ impl ZooKeeperBuilder {
self.session_timeout = t;
}

/// Set the logger that should be used internally in the ZooKeeper client.
///
/// By default, all logging is disabled. See also [the `slog`
/// documentation](https://docs.rs/slog).
pub fn set_logger(&mut self, l: slog::Logger) {
self.logger = l;
}

async fn handshake(
self,
addr: SocketAddr,
Expand All @@ -317,15 +303,13 @@ impl ZooKeeperBuilder {
passwd: vec![],
read_only: false,
};
debug!(self.logger, "about to perform handshake");
debug!("about to perform handshake");

let plog = self.logger.clone();
let enqueuer = proto::Packetizer::new(addr, stream, plog, default_watcher);
let enqueuer = proto::Packetizer::new(addr, stream, default_watcher);
enqueuer.enqueue(request).await.map(move |response| {
trace!(self.logger, "{:?}", response);
trace!(?response, "Got response");
ZooKeeper {
connection: enqueuer,
logger: self.logger,
}
})
}
Expand Down Expand Up @@ -370,6 +354,7 @@ impl ZooKeeper {
/// calls.
///
/// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
#[instrument(skip(data, acl))]
pub async fn create<D, A>(
&self,
path: &str,
Expand All @@ -382,7 +367,7 @@ impl ZooKeeper {
A: Into<Cow<'static, [Acl]>>,
{
let data = data.into();
trace!(self.logger, "create"; "path" => path, "mode" => ?mode, "dlen" => data.len());
tracing::Span::current().record("dlen", data.len());
self.connection
.enqueue(proto::Request::Create {
path: path.to_string(),
Expand All @@ -404,6 +389,7 @@ impl ZooKeeper {
/// left by `get_data` calls.
///
/// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
#[instrument(skip(data))]
pub async fn set_data<D>(
&self,
path: &str,
Expand All @@ -414,7 +400,7 @@ impl ZooKeeper {
D: Into<Cow<'static, [u8]>>,
{
let data = data.into();
trace!(self.logger, "set_data"; "path" => path, "version" => ?version, "dlen" => data.len());
tracing::Span::current().record("dlen", data.len());
let version = version.unwrap_or(-1);
self.connection
.enqueue(proto::Request::SetData {
Expand All @@ -434,12 +420,12 @@ impl ZooKeeper {
/// This operation, if successful, will trigger all the watches on the node of the given `path`
/// left by `exists` API calls, and the watches on the parent node left by `get_children` API
/// calls.
#[instrument]
pub async fn delete(
&self,
path: &str,
version: Option<i32>,
) -> Result<Result<(), error::Delete>, Whatever> {
trace!(self.logger, "delete"; "path" => path, "version" => ?version);
let version = version.unwrap_or(-1);
self.connection
.enqueue(proto::Request::Delete {
Expand All @@ -455,11 +441,11 @@ impl ZooKeeper {
///
/// If no node exists for the given path, the returned future resolves with an error of
/// [`error::GetAcl::NoNode`].
#[instrument]
pub async fn get_acl(
&self,
path: &str,
) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, Whatever> {
trace!(self.logger, "get_acl"; "path" => path);
self.connection
.enqueue(proto::Request::GetAcl {
path: path.to_string(),
Expand All @@ -477,6 +463,7 @@ impl ZooKeeper {
/// If no node exists for the given path, the returned future resolves with an error of
/// [`error::SetAcl::NoNode`]. If the given `version` does not match the ACL version, the
/// returned future resolves with an error of [`error::SetAcl::BadVersion`].
#[instrument(skip(acl))]
pub async fn set_acl<A>(
&self,
path: &str,
Expand All @@ -486,7 +473,6 @@ impl ZooKeeper {
where
A: Into<Cow<'static, [Acl]>>,
{
trace!(self.logger, "set_acl"; "path" => path, "version" => ?version);
let version = version.unwrap_or(-1);
self.connection
.enqueue(proto::Request::SetAcl {
Expand All @@ -511,8 +497,8 @@ impl ZooKeeper {
WithWatcher(self)
}

#[instrument(name = "exists")]
async fn exists_w(&self, path: &str, watch: Watch) -> Result<Option<Stat>, Whatever> {
trace!(self.logger, "exists"; "path" => path, "watch" => ?watch);
self.connection
.enqueue(proto::Request::Exists {
path: path.to_string(),
Expand All @@ -527,12 +513,12 @@ impl ZooKeeper {
self.exists_w(path, Watch::None).await
}

#[instrument]
async fn get_children_w(
&self,
path: &str,
watch: Watch,
) -> Result<Option<Vec<String>>, Whatever> {
trace!(self.logger, "get_children"; "path" => path, "watch" => ?watch);
self.connection
.enqueue(proto::Request::GetChildren {
path: path.to_string(),
Expand All @@ -551,12 +537,12 @@ impl ZooKeeper {
self.get_children_w(path, Watch::None).await
}

#[instrument]
async fn get_data_w(
&self,
path: &str,
watch: Watch,
) -> Result<Option<(Vec<u8>, Stat)>, Whatever> {
trace!(self.logger, "get_data"; "path" => path, "watch" => ?watch);
self.connection
.enqueue(proto::Request::GetData {
path: path.to_string(),
Expand Down Expand Up @@ -774,16 +760,18 @@ mod tests {
use super::*;

use futures::StreamExt;
use slog::Drain;
use snafu::Whatever;
use tracing::Level;

fn init_tracing_subscriber() {
let _ = tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.try_init();
}

#[tokio::test]
async fn it_works() {
let mut builder = ZooKeeperBuilder::default();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
builder.set_logger(slog::Logger::root(drain, o!()));
init_tracing_subscriber();
let builder = ZooKeeperBuilder::default();

let (zk, w) = builder
.connect(&"127.0.0.1:2181".parse().unwrap())
Expand Down Expand Up @@ -977,11 +965,8 @@ mod tests {

#[tokio::test]
async fn acl_test() {
let mut builder = ZooKeeperBuilder::default();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
builder.set_logger(slog::Logger::root(drain, o!()));
init_tracing_subscriber();
let builder = ZooKeeperBuilder::default();

let (zk, _) = (builder.connect(&"127.0.0.1:2181".parse().unwrap()))
.await
Expand Down Expand Up @@ -1034,11 +1019,8 @@ mod tests {

#[tokio::test]
async fn multi_test() {
let mut builder = ZooKeeperBuilder::default();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
builder.set_logger(slog::Logger::root(drain, o!()));
init_tracing_subscriber();
let builder = ZooKeeperBuilder::default();

async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result<Vec<bool>, Whatever> {
let mut res = Vec::new();
Expand Down
Loading