
Async client (unchanged API) #43

Merged
58 commits merged into master from wip-async on Jun 24, 2020

Conversation


@ghost ghost commented May 7, 2020

Okay here's a quick sketch of what a new, single-threaded client based on smol might look like.
Right now, this simple client can connect to demo.nats.io and publish messages.

@spacejam What do you think and how would you like to move forward from here?

@ghost ghost requested a review from spacejam May 7, 2020 14:18
@derekcollison
Member

Let's make sure we are set on our high level goals.

I think what we want to achieve is a runtime-agnostic client that can easily plug into smol, tokio, or async-std. Maybe default to smol for a much easier out-of-the-box experience, but let users swap out runtimes as needed.

That make any sense? You both are the experts of course ;)


fn main() -> io::Result<()> {
// Useful commands for testing:
// nats-sub -s nats://demo.nats.io:4222 hello
Member

Don't need nats:// or the :4222 port, which is the default:

nats-sub -s demo.nats.io hello

@spacejam
Contributor

spacejam commented May 7, 2020

It's nice that so far this is pretty tight. I think the next steps to sketch out for an architectural soundness assessment are (see the rough sketch after this list):

  1. transition from connected -> disconnected state
  2. transition from disconnected -> connected state if a reconnect happens within the retry bound
  3. transition from disconnected -> closed state if a reconnect does not happen within the retry bound
  4. transition from connected -> closed if the close method is called
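A minimal sketch of those transitions, assuming a standalone State enum and Event type; the names are illustrative, and the PR keeps this state implicit in its control loop rather than in an explicit type:

/// Illustrative connection states.
enum State {
    Connected,
    Disconnected { attempts: usize },
    Closed,
}

/// Illustrative events that drive the transitions listed above.
enum Event {
    IoError,
    Reconnected,
    RetryFailed,
    CloseCalled,
}

impl State {
    /// Apply one event to the state machine (sketch only).
    fn step(self, event: Event, max_retries: usize) -> State {
        match (self, event) {
            // 1. connected -> disconnected on an I/O error
            (State::Connected, Event::IoError) => State::Disconnected { attempts: 0 },
            // 2. disconnected -> connected if a reconnect happens within the retry bound
            (State::Disconnected { .. }, Event::Reconnected) => State::Connected,
            // 3. disconnected -> closed once the retry bound is exhausted
            (State::Disconnected { attempts }, Event::RetryFailed) if attempts + 1 >= max_retries => {
                State::Closed
            }
            (State::Disconnected { attempts }, Event::RetryFailed) => {
                State::Disconnected { attempts: attempts + 1 }
            }
            // 4. connected -> closed when the close method is called
            (State::Connected, Event::CloseCalled) => State::Closed,
            // All other combinations leave the state unchanged.
            (state, _) => state,
        }
    }
}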

@ghost
Author

ghost commented May 7, 2020

This is just a sketch and a draft; don't read too much into the details right now (:

> That make any sense? You both are the experts of course ;)

Totally. So the goal is to make using this client a seamless experience - and this client works with tokio, async-std, smol, what have you.

How? If you look at the code, this client spawns a thread that calls smol::run(), thus creating a tiny executor for its own use, independent of the end-user's executor. This way the user doesn't need to worry about runtimes at all: everything just works, no matter which runtime the user is running.

Really, all we need in the NATS client is a thread that polls the client() future and waits on the network using epoll.
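A minimal sketch of that setup, assuming smol 0.1's smol::run and a hypothetical client() future; the function names and error handling here are illustrative, not the PR's actual code:

use std::io;
use std::thread;

// Hypothetical future that owns all connection state; in the PR this role is
// played by the client() function discussed in this thread.
async fn client(addr: &str) -> io::Result<()> {
    // ... connect to the server, then process protocol ops until shutdown ...
    let _ = addr;
    Ok(())
}

fn connect(addr: &str) {
    let addr = addr.to_string();
    // One background thread hosts its own tiny executor. The end user never
    // touches this runtime, so the public API behaves the same whether the
    // caller uses tokio, async-std, smol, or no runtime at all.
    thread::spawn(move || {
        smol::run(async move {
            if let Err(err) = client(&addr).await {
                eprintln!("NATS client thread exited with an error: {}", err);
            }
        })
    });
}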

@derekcollison
Member

I like that, very clean. I was worried that in the effort to support other runtimes we would have to supply a lower-level future-based API.

@spacejam
Contributor

spacejam commented May 7, 2020

@derekcollison we will be compatible with every executor, because none of the user-facing types that implement Future will have runtime assumptions.

We can make all functions async without requiring an async runtime. The exception is the flush commands, for which we may add a flush_async method so flushing can be non-blocking and play better with runtimes. So this won't impact the existing API at all; it will just spin up fewer threads and use no mutexes in the core state machine, removing the concurrency that has been at the root of the bugs I've been finding. At most one additional method (or two, counting flush_timeout_async) for non-blocking flush support (see the sketch below).

But at this point, it's a high-level sketch, and after testing the high-level state transition architecture, we will drill down on the granularity of efforts as we move things over.
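A rough sketch of the proposed additions. Only the method names flush_async and flush_timeout_async come from the comment above; the signatures, the Connection stand-in, and the bodies are assumptions:

use std::io;
use std::time::Duration;

/// Stand-in for the crate's connection handle (illustrative only).
pub struct Connection;

impl Connection {
    /// Existing blocking flush (public API unchanged).
    pub fn flush(&self) -> io::Result<()> {
        // ... block the calling thread until the server acknowledges ...
        Ok(())
    }

    /// Proposed non-blocking counterpart, awaitable from any runtime.
    pub async fn flush_async(&self) -> io::Result<()> {
        // ... ask the client thread to flush over a channel and await the reply ...
        Ok(())
    }

    /// Proposed non-blocking variant with a deadline, mirroring flush_timeout.
    pub async fn flush_timeout_async(&self, timeout: Duration) -> io::Result<()> {
        // ... same as flush_async, but give up after `timeout` ...
        let _ = timeout;
        Ok(())
    }
}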

@derekcollison
Member

Thanks. Excited to see how this plays out and to have Rust+NATS+Smol powered services.

.await?;

// Current subscriptions in the form `(subject, sid, messages)`.
let mut subscriptions: Vec<(String, usize, mpsc::UnboundedSender<Message>)> = Vec::new();
Author

No locks on subscriptions, the whole client state is contained inside this client() function :)

@ghost
Author

ghost commented May 12, 2020

I pushed some more changes. The client can now do basic handling of SUB, UNSUB, MSG, PUB.

The architecture is pretty clean (a rough sketch follows the list):

  • messages from the server are modeled as a Stream<Item = io::Result<ServerOp>>
  • the whole client state is contained inside the client() function (no mutexes!)
  • the public API (subscribe(), next_msg(), publish()) communicates with the client thread only through channels
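A minimal sketch of that shape, assuming futures' Stream and mpsc channels, with illustrative ServerOp/ClientOp types and Vec<u8> payloads; the PR's actual types and parser are richer than this:

use std::io;

use futures::channel::mpsc;
use futures::stream::{Stream, StreamExt};

/// Illustrative operations parsed from the server.
enum ServerOp {
    Msg { subject: String, sid: usize, payload: Vec<u8> },
    Ping,
}

/// Commands sent from the public API (subscribe/publish/...) to the client task.
enum ClientOp {
    Sub { subject: String, sid: usize, messages: mpsc::UnboundedSender<Vec<u8>> },
    Unsub { sid: usize },
    Pub { subject: String, payload: Vec<u8> },
}

/// The entire client state lives in this function's locals - no mutexes.
async fn client(
    server_ops: impl Stream<Item = io::Result<ServerOp>> + Unpin,
    mut client_ops: mpsc::UnboundedReceiver<ClientOp>,
) -> io::Result<()> {
    let mut server_ops = server_ops.fuse();

    // Current subscriptions in the form `(subject, sid, messages)`.
    let mut subscriptions: Vec<(String, usize, mpsc::UnboundedSender<Vec<u8>>)> = Vec::new();

    loop {
        futures::select! {
            // An operation arrived from the server.
            op = server_ops.next() => {
                match op {
                    Some(Ok(ServerOp::Msg { sid, payload, .. })) => {
                        // Route the message to the matching subscription's channel.
                        if let Some((_, _, tx)) = subscriptions.iter().find(|(_, s, _)| *s == sid) {
                            let _ = tx.unbounded_send(payload);
                        }
                    }
                    Some(Ok(ServerOp::Ping)) => { /* write a PONG to the socket */ }
                    Some(Err(err)) => return Err(err),
                    None => return Ok(()),
                }
            }
            // A command arrived from the public API over a channel.
            cmd = client_ops.next() => {
                match cmd {
                    Some(ClientOp::Sub { subject, sid, messages }) => {
                        subscriptions.push((subject, sid, messages));
                    }
                    Some(ClientOp::Unsub { sid }) => subscriptions.retain(|(_, s, _)| *s != sid),
                    Some(ClientOp::Pub { .. }) => { /* write a PUB to the socket */ }
                    None => return Ok(()),
                }
            }
        }
    }
}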

@derekcollison
Member

We want to check how this impacts performance. NATS is known for performance, and so is Rust, so this needs to be a top goal. My original pass was roughly on par with Go but needed some improvement.

@ghost
Author

ghost commented May 14, 2020

> My original pass was roughly on par with Go but needed some improvement.

I'm curious - what tests did you use? I'd like to run the same thing.

@derekcollison
Member

I did a simple pub and pub/sub test for a baseline. We have nats-bench under the Go client, which is more involved.

@derekcollison
Member

On my simple test we have lost about 3M msgs/sec in performance.


// Periodically flush writes to the server.
_ = Timer::at(next_flush).fuse() => {
writer.flush().await?;


How does it behave if it can't flush the whole buffer? If it blocks instead of returning success on a partial flush, this operation can deadlock. If a recently flushed command makes the server send a huge amount of data back, the client might end up blocked on its own writes while the server waits for the client to read data from the TCP stream.

A timeout on flush can help resolve the deadlock.
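A minimal sketch of that mitigation, assuming smol's Timer and a hypothetical helper (not part of the PR) that races the flush against a deadline:

use std::io;
use std::time::Duration;

use futures::future::{select, Either};
use futures::io::{AsyncWrite, AsyncWriteExt};
use smol::Timer;

// Hypothetical helper: give up on a flush that doesn't complete within
// `timeout`, so a stalled peer cannot wedge the control loop forever.
async fn flush_with_timeout<W>(writer: &mut W, timeout: Duration) -> io::Result<()>
where
    W: AsyncWrite + Unpin,
{
    let flush = writer.flush();
    let deadline = Timer::after(timeout);
    futures::pin_mut!(flush, deadline);

    match select(flush, deadline).await {
        // The flush finished (successfully or with an error) before the deadline.
        Either::Left((result, _)) => result,
        // The deadline fired first: surface a timeout instead of blocking forever.
        Either::Right((_, _)) => Err(io::Error::new(io::ErrorKind::TimedOut, "flush timed out")),
    }
}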

Contributor

This is still a draft. Deadlock doesn't happen in this system because the nats-server's flush logic ejects slow consumers. Duplex progress is incoming.

@spacejam
Contributor

@derekcollison perf will come quite easily after the control loop is minimized. Are you running against the new client? On my beefy laptop, the benches for 32-byte messages top out at 2.3 million msgs/s. Did you run your baseline right before the test without disabling frequency throttling and turbo boost? That can cause a massive perf difference between two runs of the identical workload.

@ghost ghost changed the title [WIP] new async client Async client (same API) Jun 23, 2020
@ghost ghost changed the title Async client (same API) Async client (unchanged API) Jun 23, 2020
@ghost
Author

ghost commented Jun 24, 2020

I had to bump the minimum required Rust version (MSRV) to 1.40.0 because 1.39.0 is simply buggy. In particular, it can't compile this:

impl<T> Unblock<T> {
    pub async fn get_mut(&mut self) -> &mut T {
        // ...
    }
}

@spacejam spacejam mentioned this pull request Jun 24, 2020
@spacejam
Contributor

I've renamed ConnectionOptions to Options internally and added a deprecated public type alias from ConnectionOptions to Options. Existing code keeps working without any changes, but users see a deprecation warning encouraging them to move to the new name. So this could be released as a point release, 0.5.1, without changing any existing APIs.

The addition is that Options now has a connect_async method, which returns an async-capable Connection. We've put this into the asynk module because async is a reserved keyword, and we figured we would follow the common pattern in other languages of swapping the c in reserved words for a k, like klass, etc.
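A minimal sketch of that layout. The deprecation alias mirrors what the comment above describes; the connect_async signature, the unit structs, and the bodies are illustrative assumptions, not the crate's exact API:

use std::io;

/// The async-capable connection type lives in `asynk` because `async` is a keyword.
pub mod asynk {
    pub struct Connection;
}

/// The renamed options builder (fields elided).
pub struct Options;

/// Deprecated alias: existing `ConnectionOptions` code keeps compiling, but
/// users see a warning nudging them toward the new name.
#[deprecated(note = "renamed to `Options`")]
pub type ConnectionOptions = Options;

impl Options {
    /// New addition: an async connect returning an async-capable Connection
    /// (signature illustrative).
    pub async fn connect_async(self, url: &str) -> io::Result<asynk::Connection> {
        // ... establish the connection asynchronously ...
        let _ = url;
        Ok(asynk::Connection)
    }
}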

@ghost ghost merged commit 524f566 into master Jun 24, 2020
@ghost ghost deleted the wip-async branch June 24, 2020 16:52
This pull request was closed.