Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for other executors ? #110

Closed
dawid-nowak opened this issue Oct 13, 2020 · 16 comments
Closed

Support for other executors ? #110

dawid-nowak opened this issue Oct 13, 2020 · 16 comments
Assignees

Comments

@dawid-nowak
Copy link

@stjepang Forgive me in advance :) but I was wondering if there is any easy way to remove Smol dependency such that I can only use one executor in my app?
My application relies heavily on Tokio and Tokio ecosystem so naturally, I would prefer to keep it that way.

The rationale for my request is that when I was trying to convert my app to use Async NATS, I have run into some locking problems when I was trying to pass messages from tasks running in Smal to tasks running in Tokio via channels.

I must admit, I quickly revert back to the blocking code, but it would be nice if everything was executed asynchronously in one executor.

@derekcollison derekcollison assigned ghost Oct 13, 2020
@ghost
Copy link

ghost commented Oct 20, 2020

The rationale for my request is that when I was trying to convert my app to use Async NATS, I have run into some locking problems when I was trying to pass messages from tasks running in Smal to tasks running in Tokio via channels.

Interesting - do you perhaps have more information about this issue? Is there a way to reproduce it?

I'm a bit skeptical that the problem was due to interaction of smol and tokio - the nats crate should just work when called from any runtime (or even outside a runtime). I can't think of any possible locking issues caused by that. So perhaps there was something else going on...

I was wondering if there is any easy way to remove Smol dependency such that I can only use one executor in my app?

There are typically two parts to a runtime ("runtime" is a bit confusing word ascribed lots of kinds of meanings): reactor (polls I/O using epoll/kqueue/etc.) and executor (polls Futures). The nats crate uses async-io and async-executor. I'd like to avoid the scenario where we add custom integration for every runtime due to complexity and the maintenance cost that comes with it.

Still, I do think it would be beneficial to have some kind of integration. One idea would be for Connection to return to the user a Future that is currently run by the client thread so that the user can spawn it on whatever executor they are using. This would reduce cross-thread communication and improve cache locality, possibly bringing performance improvements. I'll think about this some more.

@dawid-nowak
Copy link
Author

Thanks for the response and clarifications.

So let's say I have an example like this one. If my understanding is correct then I will have one Executor created by Tokio and then another executor is created when NATS Client:connect is called and as far I see these are completely separated. It looks like NATS Client is creating a thread for itself to run its logic.

Other idea could be to use Options::with_executor().connect("demo") ??

use nats::{
    self
};
use futures::stream::StreamExt;

#[tokio::main(core_threads = 1)]
async fn main() {
    let nats = tokio::spawn(async move {
	let make_conn = nats::asynk::connect("nats://172.17.0.2:4222").await;
	if let Ok(nc) = make_conn{
	    if let Ok(mut s) = nc.queue_subscribe("test","test").await{
		while let Some(msg) = s.next().await{
		    println!("Got {:?}", msg.data);
		};
	    }
	}
    });

  nats.await
}

@ghost
Copy link

ghost commented Nov 2, 2020

So let's say I have an example like this one. If my understanding is correct then I will have one Executor created by Tokio and then another executor is created when NATS Client:connect is called and as far I see these are completely separated. It looks like NATS Client is creating a thread for itself to run its logic.

Yes, that's correct.

Other idea could be to use Options::with_executor().connect("demo") ??

Something like that. Perhaps even Options::new().with_spawn(tokio::spawn)...

@dennybritz
Copy link

For what it's worth, I've also run into issues with tokio and tasks getting stuck when using the async part of this crate and I've reverted back to sync. I'm having a hard time reproducing the issue and no longer have the async code, but it seems that something strange is going on with mixing the runtimes.

@ghost
Copy link

ghost commented Nov 3, 2020

Interesting. I wonder if this only happens when mixing smol and tokio or also happens when using just smol. Either way, it would be great if we could get a reproducible example.

Also, do you perhaps know more about how the locking issue manifests? Does Subscription stop delivering messages when it should be? Or does something else happen? Is there perhaps a panic message printed on stderr?

@dennybritz
Copy link

dennybritz commented Nov 3, 2020

What I experienced was with writing, not reading. However, it was a pretty complex application with lots of moving parts and streams, so I can't rule other sources for the issue. I did try replacing the NATS publishing with something else (writing to stdout) and didn't run into the problem - that's why I figured it's something in this crate - but not sure. Perhaps just something to keep in mind in case any more people run into this.

@ghost
Copy link

ghost commented Nov 3, 2020

Okay that’s good to know. Do you perhaps remember if there were multiple tasks concurrently publishing messages or just one?

@dennybritz
Copy link

Sorry, I don't quite remember if the multiple producers were concurrent or synchronized.

@dawid-nowak
Copy link
Author

i was trying to reproduce the problem but so far wasn't able to with v0.8

@spacejam
Copy link
Contributor

Closing this as we have significantly reworked the async code in question. If the issue crops up again with the newly split out async-nats crate, please open another issue.

@ngryman
Copy link

ngryman commented Nov 18, 2021

I think I might face a similar issue with async not behaving as expected. Here is a simple repro with the latest version.

I'm not a specialist with async runtimes so I won't risk myself theorizing what's happening.

The only thing I know is that this contreived example works as expected:

// Subscribe
let nc = async_nats::connect(&nats.address().to_string()).await?;
let sub = nc.subscribe("test").await?;

// Publish
let nc = async_nats::connect(&nats.address().to_string()).await?;
nc.publish("test", "foo").await?;

// Assert
let msg = sub.next().await.unwrap();
assert_eq!(String::from_utf8_lossy(&msg.data), "foo");

while this variant time outs:

// Subscribe
async fn subscribe(addr: &str, subject: &str) -> Result<Subscription> {
  let nc = async_nats::connect(addr).await?;
  let sub = nc.subscribe(subject).await?;
  Ok(sub)
}
let sub = subscribe(&nats.address().to_string(), "test").await?;

// Publish
let nc = async_nats::connect(&nats.address().to_string()).await?;
nc.publish("test", "foo").await?;

// Assert
let msg = sub.next().await.unwrap();
assert_eq!(String::from_utf8_lossy(&msg.data), "foo");

I would expect the second variant to work the same way. But for some reason, the second connection seems to be delayed after the event has been published, resulting in the subscription never receiving this event.

@Jarema
Copy link
Member

Jarema commented Nov 18, 2021

@ngryman

First and foremost: We are planning a rework of whole async behaviour of NATS Rust Client to support many runtimes.
More information about that will come soon.

About your current issue:
I strongly suggest using latest NATS 0.16.0 client version and asynk module instead the one you used.

Also, try creating just one connection and use it to subscribe, then publish, then fetch the messages on subscription with one of available methods.

Let us know if that helped.

@ngryman
Copy link

ngryman commented Nov 19, 2021

@Jarema Thanks for the quick response.

First and foremost: We are planning a rework of the whole async behaviour of NATS Rust Client to support many runtimes.
More information about that will come soon.

Awesome, that's great news!

About my issue:
I followed your advice and switched to the nats::asynk module. Unfortunately, it stills time outs (cf. repro). My repro example is contrived, but it reflects the constraints of my real use case: I can't re-use the same client for publishing and subscribing.

It's not a blocker for me either; I can work around this while waiting for official support of tokio / async_std. Do you have a rough ETA for this?

Thank you for all the great work you're all doing here! (we too often forget to be thankful to OSS maintainers 👍)

@Jarema
Copy link
Member

Jarema commented Nov 19, 2021

If you need to create subscription in another function, don't create connection in that function scope, just pass it in.
Also, If you really need more than one connection for some reason, I wouldn't assing it to the same variable, but create a new one.

Small changes in your code makes it work:

use anyhow::Result;
use nats::asynk::Subscription;
use nats_test_server::NatsTestServer;

mod test {
  use super::*;

  pub(super) async fn success() -> Result<()> {
    let nats = NatsTestServer::build().spawn();

    // Subscribe
    let nc = nats::asynk::connect(&nats.address().to_string()).await?;
    let sub = nc.subscribe("test").await?;

    // Publish
    let nc2 = nats::asynk::connect(&nats.address().to_string()).await?;
    nc2.publish("test", "foo").await?;

    // Assert
    let msg = sub.next().await.unwrap();
    assert_eq!(String::from_utf8_lossy(&msg.data), "foo");
    println!("success: {}", String::from_utf8(msg.data).unwrap());
    Ok(())
  }

  pub(super) async fn timeout() -> Result<()> {
    let nats = NatsTestServer::build().spawn();

    let nc = nats::asynk::connect(&nats.address().to_string()).await?;
    // Subscribe
    let sub = timeout::subscribe(nc.clone(), "test").await?;

    // Publish
    nc.publish("test", "foo").await?;
    // Assert
    let msg = sub.next().await.unwrap();
    assert_eq!(String::from_utf8_lossy(&msg.data), "foo");
    println!("success: {}", String::from_utf8(msg.data).unwrap());
    Ok(())
  }

  mod timeout {
    use nats::asynk::Connection;

    use super::*;

    pub(super) async fn subscribe(nc: Connection, subject: &str) -> Result<Subscription> {
      let sub = nc.subscribe(subject).await?;
      Ok(sub)
    }
  }
}

#[async_std::main]
async fn main() -> Result<()> {
  test::success().await?;
  test::timeout().await?;
  Ok(())
}

About async rework: Currently we're working hard on feature parity catch up (some JetStream features and KV). When it's done, async will probably come next.

@mara-schulke
Copy link

Would you accept an pr for async std support? @Jarema
My org needs async std support asap and i would go ahead and split the code using compile time features to support both runtimes depending on the feature the dev set.

@Jarema
Copy link
Member

Jarema commented Apr 28, 2022

Would you accept an pr for async std support? @Jarema

My org needs async std support asap and i would go ahead and split the code using compile time features to support both runtimes depending on the feature the dev set.

Hey!
Are you aware of the new async client?
It can be easily used in async-std setups with tokio compat feature flag and perform without a noticable difference.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants