Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async client (unchanged API) #43

Merged
58 commits merged into from Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
5fb7d19
WIP new async client
May 7, 2020
2621314
Refactor the new client
May 12, 2020
54138fa
Add periodic flushing
May 12, 2020
54fdc3a
Simplify operation queueing
May 13, 2020
fb26fc8
Refactor
May 13, 2020
189efc8
Implement flush
May 13, 2020
ff90ac4
Some cleanup
May 13, 2020
6b6196a
Implement reconnect
May 18, 2020
12ef099
Count reconnects properly
May 18, 2020
e3473c8
Implement authentication
May 19, 2020
e32880d
Simplify options
May 19, 2020
07bfd1a
Merge branch 'master' into wip-async
May 19, 2020
33890af
Adapt to the new benchmark
May 19, 2020
96986a3
Merge branch 'master' into wip-async
May 25, 2020
0683bc8
Implement reconnect buffer
Jun 1, 2020
eea49ea
Fix a few bugs, add tests
Jun 2, 2020
abf5122
Move pongs into Writer
Jun 2, 2020
2b8558a
Refactor
Jun 11, 2020
7e40310
Merge branch 'master' into wip-async
Jun 11, 2020
66db04d
Tune performance
Jun 15, 2020
e546e7d
Merge branch 'master' into wip-async
Jun 15, 2020
5aef3f1
Make clippy happy
Jun 15, 2020
3057bff
Fix a compilation error
Jun 15, 2020
c872c66
Turn off annoying clippy lint
Jun 15, 2020
7531e45
Turn off annoying lints
Jun 16, 2020
96e61b5
Add draining
Jun 16, 2020
6c03f8c
Fix failing doctests
Jun 16, 2020
b949fa0
Implement missing Connnection methods
Jun 16, 2020
4cec320
Add AsyncConnection
Jun 16, 2020
9958d60
Add AsyncSubscription and AsyncMessage
Jun 17, 2020
26739a3
Update options builder
Jun 17, 2020
d1f6a10
Undo clippy suppressions
Jun 17, 2020
9c112b1
Fix clippy
Jun 17, 2020
f4d743c
Fix clippy
Jun 17, 2020
58ee13a
Fix clippy
Jun 17, 2020
29b654b
Fix reconnection test
Jun 17, 2020
f920541
Update actions-rs
Jun 17, 2020
9d0beca
Merge codebases
Jun 23, 2020
e6b58f0
Fix failing tests
Jun 23, 2020
6be458d
Update smol
Jun 23, 2020
41fdc7b
Remove unused code
Jun 23, 2020
5945828
Delete unused sync API in asynk module
Jun 23, 2020
ce96dc8
Cleanup
Jun 23, 2020
55fa9d9
Cleanup
Jun 23, 2020
56609ad
Final touch
Jun 23, 2020
90e8a9f
Workaround for broken MSRV by base64-url
Jun 23, 2020
b4977b5
Upgrade dependencies
Jun 23, 2020
a2b7dec
Bump MSRV to 1.40.0
Jun 24, 2020
303c44c
Delete the Async prefix
Jun 24, 2020
8404b72
Cleanup dependencies
Jun 24, 2020
bd9ff49
Merge branch 'master' into wip-async
Jun 24, 2020
b7d444c
Move Options into the root of the crate. Rename asynk::Options::conne…
spacejam Jun 24, 2020
4487bc5
implement Handler::unsubscribe
spacejam Jun 24, 2020
98149d6
Small cleanup
spacejam Jun 24, 2020
db24a82
Some clippy feedback. Remove clippy lints that are difficult to use w…
spacejam Jun 24, 2020
f7bd2db
Fix renamed clippy lints
spacejam Jun 24, 2020
96e2da8
Fix renamed clippy lints
spacejam Jun 24, 2020
16a2beb
Fix renamed clippy lints
spacejam Jun 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -39,6 +39,9 @@ log = "0.4.8"
nkeys = "0.0.9"
base64-url = "1.1.13"
once_cell = "1.3.1"
smol = "0.1.4"
piper = "0.1.1"
futures = "0.3.4"

[dev-dependencies]
criterion = "0.3.2"
Expand Down
24 changes: 24 additions & 0 deletions examples/new-client.rs
@@ -0,0 +1,24 @@
use std::io;
use std::thread;
use std::time::Duration;

use nats::new_client::Connection;

fn main() -> io::Result<()> {
// Useful commands for testing:
// nats-sub -s nats://demo.nats.io hello
// nats-pub -s nats://demo.nats.io hello 'hi from nats-pub'

let mut nc = Connection::connect("demo.nats.io:4222")?;
let mut sub = nc.subscribe("hello");

thread::sleep(Duration::from_secs(1));

nc.publish("hello", "hi from new-client")?;
nc.flush()?;

loop {
let msg = sub.next_msg()?;
println!("{}", msg);
}
}
7 changes: 5 additions & 2 deletions src/lib.rs
Expand Up @@ -9,8 +9,9 @@
//!
//! [https://nats.io/]: https://nats.io/

#![cfg_attr(test, deny(warnings))]
#![deny(
#![recursion_limit = "1024"]
// #![cfg_attr(test, deny(warnings))]
#![warn(
missing_docs,
future_incompatible,
nonstandard_style,
Expand Down Expand Up @@ -92,6 +93,8 @@ mod parser;
mod secure_wipe;
mod shared_state;

pub mod new_client;

#[cfg(feature = "fault_injection")]
mod fault_injection;

Expand Down
223 changes: 223 additions & 0 deletions src/new_client/client.rs
@@ -0,0 +1,223 @@
use std::collections::VecDeque;
use std::io::{self, Error, ErrorKind};
use std::net::TcpStream;
use std::thread;
use std::time::{Duration, Instant};

use futures::{
channel::{mpsc, oneshot},
io::{BufReader, BufWriter},
prelude::*,
};
use piper::Arc;
use smol::{Async, Timer};

use crate::connect::ConnectInfo;
use crate::new_client::decoder::{decode, ServerOp};
use crate::new_client::encoder::{encode, ClientOp};
use crate::Message;

/// An operation enqueued by the user of this crate.
///
/// These operations are enqueued by `Connection` or `Subscription` and are then handled by the
/// client thread.
pub(crate) enum UserOp {
/// Publish a message.
Pub {
subject: String,
reply_to: Option<String>,
payload: Vec<u8>,
},

/// Subscribe to a subject.
Sub {
subject: String,
queue_group: Option<String>,
sid: usize,
messages: mpsc::UnboundedSender<Message>,
},

/// Unsubscribe from a subject.
Unsub { sid: usize, max_msgs: Option<u64> },

/// Send a ping and wait for a pong.
Ping { pong: oneshot::Sender<()> },

/// Close the connection.
Close,
}

/// Spawns a client thread.
pub(crate) fn spawn(
url: &str,
user_ops: mpsc::UnboundedReceiver<UserOp>,
) -> thread::JoinHandle<io::Result<()>> {
let url = url.to_string();
thread::spawn(move || smol::run(client(&url, user_ops)))
}

/// Runs the main loop for a client.
async fn client(url: &str, mut user_ops: mpsc::UnboundedReceiver<UserOp>) -> io::Result<()> {
let stream = Arc::new(Async::<TcpStream>::connect(url).await?);

// TODO(stjepang): Make this option configurable.
let flush_timeout = Duration::from_millis(100);

// Bytes written to the server are buffered and periodically flushed.
let mut next_flush = Instant::now() + flush_timeout;
let mut writer = BufWriter::new(stream.clone());

// Create an endless stream parsing operations from the server.
let mut server_ops = stream::try_unfold(BufReader::new(stream), |mut stream| async {
// Decode a single operation.
let op = decode(&mut stream).await?;
io::Result::Ok(Some((op, stream)))
})
.boxed();

// Expect an INFO message.
let mut server_info = match server_ops
.try_next()
.await?
.expect("end of what should be an infinite stream")
{
ServerOp::Info(server_info) => server_info,
_ => return Err(Error::new(ErrorKind::Other, "expected an INFO message")),
};

// Current subscriptions in the form `(subject, sid, messages)`.
let mut subscriptions: Vec<(String, usize, mpsc::UnboundedSender<Message>)> = Vec::new();

// Expected pongs and their notification channels.
let mut pongs: VecDeque<oneshot::Sender<()>> = VecDeque::new();

// Send a CONNECT operation to the server.
encode(
&mut writer,
ClientOp::Connect(ConnectInfo {
tls_required: false,
name: None,
pedantic: false,
verbose: false,
lang: crate::LANG.to_string(),
version: crate::VERSION.to_string(),
user: None,
pass: None,
auth_token: None,
user_jwt: None,
signature: None,
echo: true,
}),
)
.await?;

// Handle events in a loop.
loop {
futures::select! {
// An operation was received from the server.
res = server_ops.try_next().fuse() => {
let op = res?.expect("end of what should be an infinite stream");

match op {
ServerOp::Info(new_server_info) => {
server_info = new_server_info;
}

ServerOp::Ping => {
// Send a PONG operation to the server.
encode(&mut writer, ClientOp::Pong).await?;
}

ServerOp::Pong => {
// Take the next expected pong from the queue.
let pong = pongs.pop_front().expect("unexpected pong");

// Complete the pong by sending a message into the channel.
let _ = pong.send(());
}

ServerOp::Msg { subject, sid, reply_to, payload } => {
// Send the message to matching subscriptions.
for (_, _, messages) in
subscriptions.iter().filter(|(_, s, _)| *s == sid)
{
let _ = messages.unbounded_send(Message {
subject: subject.clone(),
reply: reply_to.clone(),
data: payload.clone(),
responder: None,
});
}
}

ServerOp::Err(msg) => {
log::error!("received -ERR '{}'", msg);
}

ServerOp::Unknown(line) => {
log::warn!("unknown op: {}", line);
}
}
}

// The user has enqueued an operation.
msg = user_ops.next().fuse() => {
match msg.expect("user_ops disconnected") {
UserOp::Pub { subject, reply_to, payload } => {
// Send a PUB operation to the server.
encode(&mut writer, ClientOp::Pub {
subject,
reply_to: None,
payload
})
.await?;
}

UserOp::Sub { subject, queue_group, sid, messages } => {
// Add the subscription to the list.
subscriptions.push((subject.clone(), sid, messages));

// Send a SUB operation to the server.
encode(&mut writer, ClientOp::Sub {
subject,
queue_group,
sid,
})
.await?;
}

UserOp::Unsub { sid, max_msgs } => {
// Remove the subscription from the list.
subscriptions.retain(|(_, s, _)| *s != sid);

// Send an UNSUB operation to the server.
encode(&mut writer, ClientOp::Unsub {
sid,
max_msgs,
})
.await?;
}

UserOp::Ping { pong } => {
// Send a PING operation to the server.
encode(&mut writer, ClientOp::Ping).await?;

// Record that we're expecting a pong.
pongs.push_back(pong);
}

UserOp::Close => {
// TODO(stjepang): Perhaps we should flush before closing abruptly.
return Ok(());
}
}
}

// Periodically flush writes to the server.
_ = Timer::at(next_flush).fuse() => {
writer.flush().await?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it behave if it can't flush whole buffer? If it blocks instead of returning success on partial flush, this operation can deadlock. If recently flushed command make server to send huge amount of data back, it might end up blocking on on it's own writes, waiting for client to read data from TCP stream.

Timeout on flush can help to resolve deadlock

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still a draft. deadlock doesn't happen in this system because of the nats-server's flush logic that ejects slow consumers. duplex progress is incoming.

next_flush = Instant::now() + flush_timeout;
}
}
}
}
105 changes: 105 additions & 0 deletions src/new_client/connection.rs
@@ -0,0 +1,105 @@
use std::io::{self, Error, ErrorKind};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

use futures::channel::{mpsc, oneshot};
use smol::block_on;

use crate::new_client::client::{self, UserOp};
use crate::new_client::subscription::Subscription;

/// A NATS client connection.
pub struct Connection {
/// Enqueues user operations.
user_ops: mpsc::UnboundedSender<UserOp>,

/// Subscription ID generator.
sid_gen: AtomicUsize,

/// Thread running the main future.
thread: Option<thread::JoinHandle<io::Result<()>>>,
}

impl Connection {
/// Connects a NATS client.
pub fn connect(url: &str) -> io::Result<Connection> {
// Spawn a client thread.
let (sender, receiver) = mpsc::unbounded();
let thread = client::spawn(url, receiver);

// Connection handle controlling the client thread.
let mut conn = Connection {
user_ops: sender,
sid_gen: AtomicUsize::new(1),
thread: Some(thread),
};

// Flush to send a ping and wait for the connection to establish.
conn.flush()?;

// All good! The connection is now ready.
Ok(conn)
}

/// Publishes a message.
pub fn publish(&mut self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> {
let subject = subject.to_string();
let payload = msg.as_ref().to_vec();
let reply_to = None;

// Enqueue a PUB operation.
self.user_ops
.unbounded_send(UserOp::Pub {
subject,
reply_to,
payload,
})
.map_err(|err| Error::new(ErrorKind::ConnectionReset, err))?;

Ok(())
}

/// Creates a new subscriber.
pub fn subscribe(&mut self, subject: &str) -> Subscription {
let sid = self.sid_gen.fetch_add(1, Ordering::SeqCst);
Subscription::new(subject, sid, self.user_ops.clone())
}

/// Flushes by performing a round trip to the server.
pub fn flush(&mut self) -> io::Result<()> {
let (sender, receiver) = oneshot::channel();

// Enqueue a PING operation.
self.user_ops
.unbounded_send(UserOp::Ping { pong: sender })
.map_err(|err| Error::new(ErrorKind::ConnectionReset, err))?;

// Wait until the PONG operation is received.
let _ = block_on(receiver);

Ok(())
}

/// Close the connection.
pub fn close(&mut self) -> io::Result<()> {
if let Some(thread) = self.thread.take() {
// Enqueue a close operation.
let _ = self.user_ops.unbounded_send(UserOp::Close);

// Wait for the client thread to stop.
thread
.join()
.expect("client thread has panicked")
.map_err(|err| Error::new(ErrorKind::ConnectionReset, err))?;
}

Ok(())
}
}

impl Drop for Connection {
fn drop(&mut self) {
// Close the connection in case it hasn't been already.
let _ = self.close();
}
}