
RFC: clustering SPI revisited

Thomas Segismont edited this page Oct 8, 2019 · 3 revisions

Clustering SPI revisited

As of v3, the clustering SPI provides 3 different features:

  • cluster-wide shared data (maps, counters, locks)
  • storing clustered event bus data (multimap) and round-robin selection for sending (ChoosableIterable)
  • storing HA data (sync map)

Limitations

In v4, we would like to revisit clustering to address some limitations.

Delivery strategy

The multimap storage and ChoosableIterable retrieval do not allow picking a recipient in a smart fashion.

For example, it cannot take into account:

  • the number of consumers registered on a node or different node capacities
  • the state of other nodes (health checks)
  • the deployment site (rack affinity)

Also, it uses only the event bus address to select a recipient, which makes partitioning inconvenient (many addresses must be registered).

Besides, having ChoosableIterable implemented in the cluster manager leads to:

  • duplication of the ChoosableSet implementation, or
  • extra effort for the SPI implementer

Configuration

The clustering SPI does not define anything related to configuration. EventBusOptions lets you set the host/port and security settings for message transport, but cluster manager configuration is either loaded from the classpath or specified via system properties.

Also, many users have complained about address/interface selection. Vert.x and the cluster manager operate independently in this area, and very often choose different addresses by default.

Goals

The goals for this effort are as follows:

  • enable smart routing of messages by letting users provide a custom DeliveryStrategy
  • clearly separate concerns (delivery, routing, storage) to get a moderate level of complexity
  • avoid code duplication
  • lower the bar for clustering SPI implementation
  • cohesive configuration

Non-goals

  • Provide a DeliveryStrategy with a myriad of configuration options that suits every need but is hard to maintain or to reason about

Proposal

POC: https://github.com/eclipse-vertx/vert.x/pull/3132

Node identification

Nodes need a UUID to identify themselves. An ID composed of host and port is not enough, because it cannot distinguish a node that just crashed from, for example, a new one restarted immediately on the same machine.

Users should also be able to provide node metadata (e.g. rack or datacenter info). While the default DeliveryStrategy will ignore it, custom strategies could use this information to implement partitioning.

public final class NodeInfo {

  private final String nodeId; // UUID
  private final String host; // TCP server host
  private final int port; // TCP server port
  private final JsonObject metadata; // user-provided info (rack, dc, etc.)

  // ...

}
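As an illustration of how a custom strategy might exploit node metadata, here is a rack-affinity filter sketch. It is not part of the RFC: the `Node` record and `sameRack` helper are hypothetical, and a plain `Map` stands in for the `JsonObject` metadata of the real `NodeInfo`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified stand-in for NodeInfo: a plain Map instead of JsonObject metadata.
class RackFilterSketch {

  record Node(String nodeId, String host, int port, Map<String, String> metadata) {}

  // Hypothetical helper a custom DeliveryStrategy could use:
  // keep only the candidates whose "rack" metadata matches the local node's rack.
  static List<Node> sameRack(Node local, List<Node> candidates) {
    String rack = local.metadata().get("rack");
    return candidates.stream()
        .filter(n -> rack != null && rack.equals(n.metadata().get("rack")))
        .collect(Collectors.toList());
  }
}
```

A strategy built this way would prefer consumers deployed in the same rack, falling back to the full candidate list when the filter yields nothing.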

Registration management

The ClusteredEventBus signals every registration and unregistration to the cluster manager, including local ones, not just the first registration and the last unregistration for an address.

public final class RegistrationInfo {

  private final NodeInfo nodeInfo;
  private final String address;
  private final long seq;
  private final boolean localOnly;

  // ...
}

// In cluster manager

  /**
   * Share a new messaging handler registration with other nodes in the cluster.
   */
  void register(RegistrationInfo registrationInfo, Handler<AsyncResult<Void>> completionHandler);

  /**
   * Signal removal of a messaging handler registration to other nodes in the cluster.
   */
  void unregister(RegistrationInfo registrationInfo, Handler<AsyncResult<Void>> completionHandler);
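To make the register/unregister contract concrete, here is a toy in-memory sketch. Real cluster managers replicate this state across nodes; the `RegistrationStore` class, its `Registration` record, and the plain `Consumer<Void>` callback (standing in for `Handler<AsyncResult<Void>>`) are all simplifications introduced here for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy, single-process take on the register/unregister contract.
class RegistrationStore {

  record Registration(String nodeId, String address, long seq, boolean localOnly) {}

  private final Map<String, List<Registration>> byAddress = new ConcurrentHashMap<>();

  void register(Registration info, Consumer<Void> completion) {
    byAddress.computeIfAbsent(info.address(), a -> new CopyOnWriteArrayList<>()).add(info);
    completion.accept(null); // signal success once the registration is shared
  }

  void unregister(Registration info, Consumer<Void> completion) {
    List<Registration> list = byAddress.get(info.address());
    if (list != null) list.remove(info);
    completion.accept(null);
  }

  List<Registration> registrations(String address) {
    return byAddress.getOrDefault(address, List.of());
  }
}
```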

At the expense of a bigger memory footprint, Vert.x nodes will get the opportunity to:

  • determine how many consumers are registered on each node; this allows sending more messages to bigger nodes (e.g. where a verticle has more instances deployed)
  • send messages to local consumers even when other nodes have regular consumers on the same address
  • determine quickly whether all consumers for an address are local, instead of querying the storage layer every time a message is sent

In practice, most Vert.x deployments are of moderate size, with a relatively small number of addresses in use. So the memory impact should be limited.

In order to distinguish different registrations for the same address on a node, HandlerHolder gets a sequence number. This sequence number is recorded in RegistrationInfo.
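For instance, with every registration visible (distinguished by node, address and sequence number), a strategy can compute per-node consumer counts and weight delivery accordingly. The record and helper below are hypothetical stand-ins sketched for this document, not part of the proposed SPI.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Derive per-node consumer counts from the registration list of one address.
class ConsumerCount {

  record RegistrationInfo(String nodeId, String address, long seq, boolean localOnly) {}

  static Map<String, Long> consumersPerNode(List<RegistrationInfo> regs) {
    return regs.stream()
        // assumption: local-only consumers are not targets for remote delivery
        .filter(r -> !r.localOnly())
        .collect(Collectors.groupingBy(RegistrationInfo::nodeId, Collectors.counting()));
  }
}
```

A strategy could then send roughly twice as many messages to a node hosting two consumer instances as to a node hosting one.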

Data integrity

ClusteredEventBus will no longer:

  • republish registrations after a node leaves or joins
  • remove subscriptions of crashed nodes

In general, logic related to cluster maintenance should no longer be incorporated in the ClusteredEventBus and DeliveryStrategy implementations.

Delivery strategy

The ClusteredEventBus relies on a DeliveryStrategy to select nodes. The strategy is consulted regardless of the messaging paradigm: send, publish or request/reply.

When choosing nodes, implementations must return a list holding:

  • at most one value when Message#isSend() returns true
  • zero, one or more values when Message#isSend() returns false

The ClusteredEventBus skips the strategy call only when the user sets the DeliveryOptions#setLocalOnly(boolean) flag.

public interface DeliveryStrategy {

  /**
   * Invoked after the cluster manager has started.
   */
  void setVertx(VertxInternal vertx);

  /**
   * Invoked after the {@link io.vertx.core.eventbus.EventBus} has started.
   */
  void setNodeInfo(NodeInfo nodeInfo);

  /**
   * Choose nodes the given {@code message} should be delivered to.
   * <p>
   * The result must not be null and must hold distinct values.
   */
  void chooseNodes(Message<?> message, Handler<AsyncResult<List<NodeInfo>>> handler);

}
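The send/publish contract above can be modeled with a stripped-down round-robin sketch: at most one node for point-to-point send, every node for publish. The synchronous signature and `String` node IDs are simplifications of the real async SPI, introduced here only to illustrate the contract.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Stripped-down model of the chooseNodes contract.
class RoundRobinStrategy {

  private final AtomicInteger index = new AtomicInteger();

  List<String> chooseNodes(boolean isSend, List<String> candidates) {
    if (candidates.isEmpty()) return List.of();
    if (isSend) {
      int i = Math.floorMod(index.getAndIncrement(), candidates.size());
      return List.of(candidates.get(i)); // at most one value when isSend() is true
    }
    return candidates; // publish: deliver to every node with a matching consumer
  }
}
```

Note the result is never null: an empty list simply means no suitable recipient, as required by the chooseNodes contract.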

To choose nodes, the delivery strategy must be aware of registration updates, so the cluster manager provides a stream of updates:

void registrationListener(String address, Handler<AsyncResult<RegistrationStream>> resultHandler);

RegistrationStream is a ReadStream that emits a List<RegistrationInfo> when a node in the cluster registers or unregisters a consumer.
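A strategy would typically feed those emissions into a local cache, so chooseNodes() can be answered without hitting the storage layer on every message. The sketch below is a toy stand-in for that handler logic; the `RegistrationCache` class and nested record are hypothetical, not part of the SPI.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy consumer of registration updates: each emitted list replaces the
// cached view for its address.
class RegistrationCache {

  record RegistrationInfo(String nodeId, String address, long seq, boolean localOnly) {}

  private final Map<String, List<RegistrationInfo>> cache = new ConcurrentHashMap<>();

  // Stand-in for the RegistrationStream handler body.
  void onUpdate(String address, List<RegistrationInfo> registrations) {
    if (registrations.isEmpty()) {
      cache.remove(address); // last consumer for the address unregistered
    } else {
      cache.put(address, registrations);
    }
  }

  List<RegistrationInfo> current(String address) {
    return cache.getOrDefault(address, List.of());
  }
}
```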
