New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Drop timeout if max concurrent timeouts reached #324
Conversation
@@ -47,7 +47,10 @@ impl<S, I> Stream for SuspendableStream<S> | |||
self.timeout = Some(timeout); | |||
return Ok(Async::NotReady); | |||
} | |||
Err(_) => unreachable!("Polling a delay shouldn't yield any errors; qed") | |||
Err(_) => { | |||
// we've reached maximum number of concurrent timers; drop the timeout, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is not good, as the Stream
will not be called anymore by the tokio runtime. What we could do is the following:
// Wake your `Stream` is the next round again (poll is called again)
futures::task::current().notify();
Then we also add some sort of counter, that counts how many successive calls we have, without being able to register a delay or polling the underlying socket. After around 1000 cycles, we should try something different, like maybe putting the whole thread sleeping and hoping that some other thread is able to finish its work and we are able to accept new connections.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is not good, as the
Stream
will not be called anymore by the tokio runtime.
can you please explain how?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean why the Stream
will not be called anymore by Tokio?
Tokio uses internally mio
and that uses epoll
on Linux. Epoll will just notify you, if you have polled the resource and you were instructed to wait. If you polled the resource and got an error, you are probably not registered anymore to be informed about any new events that happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you polled the resource and got an error, you are probably not registered anymore
Streams don't terminate when they encounter an error.
https://docs.rs/futures/0.1.21/futures/stream/trait.Stream.html#errors
and the socket actually has nothing to do with with the timeout, I think if you read the full code you'd have a better understanding
jsonrpc/server-utils/src/suspendable_stream.rs
Lines 42 to 81 in f42e5e6
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, ()> { | |
if let Some(mut timeout) = self.timeout.take() { | |
match timeout.poll() { | |
Ok(Async::Ready(_)) => {} | |
Ok(Async::NotReady) => { | |
self.timeout = Some(timeout); | |
return Ok(Async::NotReady); | |
} | |
Err(_) => { | |
// we've reached maximum number of concurrent timers; drop the timeout, | |
// and attempt to accept a new connection. | |
} | |
} | |
} | |
loop { | |
match self.stream.poll() { | |
Ok(item) => { | |
if self.next_delay > self.initial_delay { | |
self.next_delay = self.initial_delay; | |
} | |
return Ok(item) | |
} | |
Err(ref err) => { | |
if connection_error(err) { | |
warn!("Connection Error: {:?}", err); | |
continue | |
} | |
self.next_delay = if self.next_delay < self.max_delay { | |
self.next_delay * 2 | |
} else { | |
self.next_delay | |
}; | |
warn!("Error accepting connection: {}", err); | |
warn!("The server will stop accepting connections for {:?}", self.next_delay); | |
self.timeout = Some(Delay::new(Instant::now() + self.next_delay)); | |
} | |
} | |
} | |
} |
If we attempt to poll the timeout and it yields an error (which in this case would be an at_capacity
error). there's nothing we can do other than to shed load, by dropping the timeout and proceeding to resume polling the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, the futures
documentation says that Stream
does not terminate when you get an error. But, I talked about Tokio
and that is just an implementation of a futures::Executor
. Let's take an example, let's say you want to wrap a Stream
in a Future
. The Future
will just poll
the Stream
and should return Ok(Ready())
when the Stream is finished, but you want also make sure that you at most pull out 5 items from the
Stream` at a time. So you would do something like:
fn poll(&mut self) -> Poll<> {
for _ in 0..5 {
let item = try_ready!(self.stream.poll());
if item.is_none() { return Ok(Ready(()); }
self.handle_item(item);
}
return Ok(NotReady());
}
In this example, you can end in a situation where your future is not polled anymore. This will happen, when you pull out 5 items in a row, but the Stream
has still more items to be pulled. But you are not telling the Tokio
runtime that your future should be called again and thus, you will not get called again.
I know that is really complicated and I already suffered several times from random bugs (futures where not called anymore etc), because of all these implementation details you need to know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also looked again over your code and when you enter the state where you create a timeout, you will just loop forever.
You need to leave the loop, after you created your timeout and you also need to poll your timeout at least once, that it is registered in the runtime and your futures gets called when the timeout is reached.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and you also need to poll your timeout at least once
yeah i completely forgot about it, @tomusdrw asked to remove the duplicate polling logic and it skipped my mind to wrap the whole thing inside the loop
thanks a lot for the catch 🙏🏾
@bkchr I'm very familiar with how streams and futures work, and I'm pretty sure calling And i should mention, I've tested this code. It works as advertised. |
Sorry, did not wanted to sound rude. However, when you are not able to accept any new connections and you can not register any timer at the runtime, you end in a hot loop that will probably never end. All futures from the same thread will just starve, as you are looping in |
} | ||
Err(err) => { | ||
warn!("Timeout error {:?}", err); | ||
task::current().notify(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry to chime in so late, but this doesn't make any sense at all? Why notify the task instead of just jumping to self.stream.poll()
? There is nothing else that may happen when you notify
the task and return NotRead
right away.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have self.timeout.is_some() == true
, we polled the Stream
and it did return an error, that does indicate that we can not accept any more connections at the moment. So, we created a timeout and we are at polling this timeout and the timeout returns an error. This error is very unlikely, but could happen if we already have a lot of other timers registered in the Tokio runtime.
So, we can not poll self.stream
as we can not accept any new connections and polling the timeout
also returned an error.
However, we want that other futures can continue and maybe finish. For that, we inform the current task, that our future should be polled in the next cycle again and return NotReady
.
Now, Tokio will continue with all the other futures that are currently ready and need to be processed and in the next "cycle" our future is called again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks for the explanation. Although self.timeout.is_some()
doesn't imply that stream
was polled in the same cycle, but indeed the behavior here is correct and handles all the cases, my proposed change would probably have this subtle bug, when both stream and timeout errors and we get into this infinite loop of erroring.
warn!("Timeout error {:?}", err); | ||
task::current().notify(); | ||
return Ok(Async::NotReady); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should break out of the loop in case timeout = None
!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do that, we would never call self.stream.poll()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, apologies. I misread the code and though that only self.timeout
is inside the loop
* closes #322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
* closes #322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
* closes #322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
* closes paritytech#322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
* closes paritytech#322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
No description provided.