gossipsub: remove fast_message_id_fn #4138

Closed
mxinden opened this issue Jul 3, 2023 · 3 comments · Fixed by #4285
Labels: difficulty:moderate, getting-started, help wanted

@mxinden
Member

mxinden commented Jul 3, 2023

Description

Today libp2p-gossipsub provides the fast_message_id_fn mechanism:

/// A user-defined optional function that computes fast ids from raw messages. This can be used
/// to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always
/// have different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for them.
///
/// The function takes a [`RawMessage`] as input and outputs a String to be interpreted
/// as the fast message id. Default is None.
pub fn fast_message_id_fn<F>(&mut self, fast_id_fn: F) -> &mut Self
where
    F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static,
{
    self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn));
    self
}

The original motivation for the mechanism was the following: Ethereum compresses GossipSub messages. To avoid decompressing a message only to find out that it is a duplicate, an additional message id function was introduced that can be applied pre-decompression.
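
For concreteness, here is a minimal sketch of how a consumer could wire up both hooks for that use case. The hashing choice and the overall setup are assumptions for illustration, not code from any real client; only the two builder methods are part of the actual gossipsub API:

use libp2p::gossipsub::{self, FastMessageId, MessageId};
use sha2::{Digest, Sha256};

fn main() {
    let config = gossipsub::ConfigBuilder::default()
        // Fast id: hash the still-compressed wire bytes, no decompression needed.
        .fast_message_id_fn(|raw: &gossipsub::RawMessage| {
            FastMessageId::from(Sha256::digest(&raw.data).to_vec())
        })
        // Canonical id: hash the decompressed payload produced by the DataTransform.
        .message_id_fn(|msg: &gossipsub::Message| {
            MessageId::from(Sha256::digest(&msg.data).to_vec())
        })
        .build()
        .expect("valid gossipsub config");
    let _ = config;
}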

That motivation, i.e. avoiding decompression, is no longer relevant, though. To quote @AgeManning:

However, the fast-message-id thing probably doesn't really save us all that much, because our compression algo (snappy) is pretty quick and we're talking very small amount of processing.

One might think that removing the fast_message_id_fn mechanism makes libp2p-gossipsub prone to DoS attacks in which an attacker tries to exhaust the CPU by forcing lots of decompression. However, even with the mechanism in place, an attacker can send slightly altered messages and thus force the local machine to do the decompression work anyway.
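
As a toy illustration of that argument (standalone sketch, not gossipsub code): a single flipped byte in the compressed payload yields a different fast id, so the fast-id cache misses and the transform has to run anyway.

use sha2::{Digest, Sha256};

// Stand-in for a fast message id computed over the raw (compressed) bytes.
fn fast_id(data: &[u8]) -> Vec<u8> {
    Sha256::digest(data).to_vec()
}

fn main() {
    let original: Vec<u8> = vec![0xAB; 64]; // pretend this is a compressed payload
    let mut altered = original.clone();
    altered[0] ^= 0x01; // attacker flips a single byte

    // Different fast ids => no dedup hit => the node still pays for decompression.
    assert_ne!(fast_id(&original), fast_id(&altered));
}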

Besides being an optimization that is no longer relevant, the mechanism adds a significant amount of complexity:

  • One needs a translation mechanism that maps fast message ids to the real message ids:
    /// Short term cache for fast message ids mapping them to the real message ids
    fast_message_id_cache: TimeCache<FastMessageId, MessageId>,
  • It bloats implementations of e.g. handle_received_message (a slimmed-down sketch without the fast-id branches follows after this list):
    /// Handles a newly received [`RawMessage`].
    ///
    /// Forwards the message to all peers in the mesh.
    fn handle_received_message(
        &mut self,
        mut raw_message: RawMessage,
        propagation_source: &PeerId,
    ) {
        // Record the received metric
        if let Some(metrics) = self.metrics.as_mut() {
            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
        }
        let fast_message_id = self.config.fast_message_id(&raw_message);
        if let Some(fast_message_id) = fast_message_id.as_ref() {
            if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) {
                let msg_id = msg_id.clone();
                // Report the duplicate
                if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
                    if let Some((peer_score, ..)) = &mut self.peer_score {
                        peer_score.duplicated_message(
                            propagation_source,
                            &msg_id,
                            &raw_message.topic,
                        );
                    }
                    // Update the cache, informing that we have received a duplicate from another peer.
                    // The peers in this cache are used to prevent us forwarding redundant messages onto
                    // these peers.
                    self.mcache.observe_duplicate(&msg_id, propagation_source);
                }
                // This message has been seen previously. Ignore it
                return;
            }
        }
        // Try and perform the data transform to the message. If it fails, consider it invalid.
        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
            Ok(message) => message,
            Err(e) => {
                debug!("Invalid message. Transform error: {:?}", e);
                // Reject the message and return
                self.handle_invalid_message(
                    propagation_source,
                    &raw_message,
                    RejectReason::ValidationError(ValidationError::TransformFailed),
                );
                return;
            }
        };
        // Calculate the message id on the transformed data.
        let msg_id = self.config.message_id(&message);
        // Check the validity of the message
        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
        // and instead continually penalize peers that repeatedly send this message.
        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
            return;
        }
        // Add the message to the duplicate caches
        if let Some(fast_message_id) = fast_message_id {
            // add id to cache
            self.fast_message_id_cache
                .entry(fast_message_id)
                .or_insert_with(|| msg_id.clone());
        }
        if !self.duplicate_cache.insert(msg_id.clone()) {
            debug!("Message already received, ignoring. Message: {}", msg_id);
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
            }
            self.mcache.observe_duplicate(&msg_id, propagation_source);
            return;
        }
        debug!(
            "Put message {:?} in duplicate_cache and resolve promises",
            msg_id
        );
        // Record the received message with the metrics
        if let Some(metrics) = self.metrics.as_mut() {
            metrics.msg_recvd(&message.topic);
        }
        // Tells score that message arrived (but is maybe not fully validated yet).
        // Consider the message as delivered for gossip promises.
        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
            gossip_promises.message_delivered(&msg_id);
        }
        // Add the message to our memcache
        self.mcache.put(&msg_id, raw_message.clone());
        // Dispatch the message to the user if we are subscribed to any of the topics
        if self.mesh.contains_key(&message.topic) {
            debug!("Sending received message to user");
            self.events
                .push_back(ToSwarm::GenerateEvent(Event::Message {
                    propagation_source: *propagation_source,
                    message_id: msg_id.clone(),
                    message,
                }));
        } else {
            debug!(
                "Received message on a topic we are not subscribed to: {:?}",
                message.topic
            );
            return;
        }
        // forward the message to mesh peers, if no validation is required
        if !self.config.validate_messages() {
            if self
                .forward_msg(
                    &msg_id,
                    raw_message,
                    Some(propagation_source),
                    HashSet::new(),
                )
                .is_err()
            {
                error!("Failed to forward message. Too large");
            }
            debug!("Completed message handling for message: {:?}", msg_id);
        }
    }
  • It makes reasoning about attack scenarios difficult, see https://github.com/libp2p/rust-libp2p/security/advisories/GHSA-gv2v-chrf-46p3.
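
For comparison, a rough sketch of how the receive path above reads once the fast-id cache and its branches are dropped. This is illustrative only and elides metrics and logging; it is not the exact code that landed in #4285.

fn handle_received_message(
    &mut self,
    mut raw_message: RawMessage,
    propagation_source: &PeerId,
) {
    // Transform (e.g. decompress) first; a failed transform rejects the message.
    let message = match self.data_transform.inbound_transform(raw_message.clone()) {
        Ok(message) => message,
        Err(_) => {
            self.handle_invalid_message(
                propagation_source,
                &raw_message,
                RejectReason::ValidationError(ValidationError::TransformFailed),
            );
            return;
        }
    };

    // A single id, computed once on the transformed data, drives all duplicate handling.
    let msg_id = self.config.message_id(&message);

    if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
        return;
    }

    if !self.duplicate_cache.insert(msg_id.clone()) {
        // Duplicate: record it for peer scoring and stop.
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
        }
        self.mcache.observe_duplicate(&msg_id, propagation_source);
        return;
    }

    // ... scoring, memcache, dispatch to the user and forwarding proceed as before.
}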

Motivation

Simplify libp2p-gossipsub.

@AgeManning, this is extracted from our past discussion. Please comment in case I am missing something.

Current Implementation

Are you planning to do it yourself in a pull request?

No, but I am happy to mentor someone.

@mxinden added the difficulty:moderate, help wanted, and getting-started labels on Jul 3, 2023
@StemCll
Contributor

StemCll commented Jul 23, 2023

Hi @mxinden 👋
I'm starting to work on this task :)

@mxinden
Member Author

mxinden commented Jul 24, 2023

Great, thank you. Feel free to open a draft pull request in case you want early feedback, @StemCll.

@StemCll
Contributor

StemCll commented Jul 24, 2023

Sure, I need to get into the implementation first.

@mergify mergify bot closed this as completed in #4285 Oct 20, 2023
mergify bot pushed a commit that referenced this issue Oct 20, 2023
xgreenx added a commit to FuelLabs/fuel-core that referenced this issue Dec 22, 2023
#1298

### Documentation:
Removed `upgrade::read_length_prefixed` and
`upgrade::write_length_prefixed`:
https://github.com/libp2p/rust-libp2p/pull/4787/files
Remove `FastMessageId`:
libp2p/rust-libp2p#4138
Remove `TokioDnsConfig`:
libp2p/rust-libp2p@95890b5
Implement `InboundConnectionUpgrade`/`OutboundConnectionUpgrade`:
libp2p/rust-libp2p#4307

---------

Co-authored-by: Brandon Vrooman <brandon.vrooman@gmail.com>
Co-authored-by: Hannes Karppila <hannes.karppila@gmail.com>
Co-authored-by: xgreenx <xgreenx9999@gmail.com>
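
For a downstream consumer like the one above, the gossipsub part of that migration is mostly mechanical: the fast-id hook disappears from the config builder while message_id_fn stays. A hypothetical before/after sketch, not taken from fuel-core:

use libp2p::gossipsub::{self, MessageId};
use sha2::{Digest, Sha256};

fn main() {
    let config = gossipsub::ConfigBuilder::default()
        // .fast_message_id_fn(...)  // <- removed together with FastMessageId in rust-libp2p#4285
        .message_id_fn(|msg: &gossipsub::Message| {
            MessageId::from(Sha256::digest(&msg.data).to_vec())
        })
        .build()
        .expect("valid gossipsub config");
    let _ = config;
}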