Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber terminating crashes publisher with SHM #220

Open
ciandonovan opened this issue Jun 27, 2024 · 6 comments
Open

Subscriber terminating crashes publisher with SHM #220

ciandonovan opened this issue Jun 27, 2024 · 6 comments

Comments

@ciandonovan
Copy link

When using shared-memory between a publisher and subscriber, when the subscriber terminates either gracefully or not, the publisher crashes with:

[node-1] terminate called after throwing an instance of 'rclcpp::exceptions::RCLError'
[node-1]   what():  failed to publish message: Failed to allocate a SHM buffer, even after GCing, at /opt/ros/dev_ws/src/rmw_zenoh/rmw_zenoh_cpp/src/rmw_zenoh.cpp:949, at ./src/rcl/publisher.c:284

Tested with commit e73df29 of rmw_zenoh with the Jazzy distro.

@clalancette
Copy link
Collaborator

Can you please give a configuration/setup where we could enable shared memory? At present I'm not sure how to do this.

@ciandonovan
Copy link
Author

DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5

/// This file attempts to list and document available configuration elements.
/// For a more complete view of the configuration's structure, check out `zenoh/src/config.rs`'s `Config` structure.
/// Note that the values here are correctly typed, but may not be sensible, so copying this file to change only the parts that matter to you is not good practice.
{
  /// The identifier (as unsigned 128bit integer in hexadecimal lowercase - leading zeros are not accepted)
  /// that zenoh runtime will use.
  /// If not set, a random unsigned 128bit integer will be used.
  /// WARNING: this id must be unique in your zenoh network.
  // id: "1234567890abcdef",

  /// The node's mode (router, peer or client)
  mode: "router",

  /// Which endpoints to connect to. E.g. tcp/localhost:7447.
  /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup.
  connect: {
    endpoints: [
      // "<proto>/<address>"
    ],
  },

  /// Which endpoints to listen on. E.g. tcp/localhost:7447.
  /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers,
  /// peers, or client can use to establish a zenoh session.
  listen: {
    endpoints: [
      "tcp/[::]:7447"
    ],
  },
  /// Configure the scouting mechanisms and their behaviours
  scouting: {
    /// In client mode, the period dedicated to scouting for a router before failing
    timeout: 3000,
    /// In peer mode, the period dedicated to scouting remote peers before attempting other operations
    delay: 1,
    /// The multicast scouting configuration.
    multicast: {
      /// Whether multicast scouting is enabled or not
      enabled: false,
      /// The socket which should be used for multicast scouting
      address: "224.0.0.224:7446",
      /// The network interface which should be used for multicast scouting
      interface: "auto", // If not set or set to "auto" the interface if picked automatically
      /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast.
      /// Accepts a single value or different values for router, peer and client.
      /// Each value is bit-or-like combinations of "peer", "router" and "client".
      autoconnect: { router: "", peer: "router|peer" },
      /// Whether or not to listen for scout messages on UDP multicast and reply to them.
      listen: true,
    },
    /// The gossip scouting configuration.
    gossip: {
      /// Whether gossip scouting is enabled or not
      enabled: true,
      /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network.
      /// When false, gossip scouting informations are only propagated to the next hop.
      /// Activating multihop gossip implies more scouting traffic and a lower scalability.
      /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
      /// direct connectivity with each other.
      multihop: false,
      /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
      /// Accepts a single value or different values for router, peer and client.
      /// Each value is bit-or-like combinations of "peer", "router" and "client".
      autoconnect: { router: "", peer: "router|peer" },
    },
  },

  /// Configuration of data messages timestamps management.
  timestamping: {
    /// Whether data messages should be timestamped if not already.
    /// Accepts a single boolean value or different values for router, peer and client.
    /// PublicationCache which is required for transient_local durability
    /// only works when time-stamping is enabled.
    enabled: { router: true, peer: true, client: false },
    /// Whether data messages with timestamps in the future should be dropped or not.
    /// If set to false (default), messages with timestamps in the future are retimestamped.
    /// Timestamps are ignored if timestamping is disabled.
    drop_future_timestamp: false,
  },

  /// The default timeout to apply to queries in milliseconds.
  queries_default_timeout: 10000,

  /// The routing strategy to use and it's configuration.
  routing: {
    /// The routing strategy to use in routers and it's configuration.
    router: {
      /// When set to true a router will forward data between two peers
      /// directly connected to it if it detects that those peers are not
      /// connected to each other.
      /// The failover brokering only works if gossip discovery is enabled.
      peers_failover_brokering: true,
    },
    /// The routing strategy to use in peers and it's configuration.
    peer: {
      /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
      mode: "peer_to_peer",
    },
  },

  //  /// The declarations aggregation strategy.
  //  aggregation: {
  //      /// A list of key-expressions for which all included subscribers will be aggregated into.
  //      subscribers: [
  //        // key_expression
  //      ],
  //      /// A list of key-expressions for which all included publishers will be aggregated into.
  //      publishers: [
  //        // key_expression
  //      ],
  //  },

 /// Configure internal transport parameters
  transport: {
    unicast: {
      /// Timeout in milliseconds when opening a link
      accept_timeout: 10000,
      /// Maximum number of zenoh session in pending state while accepting
      accept_pending: 100,
      /// Maximum number of sessions that can be simultaneously alive
      max_sessions: 1000,
      /// Maximum number of incoming links that are admitted per session
      max_links: 1,
      /// Enables the LowLatency transport
      /// This option does not make LowLatency transport mandatory, the actual implementation of transport
      /// used will depend on Establish procedure and other party's settings
      ///
      /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization.
      /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to
      ///       enable 'lowlatency' you need to explicitly disable 'qos'.
      lowlatency: false,
      /// Enables QoS on unicast communications.
      qos: {
        enabled: true,
      },
      /// Enables compression on unicast communications.
      /// Compression capabilities are negotiated during session establishment.
      /// If both Zenoh nodes support compression, then compression is activated.
      compression: {
        enabled: false,
      },
    },
    link: {
      /// An optional whitelist of protocols to be used for accepting and opening sessions.
      /// If not configured, all the supported protocols are automatically whitelisted.
      /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"]
      /// For example, to only enable "tls" and "quic":
      //   protocols: ["tls", "quic"],
      /// Configure the zenoh TX parameters of a link
      tx: {
        /// The resolution in bits to be used for the message sequence numbers.
        /// When establishing a session with another Zenoh instance, the lowest value of the two instances will be used.
        /// Accepted values: 8bit, 16bit, 32bit, 64bit.
        sequence_number_resolution: "32bit",
        /// Link lease duration in milliseconds to announce to other zenoh nodes
        lease: 10000,
        /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive
        /// messages will be sent at the configured time interval.
        /// NOTE: In order to consider eventual packet loss and transmission latency and jitter,
        ///       set the actual keep_alive timeout to one fourth of the lease time.
        ///       This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity
        ///       check which considers a link as failed when no messages are received in 3.5 times the
        ///       target interval.
        keep_alive: 4,
        /// Batch size in bytes is expressed as a 16bit unsigned integer.
        /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
        /// The default batch size value is the maximum batch size: 65535.
        batch_size: 65535,
        /// Each zenoh link has a transmission queue that can be configured
        queue: {
          /// The size of each priority queue indicates the number of batches a given queue can contain.
          /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
          /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
          /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU.
          /// If qos is false, then only the DATA priority will be allocated.
          size: {
            control: 1,
            real_time: 1,
            interactive_high: 1,
            interactive_low: 1,
            data_high: 2,
            data: 4,
            data_low: 4,
            background: 4,
          },
          /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
          /// Higher values lead to a more aggressive batching but it will introduce additional latency.
          backoff: 100,
        },
        // Number of threads dedicated to transmission
        // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
        // threads: 1,
      },
      /// Configure the zenoh RX parameters of a link
      rx: {
        /// Receiving buffer size in bytes for each link
        /// The default the rx_buffer_size value is the same as the default batch size: 65335.
        /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate
        /// more in-flight data. This is particularly relevant when dealing with large messages.
        /// E.g. for 16MiB rx_buffer_size set the value to: 16777216.
        buffer_size: 65535,
        /// Maximum size of the defragmentation buffer at receiver end.
        /// Fragmented messages that are larger than the configured size will be dropped.
        /// The default value is 1GiB. This would work in most scenarios.
        /// NOTE: reduce the value if you are operating on a memory constrained device.
        max_message_size: 1073741824,
      },
      /// Configure TLS specific parameters
      tls: {
        /// Path to the certificate of the certificate authority used to validate either the server
        /// or the client's keys and certificates, depending on the node's mode. If not specified
        /// on router mode then the default WebPKI certificates are used instead.
        root_ca_certificate: null,
        /// Path to the TLS server private key
        server_private_key: null,
        /// Path to the TLS server public certificate
        server_certificate: null,
        /// Client authentication, if true enables mTLS (mutual authentication)
        client_auth: false,
        /// Path to the TLS client private key
        client_private_key: null,
        /// Path to the TLS client public certificate
        client_certificate: null,
        // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers.
        // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your
        // ca to verify that the server at baz.com is actually baz.com, let this be true (default).
        server_name_verification: null,
      },
    },
    /// Shared memory configuration
    shared_memory: {
      enabled: true,
    },
    /// Access control configuration
    auth: {
      /// The configuration of authentification.
      /// A password implies a username is required.
      usrpwd: {
        user: null,
        password: null,
        /// The path to a file containing the user password dictionary
        dictionary_file: null,
      },
      pubkey: {
        public_key_pem: null,
        private_key_pem: null,
        public_key_file: null,
        private_key_file: null,
        key_size: null,
        known_keys_file: null,
      },
    },
  },

  /// Configure the Admin Space
  /// Unstable: this configuration part works as advertised, but may change in a future release
  adminspace: {
    // read and/or write permissions on the admin space
    permissions: {
      read: true,
      write: false,
    },
  },

}

DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5

/// This file attempts to list and document available configuration elements.
/// For a more complete view of the configuration's structure, check out `zenoh/src/config.rs`'s `Config` structure.
/// Note that the values here are correctly typed, but may not be sensible, so copying this file to change only the parts that matter to you is not good practice.
{
  /// The identifier (as unsigned 128bit integer in hexadecimal lowercase - leading zeros are not accepted)
  /// that zenoh runtime will use.
  /// If not set, a random unsigned 128bit integer will be used.
  /// WARNING: this id must be unique in your zenoh network.
  // id: "1234567890abcdef",

  /// The node's mode (router, peer or client)
  mode: "peer",

  /// Which endpoints to connect to. E.g. tcp/localhost:7447.
  /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup.
  /// ROS setting: By default connect to the Zenoh router on localhost on port 7447.
  connect: {
    endpoints: [
      "tcp/localhost:7447",
    ],
  },

  /// Which endpoints to listen on. E.g. tcp/localhost:7447.
  /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers,
  /// peers, or client can use to establish a zenoh session.
  /// ROS setting: By default accept incoming connections only from localhost (i.e. from colocalized Nodes).
  ///              All communications with other hosts are routed by the Zenoh router.
  listen: {
    endpoints: [
      "tcp/localhost:0",
    ],
  },
  /// Configure the scouting mechanisms and their behaviours
  scouting: {
    /// In client mode, the period dedicated to scouting for a router before failing
    timeout: 3000,
    /// In peer mode, the period dedicated to scouting remote peers before attempting other operations
    delay: 1,
    /// The multicast scouting configuration.
    multicast: {
      /// Whether multicast scouting is enabled or not
      enabled: false,
      /// The socket which should be used for multicast scouting
      address: "224.0.0.224:7446",
      /// The network interface which should be used for multicast scouting
      interface: "auto", // If not set or set to "auto" the interface if picked automatically
      /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast.
      /// Accepts a single value or different values for router, peer and client.
      /// Each value is bit-or-like combinations of "peer", "router" and "client".
      autoconnect: { router: "", peer: "router|peer" },
      /// Whether or not to listen for scout messages on UDP multicast and reply to them.
      listen: true,
    },
    /// The gossip scouting configuration.
    gossip: {
      /// Whether gossip scouting is enabled or not
      enabled: true,
      /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network.
      /// When false, gossip scouting informations are only propagated to the next hop.
      /// Activating multihop gossip implies more scouting traffic and a lower scalability.
      /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
      /// direct connectivity with each other.
      multihop: false,
      /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
      /// Accepts a single value or different values for router, peer and client.
      /// Each value is bit-or-like combinations of "peer", "router" and "client".
      autoconnect: { router: "", peer: "router|peer" },
    },
  },

  /// Configuration of data messages timestamps management.
  timestamping: {
    /// Whether data messages should be timestamped if not already.
    /// Accepts a single boolean value or different values for router, peer and client.
    /// PublicationCache which is required for transient_local durability
    /// only works when time-stamping is enabled.
    enabled: { router: true, peer: true, client: false },
    /// Whether data messages with timestamps in the future should be dropped or not.
    /// If set to false (default), messages with timestamps in the future are retimestamped.
    /// Timestamps are ignored if timestamping is disabled.
    drop_future_timestamp: false,
  },

  /// The default timeout to apply to queries in milliseconds.
  queries_default_timeout: 10000,

  /// The routing strategy to use and it's configuration.
  routing: {
    /// The routing strategy to use in routers and it's configuration.
    router: {
      /// When set to true a router will forward data between two peers
      /// directly connected to it if it detects that those peers are not
      /// connected to each other.
      /// The failover brokering only works if gossip discovery is enabled.
      peers_failover_brokering: true,
    },
    /// The routing strategy to use in peers and it's configuration.
    peer: {
      /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
      mode: "peer_to_peer",
    },
  },

  //  /// The declarations aggregation strategy.
  //  aggregation: {
  //      /// A list of key-expressions for which all included subscribers will be aggregated into.
  //      subscribers: [
  //        // key_expression
  //      ],
  //      /// A list of key-expressions for which all included publishers will be aggregated into.
  //      publishers: [
  //        // key_expression
  //      ],
  //  },

 /// Configure internal transport parameters
  transport: {
    unicast: {
      /// Timeout in milliseconds when opening a link
      accept_timeout: 10000,
      /// Maximum number of zenoh session in pending state while accepting
      accept_pending: 100,
      /// Maximum number of sessions that can be simultaneously alive
      max_sessions: 1000,
      /// Maximum number of incoming links that are admitted per session
      max_links: 1,
      /// Enables the LowLatency transport
      /// This option does not make LowLatency transport mandatory, the actual implementation of transport
      /// used will depend on Establish procedure and other party's settings
      ///
      /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization.
      /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to
      ///       enable 'lowlatency' you need to explicitly disable 'qos'.
      lowlatency: false,
      /// Enables QoS on unicast communications.
      qos: {
        enabled: true,
      },
      /// Enables compression on unicast communications.
      /// Compression capabilities are negotiated during session establishment.
      /// If both Zenoh nodes support compression, then compression is activated.
      compression: {
        enabled: false,
      },
    },
    link: {
      /// An optional whitelist of protocols to be used for accepting and opening sessions.
      /// If not configured, all the supported protocols are automatically whitelisted.
      /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"]
      /// For example, to only enable "tls" and "quic":
      //   protocols: ["tls", "quic"],
      /// Configure the zenoh TX parameters of a link
      tx: {
        /// The resolution in bits to be used for the message sequence numbers.
        /// When establishing a session with another Zenoh instance, the lowest value of the two instances will be used.
        /// Accepted values: 8bit, 16bit, 32bit, 64bit.
        sequence_number_resolution: "32bit",
        /// Link lease duration in milliseconds to announce to other zenoh nodes
        lease: 10000,
        /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive
        /// messages will be sent at the configured time interval.
        /// NOTE: In order to consider eventual packet loss and transmission latency and jitter,
        ///       set the actual keep_alive timeout to one fourth of the lease time.
        ///       This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity
        ///       check which considers a link as failed when no messages are received in 3.5 times the
        ///       target interval.
        keep_alive: 4,
        /// Batch size in bytes is expressed as a 16bit unsigned integer.
        /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
        /// The default batch size value is the maximum batch size: 65535.
        batch_size: 65535,
        /// Each zenoh link has a transmission queue that can be configured
        queue: {
          /// The size of each priority queue indicates the number of batches a given queue can contain.
          /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
          /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
          /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU.
          /// If qos is false, then only the DATA priority will be allocated.
          size: {
            control: 1,
            real_time: 1,
            interactive_high: 1,
            interactive_low: 1,
            data_high: 2,
            data: 4,
            data_low: 4,
            background: 4,
          },
          /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
          /// Higher values lead to a more aggressive batching but it will introduce additional latency.
          backoff: 100,
        },
        // Number of threads dedicated to transmission
        // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
        // threads: 1,
      },
      /// Configure the zenoh RX parameters of a link
      rx: {
        /// Receiving buffer size in bytes for each link
        /// The default the rx_buffer_size value is the same as the default batch size: 65335.
        /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate
        /// more in-flight data. This is particularly relevant when dealing with large messages.
        /// E.g. for 16MiB rx_buffer_size set the value to: 16777216.
        buffer_size: 65535,
        /// Maximum size of the defragmentation buffer at receiver end.
        /// Fragmented messages that are larger than the configured size will be dropped.
        /// The default value is 1GiB. This would work in most scenarios.
        /// NOTE: reduce the value if you are operating on a memory constrained device.
        max_message_size: 1073741824,
      },
      /// Configure TLS specific parameters
      tls: {
        /// Path to the certificate of the certificate authority used to validate either the server
        /// or the client's keys and certificates, depending on the node's mode. If not specified
        /// on router mode then the default WebPKI certificates are used instead.
        root_ca_certificate: null,
        /// Path to the TLS server private key
        server_private_key: null,
        /// Path to the TLS server public certificate
        server_certificate: null,
        /// Client authentication, if true enables mTLS (mutual authentication)
        client_auth: false,
        /// Path to the TLS client private key
        client_private_key: null,
        /// Path to the TLS client public certificate
        client_certificate: null,
        // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers.
        // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your
        // ca to verify that the server at baz.com is actually baz.com, let this be true (default).
        server_name_verification: null,
      },
    },
    /// Shared memory configuration
    shared_memory: {
      enabled: true,
    },
    /// Access control configuration
    auth: {
      /// The configuration of authentification.
      /// A password implies a username is required.
      usrpwd: {
        user: null,
        password: null,
        /// The path to a file containing the user password dictionary
        dictionary_file: null,
      },
      pubkey: {
        public_key_pem: null,
        private_key_pem: null,
        public_key_file: null,
        private_key_file: null,
        key_size: null,
        known_keys_file: null,
      },
    },
  },

  /// Configure the Admin Space
  /// Unstable: this configuration part works as advertised, but may change in a future release
  adminspace: {
    // read and/or write permissions on the admin space
    permissions: {
      read: true,
      write: false,
    },
  },
}

export RMW_IMPLEMENTATION=rmw_zenoh_cpp
export ZENOH_ROUTER_CONFIG_URI=DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
export ZENOH_SESSION_CONFIG_URI=DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5

@ciandonovan
Copy link
Author

I'm working on a reproducible container for this issue, but unfortunately it doesn't seem to be an issue with the simple pub/sub demo from the docs.

The issue occurred when streaming images so much larger bandwidth, working on a reproducible example that does something similar

@Yadunund
Copy link
Member

I'm working on a reproducible container for this issue, but unfortunately it doesn't seem to be an issue with the simple pub/sub demo from the docs.

The issue occurred when streaming images so much larger bandwidth, working on a reproducible example that does something similar

Ah my hunch is that your payload exceeds the 10MB we allocate to the SHM buffer.
Can you retry after manually bumping this value to a larger size + re-compiling?

// Megabytes of SHM to reserve.
// TODO(clalancette): Make this configurable, or get it from the configuration
#define SHM_BUFFER_SIZE_MB 10

We tried making this configurable in the zenoh configs but at the time custom key-value pairs were not supported.

@ciandonovan
Copy link
Author

@Yadunund yes I can confirm bumping the SHM_BUFFER_SIZE_MB resolves the issue

@ciandonovan
Copy link
Author

However, it still occurs when a larger number of connections are made/unmade in rapid succession.

zenoh-2024-06-30_01.20.52.mp4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants