Fix jetstream reconnect #610
Conversation
Jarema commented Aug 15, 2022 (edited)
- Added sending a new pull request when a disconnect followed by a connect state is detected
- Added checking for timeout of the pull request
- Improved the nats-server crate to allow server restarts
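The disconnect-then-connect detection described in the first bullet can be sketched roughly like this (std-only; the `State` enum and `needs_resend` helper are illustrative stand-ins, while the actual crate watches the client state via a channel):

```rust
#[derive(Clone, Copy, PartialEq)]
enum State {
    Connected,
    Disconnected,
}

// Illustrative helper: scan a sequence of observed client states and
// report whether a reconnect (Disconnected followed by Connected) was
// seen, in which case the pending pull request must be re-sent.
fn needs_resend(states: &[State]) -> bool {
    let mut saw_disconnect = false;
    for state in states {
        match state {
            State::Disconnected => saw_disconnect = true,
            State::Connected if saw_disconnect => return true,
            State::Connected => {}
        }
    }
    false
}

fn main() {
    assert!(!needs_resend(&[State::Connected, State::Connected]));
    assert!(needs_resend(&[
        State::Connected,
        State::Disconnected,
        State::Connected
    ]));
    println!("ok");
}
```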
Force-pushed from e6a7883 to 3ae2a7e
Force-pushed from 56e9836 to 9c65c78
context.client.state.changed().await.ok();
let state = context.client.state.borrow().to_owned();

let timeout_threshold = {
Collapse this into map_or?
Agree. Fixed.
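For context, collapsing an explicit block into `map_or` looks like this (a minimal sketch with hypothetical durations and helper name; the real threshold logic lives in the pull-consumer code):

```rust
use std::time::Duration;

// Hypothetical helper deriving a timeout threshold from an optional
// expiry: the verbose form uses an explicit `if let` block, the
// collapsed form uses `Option::map_or`.
fn timeout_threshold(expires: Option<Duration>) -> Duration {
    // Verbose form of the kind the review comment refers to:
    //
    // let timeout_threshold = {
    //     if let Some(expires) = expires {
    //         expires.saturating_add(Duration::from_secs(5))
    //     } else {
    //         Duration::from_secs(30)
    //     }
    // };
    //
    // Collapsed as suggested:
    expires.map_or(Duration::from_secs(30), |expires| {
        expires.saturating_add(Duration::from_secs(5))
    })
}

fn main() {
    assert_eq!(timeout_threshold(None), Duration::from_secs(30));
    assert_eq!(
        timeout_threshold(Some(Duration::from_secs(10))),
        Duration::from_secs(15)
    );
    println!("ok");
}
```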
@@ -392,6 +397,7 @@ pub struct Stream<'a> {
    subject: String,
    batch_config: BatchConfig,
    request: Option<BoxFuture<'a, Result<(), Error>>>,
    since_last_request: Arc<Mutex<Instant>>,
Don't have to point this out, but it is obviously preferable not to have a mutex here.
For a fix, though, we can allow it with the intent to iterate soon.
Yeah, we could go with `pending` as an atomic shared between the task and the iterator fn, with the `Instant` held by the task only and the task alone sending requests, but that would also require returning errors from it to the iterator via a channel. I would prefer to revisit this soon and focus on more important work for now.
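The approach the fix takes can be sketched as follows (field and variable names are illustrative, not the actual crate internals): the timestamp of the last pull request sits behind an `Arc<Mutex<_>>` so both the background task and the iterator side can read and reset it.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Shared timestamp of the last pull request (illustrative name).
    let since_last_request = Arc::new(Mutex::new(Instant::now()));

    // A second owner, as the background task would hold.
    let task_handle = Arc::clone(&since_last_request);
    let worker = thread::spawn(move || {
        // The task resets the timestamp after sending a request.
        *task_handle.lock().unwrap() = Instant::now();
    });
    worker.join().unwrap();

    // The iterator side checks whether the threshold has elapsed
    // before issuing a new pull request.
    let threshold = Duration::from_secs(30);
    let expired = since_last_request.lock().unwrap().elapsed() >= threshold;
    assert!(!expired);
    println!("ok");
}
```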
Force-pushed from 2d464af to c37a3af
async-nats/tests/jetstream_tests.rs
Outdated
@@ -1280,6 +1281,73 @@ mod jetstream {
        assert_eq!(info.num_ack_pending, 8);
    }

    #[tokio::test]
    #[cfg_attr(target_os = "windows", ignore)]
I take it that Windows reconnect remains flaky?
Hm, I actually changed the reconnection handling to allow restarting the server, but I would limit Windows tests where feasible. Those Windows GHA runners are so slow that they have a weird impact on nats-server and the connection.
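The skip itself is the `cfg_attr` idiom visible in the diff above; a minimal sketch (with a placeholder test name and body):

```rust
// `cfg_attr` expands to `#[ignore]` only when compiling for Windows,
// so the test still builds on every target but is skipped by the
// Windows CI test harness.
#[cfg_attr(target_os = "windows", ignore)]
#[test]
fn jetstream_reconnect_placeholder() {
    // A real test body would exercise the pull-consumer reconnect path.
}

fn main() {
    // `cfg!` mirrors the same condition at runtime.
    println!("windows: {}", cfg!(target_os = "windows"));
}
```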
@@ -24,6 +24,13 @@ use rand::Rng;
use regex::Regex;

pub struct Server {
    inner: Inner,
Why the indirection?
Because if you assigned a new server directly to *self, drop() would be called, which wipes the data, and I want to keep wiping the data on drop.
The other solution is to return a big tuple from the internal run_server fn, but that's ugly.
The inner field looked like the most convenient and clean way to approach this.
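The pattern being discussed can be sketched like this (all names and fields here are illustrative stand-ins, not the actual nats-server test-crate internals): the outer type wipes state in `Drop`, so a restart replaces only `inner` instead of assigning a whole new value to `*self`, which would run `Drop` on the old value.

```rust
// Illustrative stand-ins: `pid` for the child-process handle and
// `wiped` for "data directory removed".
struct Inner {
    pid: u32,
}

struct Server {
    inner: Inner,
    wiped: bool,
}

impl Server {
    fn restart(&mut self) {
        // Only the process-related state is swapped; `Server::drop`
        // does not run here, so the data directory survives.
        self.inner = Inner { pid: self.inner.pid + 1 };
    }
}

impl Drop for Server {
    fn drop(&mut self) {
        // In the real crate this is where the store directory is wiped.
        self.wiped = true;
    }
}

fn main() {
    let mut server = Server { inner: Inner { pid: 1 }, wiped: false };
    server.restart();
    assert_eq!(server.inner.pid, 2);
    assert!(!server.wiped); // restart did not trigger the wipe
    println!("ok");
}
```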
I think we'll revisit the internals of this in the future, but LGTM 👍
Force-pushed from 1af29ce to 3af83a1