Collators get incoming parachain messages #149
Conversation
The changes LGTM, but I need to re-review more in-depth.
collator/src/lib.rs
Outdated
```rust
).map_err(Error::Collator)?;

let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into();
let egress_queue_roots
	= ::polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
```
Can you move the `=` to the previous line?
network/src/router.rs
Outdated
```diff
 // this is the ingress from source to target, with given messages.
-let target_incoming = incoming_message_topic(self.parent_hash, target);
+let target_incoming
+	= validation::incoming_message_topic(self.parent_hash(), target);
```
`=` on previous line?
network/src/router.rs
Outdated
```rust
/// with `import_statement`.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement,Error=()> {
	// spin up a task in the background that processes all incoming statements
	// TODO: propagate statements on a timer?
```
Let's track this?
---
yeah, that's an old TODO, just has been moved here.
```rust
let canon_root = occupied.get().clone();
let messages = messages.iter().map(|m| &m.0[..]);
if ::polkadot_validation::message_queue_root(messages) != canon_root {
	continue;
```
I know this code is just being moved and it's unchanged, but can you explain the reasoning behind this? If the roots never match for a given parachain, it seems we'll just drop all ingress data eventually.
---
We know what the roots are supposed to be, but this is a struct that's filtering out messages from gossip that are claiming that this is the message packet from another chain. If the messages in that packet don't have the correct root, then we want to ignore that. If they do, then we import and stop listening for that parachain's outgoing messages. Ideally, we would punish the peers circulating bad messages on that topic, but it is gossip so it's hard to tell who the originator was. In the future we will try to avoid gossiping around bad message packets, although it's a bit racy.
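The check being discussed can be sketched as follows. This is illustrative only: `message_queue_root` here is a toy stand-in (a plain hasher fold rather than the real trie root computation), and `accept_packet` is a hypothetical helper showing how a gossiped packet is compared against the canonical root before import.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy stand-in for `polkadot_validation::message_queue_root`: the real code
// computes a trie root over the message bodies; here we just fold a hasher
// over them so the example is self-contained.
fn message_queue_root<'a, I: IntoIterator<Item = &'a [u8]>>(messages: I) -> u64 {
    let mut hasher = DefaultHasher::new();
    for m in messages {
        m.hash(&mut hasher);
    }
    hasher.finish()
}

// Hypothetical helper: accept a gossiped message packet only if it hashes
// to the root we already know from the canonical chain; otherwise ignore it.
fn accept_packet(canon_root: u64, messages: &[Vec<u8>]) -> bool {
    message_queue_root(messages.iter().map(|m| &m[..])) == canon_root
}

fn main() {
    let good: Vec<Vec<u8>> = vec![b"a".to_vec(), b"b".to_vec()];
    let canon_root = message_queue_root(good.iter().map(|m| &m[..]));

    // A forged packet claiming to be the same queue is filtered out.
    let bad: Vec<Vec<u8>> = vec![b"a".to_vec(), b"forged".to_vec()];

    assert!(accept_packet(canon_root, &good));
    assert!(!accept_packet(canon_root, &bad));
}
```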
---
Just a few comments...
```rust
/// with `import_statement`.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement,Error=()> {
	// spin up a task in the background that processes all incoming statements
	// validation has been done already by the gossip validator.
```
I think the "spin up a task in the background" here is incorrect, since `gossip_messages_for` will block waiting on the `mpsc::UnboundedReceiver`, and then the `filter_map` and `map` will happen in the current task. So it would be more accurate to write that:

- This entire operation will block the current thread.
- `filter_map` and `map` will happen in the current task.
- What will happen "in the background" is fetching the gossip messages.
I remember you mentioned a potential solution to this in a chat, which we'll have to look into in a different PR. I think for now the comment can be amended.
---
yes, the solution is to pass a `oneshot::Sender` to `gossip_messages_for`
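A rough sketch of that handoff, using `std::sync::mpsc` and a thread as stand-ins for the real gossip service and `oneshot` channel; `gossip_messages_for` below is illustrative, not the actual network API:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch only: instead of returning a receiver directly (which would require
// synchronizing on shared state), the caller hands the service a one-shot
// sender, and the service delivers the message receiver through it from its
// own thread once it is ready.
fn gossip_messages_for(topic: &'static str, reply: mpsc::Sender<mpsc::Receiver<String>>) {
    thread::spawn(move || {
        let (tx, rx) = mpsc::channel();
        // Hand the receiver back without making the caller block on a lock.
        reply.send(rx).expect("caller kept the reply channel open");
        // Simulate gossip messages arriving on the topic.
        tx.send(format!("message on {}", topic)).unwrap();
    });
}

fn main() {
    let (reply_tx, reply_rx) = mpsc::channel();
    gossip_messages_for("ingress-topic", reply_tx);
    // The receiver arrives asynchronously through the reply channel.
    let messages = reply_rx.recv().unwrap();
    assert_eq!(messages.recv().unwrap(), "message on ingress-topic");
}
```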
network/src/validation.rs
Outdated
```rust
Async::Ready(mut inner) => {
	let poll_result = inner.poll();
	self.inner = Some(inner);
	poll_result.map_err(map_err)
```
I remember you once pointed out that it would be more robust to do:

```rust
// Don't first do the inner.poll()
self.inner = Some(inner);
self.poll()
```
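A minimal self-contained model of that pattern; the `Async`/`Future` types below are hand-rolled stand-ins for futures 0.1, not the real crate, and the wrapper names are hypothetical:

```rust
// Hand-rolled stand-ins for the futures-0.1 types under discussion.
enum Async<T> {
    Ready(T),
    NotReady,
}

trait Future {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;
}

// A trivially-ready inner future, for illustration.
struct Ready(u32);
impl Future for Ready {
    type Item = u32;
    fn poll(&mut self) -> Async<u32> {
        Async::Ready(self.0)
    }
}

// Wrapper that resolves an "outer" step and then drives the inner future.
// Storing the inner future and re-entering `self.poll()` keeps a single
// code path for polling it, instead of polling it once inline and again
// via a different path on later wakeups.
struct Wrapper {
    pending: Option<Ready>,
    inner: Option<Ready>,
}

impl Future for Wrapper {
    type Item = u32;
    fn poll(&mut self) -> Async<u32> {
        if let Some(inner) = self.pending.take() {
            // Don't poll `inner` here first; store it and re-poll ourselves.
            self.inner = Some(inner);
            return self.poll();
        }
        match self.inner.as_mut() {
            Some(inner) => inner.poll(),
            None => Async::NotReady,
        }
    }
}

fn main() {
    let mut fut = Wrapper { pending: Some(Ready(7)), inner: None };
    match fut.poll() {
        Async::Ready(v) => assert_eq!(v, 7),
        Async::NotReady => panic!("expected readiness"),
    }
}
```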
network/src/validation.rs
Outdated
```rust
{
	let mut incoming_fetched = self.fetch_incoming.lock();
	for (para_id, _) in incoming_fetched.drain() {
```
Maybe `close` each receiver as well? Not sure if other tasks could still be receiving messages on those while this is ongoing (and until the `drop_gossip` message is handled by the protocol thread).
---
What I'll do here is have an `exit_future::Signal` in the `SessionDataFetcher` and have the fetching futures `select` on that as well as the global `exit` handler. Then when the `SessionDataFetcher` is dropped, all the futures will be as well.
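The drop-driven shutdown idea can be sketched like this; `Signal` and `Exit` below are illustrative stand-ins for the `exit_future` crate's API, using an atomic flag in place of a pollable future:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for `exit_future::Signal`: the owner (e.g. the fetcher) holds
// the Signal, the spawned futures hold cloned `Exit` handles, and dropping
// the Signal tells all of them to stop.
struct Signal(Arc<AtomicBool>);

#[derive(Clone)]
struct Exit(Arc<AtomicBool>);

impl Signal {
    fn new() -> (Signal, Exit) {
        let flag = Arc::new(AtomicBool::new(false));
        (Signal(flag.clone()), Exit(flag))
    }
}

impl Drop for Signal {
    fn drop(&mut self) {
        // Dropping the owner fires the exit signal.
        self.0.store(true, Ordering::SeqCst);
    }
}

impl Exit {
    // In the real crate this would be a future to `select` on; here it is
    // a simple flag check for illustration.
    fn is_exited(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let (signal, exit) = Signal::new();
    assert!(!exit.is_exited());
    drop(signal); // dropping the fetcher would drop its signal
    assert!(exit.is_exited());
}
```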
---
A question and a proposal on whether we can remove the locks in `SessionDataFetcher`. Maybe something better done in a follow-up. Otherwise looks good.
```rust
knowledge: Arc<Mutex<Knowledge>>,
parent_hash: Hash,
message_validator: RegisteredMessageValidator,
}
```
Ok so now that I understand the structure a bit better, I have a proposal for a larger restructuring, which perhaps could be done in a separate PR, or here:

I'd like to see if we can remove the two `Arc<Mutex<..>>` above, because they potentially introduce blocking in the system (can we know for sure that two tasks will never attempt a `note_statement` or `fetch_incoming` at the same time and end up having to wait on each other, in the meantime blocking other tasks?).

- Could we make `SessionDataFetcher` a `Stream`, and only share a sender (probably wrapped in a struct with methods like `fetch_incoming`) with the `Router`?
- Then something like `fetch_incoming` could be implemented by having an intermediary future with an "inner/outer" receiver like we have elsewhere, and sending a message containing a sender to the `SessionDataFetcher` stream, which would then do the work in its own task and be able to mutate `fetch_incoming` without a lock, finally responding by sending a receiver via the sender it received in the original message.
- Something like `note_statement` could also be done by sending a message to `SessionDataFetcher`, which would then do `self.knowledge.note_statement` without having to lock `knowledge`.
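A rough sketch of that actor-style proposal, using threads and `std::sync::mpsc` in place of futures tasks and streams; all names here (`Command`, `spawn_fetcher`, the `u32` para id) are hypothetical simplifications:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Messages other tasks send to the fetcher instead of locking its state.
enum Command {
    // Carries a reply sender through which the fetcher hands back a
    // receiver for that parachain's incoming messages.
    FetchIncoming(u32, mpsc::Sender<mpsc::Receiver<String>>),
    NoteStatement(String),
}

fn spawn_fetcher() -> mpsc::Sender<Command> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // State owned exclusively by this task: no Arc<Mutex<..>> needed.
        let mut incoming: HashMap<u32, mpsc::Sender<String>> = HashMap::new();
        let mut knowledge: Vec<String> = Vec::new();
        // The loop "finishes" once all command senders have been dropped.
        for cmd in rx {
            match cmd {
                Command::FetchIncoming(para_id, reply) => {
                    let (msg_tx, msg_rx) = mpsc::channel();
                    incoming.insert(para_id, msg_tx);
                    let _ = reply.send(msg_rx);
                }
                Command::NoteStatement(s) => knowledge.push(s),
            }
        }
    });
    tx
}

fn main() {
    let fetcher = spawn_fetcher();
    let (reply_tx, reply_rx) = mpsc::channel();
    fetcher.send(Command::FetchIncoming(100, reply_tx)).unwrap();
    let _incoming = reply_rx.recv().unwrap();
    fetcher.send(Command::NoteStatement("seen".into())).unwrap();
}
```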
---
The inner/outer futures are terrible so I'd prefer to avoid that wherever possible.
---
Some of the operations are order-dependent, e.g. `note_statement` should be done before repropagation. What is the item of `SessionDataFetcher` anyway?
---
I think the item can be `()`; the main idea is that the `SessionDataFetcher` would run in its own independent task and internally poll channels whose senders have been shared with other tasks, and then "finish" once those senders have been dropped.
For order-dependent operations, you could either do some bookkeeping inside `SessionDataFetcher` (if the order would require cross-task coordination), or, if the "order" can be ensured by doing the operations one after the other from one task, you could just send one message after the other from that task and rely on the fact that they will be received in that order.
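A tiny demonstration of that ordering argument with `std::sync::mpsc` (illustrative only): messages sent from a single task over one channel are received in the order they were sent, so "do A before B" can be expressed as "send A, then send B".

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Sent in order from one task...
        tx.send("note_statement").unwrap();
        tx.send("repropagate").unwrap();
    });
    // ...received in the same order on the other side.
    let received: Vec<&str> = rx.iter().collect();
    assert_eq!(received, vec!["note_statement", "repropagate"]);
}
```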
I don't think the inner/outer pattern is great, but it does seem like a necessary piece of glue, because the other tasks expect to immediately get a receiver to start polling. It's something that could be addressed if we have a better idea. The good thing about inner/outer is that it cannot block, unlike the use of a lock.
If you look at `fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver`, you can imagine it running like this:

- The router calls `self.fetcher.fetch_incoming`, which is supposed to immediately return a receiver.
- `fetch_incoming` will attempt to acquire the lock around `self.fetch_incoming.lock()`.
- At that point the thread, not the task, could block waiting for the lock to become available. If this happens it will block every other task tied to the same thread on the pool (`parking_lot` is optimized for threads, not tasks, and will happily park the thread if the lock doesn't become available "quickly").
- When the lock becomes available, the rest of `fetch_incoming` executes and returns an `IncomingReceiver`.
- So it looks like we have non-blocking code, with the `IncomingReceiver` immediately being available to be included in a chain of work on the thread pool, but actually we have a potential "stop the world" moment right in the middle of the operation.
Obviously, this can be seen as just an optimization problem, and the lock might actually never be contended; or it can be seen as a code design problem, to be solved by ensuring the code can never block. The ugliness of the inner/outer pattern that might be required to plug this into the system is actually a result of the system having unrealistic expectations (that an `IncomingReceiver` can always be returned immediately from a call to `fetch_incoming`).
I don't think it's a huge problem, and perhaps it's best addressed in a follow-up PR. I do think in general it would be good to move away from using `parking_lot` in the context of tasks. If locks really are the right pattern for a given solution, it would be better to see if we could use a lock that is futures-aware, like https://docs.rs/futures/0.1.25/futures/sync/struct.BiLock.html
---
I've opened #180 to further discuss this...
* Add skeleton for worst case import_unsigned_header
* Fix a typo
* Add benchmark test for best case unsigned header import
* Add finality verification to worst case bench
* Move `insert_header()` from mock to test_utils

  Allows the benchmarking code to use this without having to pull it in from the mock.
* Add a rough bench to test finalizing a "long" chain
* Try to use complexity parameter for finality bench
* Improve long finality bench
* Remove stray dot file
* Remove old "worst" case bench
* Scribble some ideas down for pruning bench
* Prune headers during benchmarking
* Clean up some comments
* Make finality bench work for entire range of complexity parameter
* Place initialization code into a function
* Add bench for block finalization with caching
* First attempt at bench with receipts
* Try and trigger validator set change
* Perform a validator set change during benchmarking
* Move `validators_change_receipt()` to shared location

  Allows unit tests and benchmarks to access the same helper function and const
* Extract a test receipt root into a constant
* Clean up description of pruning bench
* Fix cache and pruning tests
* Remove unnecessary `build_custom_header` usage
* Get rid of warnings
* Remove code duplication comment

  I don't think it's entirely worth it to split out so few lines of code. The benches aren't particularly hard to read anyways.
* Increase the range of the complexity parameter
* Use dynamic number of receipts while benchmarking

  As part of this change we have removed the hardcoded TEST_RECEIPT_ROOT and instead chose to calculate the receipt root on the fly. This will make tests and benches less fragile.
* Prune a dynamic number of headers
In #117, the `polkadot-consensus` (now `polkadot-validation`) traits were extended to support message-passing. The Polkadot network implementation got some utilities for broadcasting and fetching ingress messages to a parachain, but these utilities were only available to the validators' pipeline.

This PR extends and generalizes a bit further, so any part of the codebase can fetch ingress messages, and extends `polkadot-collator` to use that and to produce outgoing messages.