refactor(webrtc): rework Substream <-> SubstreamHandle communication#600
Conversation
Rework the communication mechanism between `Substream` and its `SubstreamHandle`. State is shared through `Mutex` and `AtomicWaker` abstracted behind a small helper, which makes them easy to use and ensures the relevant tasks are woken whenever the shared state changes. This also decouples the reading half from the writing half: a graceful close of either half no longer implies closing the other. An abrupt RESET_STREAM still tears down both, as required by the spec.
| } | ||
|
|
||
| fn get(&self) -> T { | ||
| self.state.lock().clone() |
There was a problem hiding this comment.
nit: Could we get into TOCTOU issues? We lock here, clone, on drop the lock goes away, we use the cloned state, and by the time we read the clone, the actual state behind mutex changes?
| self.handles.remove(&channel_id); | ||
| } | ||
| } | ||
| Some((_, Some(SubstreamEvent::RecvClosed))) => {} |
| // could cause this method to wait and thus stall the entire webrtc | ||
| // connection. Solution would be to implement reading | ||
| // backpressure, keeping track of pending incoming messages. | ||
| let _ = message_tx |
There was a problem hiding this comment.
nit: Lets add a debug log here on errors
| /// TX channel for sending inbound messages from `peer` to the associated `Substream`. | ||
| inbound_tx: Sender<Event>, | ||
|
|
||
| message_tx: Option<Sender<Message>>, |
There was a problem hiding this comment.
I would extend the comment to document that we are dropping the channel upon receiving the Fin packet, after which no more messages are sent.
| && self | ||
| .inbound_tx | ||
| .send(Event::Message { | ||
| match (self.message_tx.as_ref(), message.payload) { |
There was a problem hiding this comment.
message_tx is dropped upon receiving the Fin packet.
Practically, we could still receive a message payload here even tho we received the fin and dropped the tx channel.
IIRC, spec states that no further messages are received, maybe we should emit an warning (that will become debug once we stabilize)?
There was a problem hiding this comment.
Yes, this makes sense!! str0m should guarantee the order of things and thus after FIN no other message should arrive, beside just flags!
| // Wake up any task waiting on shutdown | ||
| self.substream_shutdown_waker.wake(); | ||
| self.shutdown_waker.wake(); | ||
| if matches!(self.writer_state.get(), WriterState::Fin) { |
There was a problem hiding this comment.
nit: Let's cache the writer_state in a variable, by the time we read it again under the mutex it might have already mutated so the debug log will make investigations a bit more difficult
| target: LOG_TARGET, | ||
| ?state, | ||
| state = ?self.writer_state.get(), | ||
| "received FIN_ACK in unexpected state, ignoring" |
There was a problem hiding this comment.
We expect the state to be exactly Fin to transition to FinAck. How could we make this a bit more robust here just in case we introduced a bug somewhere or the other peer is misbehaving?
- maybe we could clean up the states regardless or send a reset or return ConnectionClosed
There was a problem hiding this comment.
We should reset the connection if we received FIN_ACK from an unexpected state, this seems to make perfectly sense! Within spec FIN_ACK should be received only in one scenario so I think it is right to expect it only when we have a WriterState::FIN
| // Stream thus FinAck will only be sent once something else awake it. | ||
| *self.reading_state.lock() = ReadingState::Fin; | ||
| self.reader_state.set(ReaderState::Fin); | ||
| let _ = self.message_tx.take(); |
There was a problem hiding this comment.
nit:
| let _ = self.message_tx.take(); | |
| if self.message_tx.take().is_none() { | |
| warn!("Unexpected to have channel already dropped / similar") | |
| } |
| } | ||
| // This function carries forward the writer half close process. | ||
| // | ||
| // It is expected to: |
There was a problem hiding this comment.
nit or similar: The following behaviors are expected on:
-
WriterState::Openstate:- etc
-
WriteState::Finstate:- etc
| let mut timeout = Box::pin(tokio::time::sleep(FIN_ACK_TIMEOUT)); | ||
| // Poll the timeout once to register it with tokio's timer | ||
| // This ensures we'll be woken when it expires | ||
| let _ = timeout.as_mut().poll(cx); |
There was a problem hiding this comment.
if timer.poll().is_ready() {
error! ("Misconfigured timer is not supposed to be ready")
}
| // incoming messages, there are 2 side effects which connects the two streams: | ||
| // 1. If FIN arrived then FIN_ACK is expected to be sent back. | ||
| // 2. If FIN_ACK arrived the writer_state is updated. | ||
| { |
There was a problem hiding this comment.
I think we can safely drop the { } block, we are hiding the locks under set / register_and_get
There was a problem hiding this comment.
Yup, that's a vestigial from when a lock was acquired and dropped at the end of the sub-scope! 👍
| .send(Event::Message { | ||
| match (self.message_tx.as_ref(), message.payload) { | ||
| (Some(message_tx), Some(payload)) if !payload.is_empty() => { | ||
| // TODO: awaiting here makes the entire connection |
There was a problem hiding this comment.
Lets create an issue for this one if not already and place it in the comment here
| } | ||
|
|
||
| struct Inner<T> { | ||
| state: Mutex<T>, |
There was a problem hiding this comment.
I would take this oen step further and replace the Mutex with an AtomicU8
Accidently, this is what I've proposed in the past to the original shutdown impl from #513 (comment).
Since we follow the behavior:
- set: mutex.lock (ie
.store(Release)) - get: mutex.lock (ie
.load(Acquire))
It should be straight forward (and cause no side-effects) to replace it with a lock free atomic:
enum WriterState {
/// The writing stream is open.
Open = 0,
/// A Fin flag was sent.
Fin = 1,
/// FinAck was received.
FinAck = 2,
/// StopSending was received.
StopSending = 3,
}
Then we could also close: #523
There was a problem hiding this comment.
That's a great idea!! And given the current SharedState abstraction is seems pretty straight forward to implement!!!
|
Nice job here @gab8i! This simplifies the state machines quite a lot! While at it, |
This PR attempt the refactor of the
substreammodule explained here: #593 (comment)Contrary to what's described in the issue, I ended up using the Mutex + AtomicWaker pattern, because it turned out to be the cleanest and most effective option.
One thing worth knowing: there are roughly 3 states, with multiple tasks modifying and reading them, so there can be multiple producers and consumers. Every attempt to use a data structure that handles the polling internally, so that no task is left behind, stuck waiting on some other parallel event, ended up making the code messy: it added many fields and, most importantly, required remembering to notify/update a watcher (or push to several different places) at every relevant point in the codebase.
What the PR does is:
Rework the communication mechanism between
Substreamand itsSubstreamHandle. State is shared throughMutexandAtomicWakerabstracted behind a small helper, which makes them easy to use and ensures the relevant tasks are woken whenever the shared state changes.This also decouples the reading half from the writing half: a graceful close of either half no longer implies closing the other. An abrupt RESET_STREAM still tears down both, as required by the spec.