
Transition Plan


Please read about DHTs first.

It’s useful to think of ring topology updates as transition plans. A plan is formulated that describes what needs to be done in order to transition from the original ring state (topology) to the final ring state.

This can involve one or many different nodes. For each of those nodes, the list of tokens they will own in the final ring is specified. This can be an empty list of tokens if you wish to remove that node from the final ring. It can also be a list of new tokens if a node is joining the ring, or has perhaps been upgraded and can take on more work so it can serve more ring segments; or it can be a list that includes all current tokens except a few (because the node has been downgraded). You get the idea.

For example, suppose you want to transition from the current ring, and the transition involves four nodes, A, B, C, and D:

A: from [10, 20, 50] to []
B: from [] to [120, 200]
C: from [55, 80] to [80, 170, 250]
D: from [135, 155] to [135]
  • Node A is going to leave the ring (it will have no tokens after the transition is completed).
  • Conversely, node B (which wasn't participating in the cluster) is joining the ring — after the transition it will have two tokens in the ring.
  • Node C will release token 55 and will acquire two new tokens, 170 and 250.
  • Node D will release one token, 155, and will have one token in the final ring.
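
As a rough illustration (hypothetical names, not this library's API), such a plan can be modelled as a map from node to its current and final token lists; the tokens each node gains or releases then fall out directly:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <iterator>
#include <map>
#include <string>
#include <vector>

// Hypothetical model: each plan entry holds a node's tokens before and
// after the transition.
struct PlanEntry {
    std::vector<uint64_t> from;
    std::vector<uint64_t> to;
};

// Tokens present in `b` but not in `a`.
static std::vector<uint64_t> tokensDiff(std::vector<uint64_t> a, std::vector<uint64_t> b) {
    std::vector<uint64_t> out;
    std::sort(a.begin(), a.end());
    std::sort(b.begin(), b.end());
    std::set_difference(b.begin(), b.end(), a.begin(), a.end(), std::back_inserter(out));
    return out;
}

int main() {
    // The example transition from this page.
    const std::map<std::string, PlanEntry> plan{
        {"A", {{10, 20, 50}, {}}},
        {"B", {{}, {120, 200}}},
        {"C", {{55, 80}, {80, 170, 250}}},
        {"D", {{135, 155}, {135}}},
    };

    for (const auto &[node, e] : plan) {
        const auto gained   = tokensDiff(e.from, e.to); // tokens the node acquires
        const auto released = tokensDiff(e.to, e.from); // tokens the node gives up
        std::printf("%s: gains %zu token(s), releases %zu token(s)\n",
                    node.c_str(), gained.size(), released.size());
    }
    return 0;
}
```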

Instead of thinking of those as individual, distinct operations and processing them as such, it is preferable to treat them as a single operation, so that the plan formulation can take into account the current ring segment ownership, the final ring ownership, and the overlap between ring segments, in order to compute the fewest data transfers required to accomplish the transition.
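
For instance, if a destination already replicates part of a segment it will own in the final ring, only the missing portion has to move. A minimal sketch of that overlap computation (illustrative only, ignoring wrap-around segments for brevity):

```cpp
#include <cstdint>
#include <vector>

struct Segment { uint64_t left, right; };  // the segment (left, right]

// Given a segment the destination must own in the final ring and a segment it
// already holds, return only the sub-segments that still need to be streamed.
std::vector<Segment> stillNeeded(Segment wanted, Segment alreadyHeld) {
    // No overlap: everything in `wanted` must be transferred.
    if (alreadyHeld.right <= wanted.left || alreadyHeld.left >= wanted.right)
        return {wanted};
    std::vector<Segment> out;
    if (wanted.left < alreadyHeld.left)       // portion before the held segment
        out.push_back({wanted.left, alreadyHeld.left});
    if (alreadyHeld.right < wanted.right)     // portion after the held segment
        out.push_back({alreadyHeld.right, wanted.right});
    return out;
}
// e.g. wanted = (50, 100], already held = (50, 80]  ->  only (80, 100] moves.
```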

Please note that, depending on the replication strategy and factor, potentially many other nodes (beyond those whose ring tokens you wish to update) will be affected. For example, if your replication factor is 3 and just one node leaves the ring, the effective replication factor of the ring segments it was replicating drops by one, so for each of those ring segments another node needs to receive a copy. This gets considerably more complex when you consider ring segment overlaps and high replication factors.

Imagine if you had to execute those operations in sequence. You would need to wait for one operation to complete before you could execute the next, and you would also potentially transfer a lot more data than needed.
For example, based on the example above, if B became responsible for ring segment (150, 200], you would transfer that data to it; then, when node C acquired token 170, node B would likely instead become responsible for (170, 200], so you would have transferred the contents of (150, 170] for nothing. This is a simple illustrative example; in practice, executing those steps in sequence would mean orders of magnitude more time, more resources, and more unnecessary data transfers.
In addition, having such a plan allows for other optimisations, like the one described below.

Suppose you are building a KV datastore and you have a transition plan. The plan is effectively a list of (ring segment, destination, list of replicas) tuples, where “ring segment” describes the data to transfer (e.g. all rows whose key token falls within that segment), “destination” is the node to receive that content, and “list of replicas” is one or more nodes that can provide that data to the destination.
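
In such a hypothetical datastore, the plan could boil down to something along these lines (illustrative types, not this library's API):

```cpp
#include <cstdint>
#include <string>
#include <vector>

struct RingSegment {
    uint64_t left;   // exclusive
    uint64_t right;  // inclusive: the segment is (left, right]
};

struct Transfer {
    RingSegment segment;               // rows whose key token falls within this segment
    std::string destination;           // the node that must receive that content
    std::vector<std::string> replicas; // one or more nodes that can provide the data
};

// From the data-transfer point of view, a transition plan is just:
using TransferPlan = std::vector<Transfer>;
```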

To do this, in this hypothetical datastore implementation, you would, for example, build a file on one of the source replicas containing all rows whose key falls within the ring segment, and then stream that file to the target. That would work fine. However, what if there are many different ring segments that one node could serve to one or more destinations?

Then, instead of generating one file for each of those segments on the replica by scanning all rows for each of those files, you could scan the replica's rows just once and, for each row, check whether it matches a ring segment for one of the destinations, and store the row in the file for that destination (this is usually accomplished with explicit data compactions). You would essentially build multiple output files in a single pass over the rows, as opposed to building them with one pass for however many ring segments the replica was asked to stream to a destination.

This is possible with the plan because, unlike with sequential execution, you have the complete list of all transfers required to accomplish the transition.
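
A minimal sketch of that single-pass approach, assuming rows expose their key token and each outbound segment is tagged with its destination (all names here are hypothetical):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct RingSegment { uint64_t left, right; };                   // the segment (left, right]
struct Outbound    { RingSegment segment; std::string destination; };
struct Row         { uint64_t keyToken; std::string payload; }; // hypothetical row shape

// Does `token` fall within (left, right]? Handles segments that wrap around zero.
static bool contains(const RingSegment &s, uint64_t token) {
    if (s.left < s.right)
        return token > s.left && token <= s.right;
    return token > s.left || token <= s.right; // wrap-around segment
}

// One pass over this replica's rows: each row is appended to the outbound
// file of every (segment, destination) pair it matches, instead of doing
// one full scan per segment.
std::unordered_map<std::string, std::vector<Row>>
buildStreams(const std::vector<Row> &rows, const std::vector<Outbound> &outbound) {
    std::unordered_map<std::string, std::vector<Row>> perDestination;
    for (const auto &row : rows)
        for (const auto &o : outbound)
            if (contains(o.segment, row.keyToken))
                perDestination[o.destination].push_back(row);
    return perDestination;
}
```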

Strategies for plan application

When you have a transition plan, you want to execute it (commit it). Here is one way to do it, which roughly matches the strategy employed by Riak, Cassandra, and CloudDS, among other datastores.

Before you initiate the streaming operations, all nodes in the cluster should know about the plan. If you use a gossip protocol, you need to disseminate that plan to all of them and wait until you have concluded that they all have it.
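
One possible way to track that, assuming the gossip layer reports which plan version each node has seen (illustrative only):

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <unordered_map>

// Before streaming starts, the coordinator waits until every cluster node
// has acknowledged the plan's version (e.g. learned about it via gossip).
struct PlanDissemination {
    uint64_t planVersion;
    std::set<std::string> clusterNodes;                      // all nodes expected to ack
    std::unordered_map<std::string, uint64_t> ackedVersion;  // node -> last plan version seen

    // Called when gossip tells us which plan version `node` knows about.
    void onAck(const std::string &node, uint64_t version) {
        ackedVersion[node] = version;
    }

    // Streaming may only begin once this returns true.
    bool everyoneHasThePlan() const {
        for (const auto &node : clusterNodes) {
            const auto it = ackedVersion.find(node);
            if (it == ackedVersion.end() || it->second < planVersion)
                return false;
        }
        return true;
    }
};
```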

You need to consider an optimal, fair allocation plan for the transfers. Each transfer can involve one or more source replicas, and each of those source replicas may also be a source replica in other transfers in the transition plan. So you should take into account the following (a toy selection sketch follows the list):

  1. Opportunities for generating the data to stream for multiple transfers from the same replica (see earlier). If segments (10, 20] and (80, 85] are to be transferred and both can be provided by the same replica, you should probably ask that one replica to handle both, so that it scans its rows just once and generates those two files in a single pass.
  2. You should consider the physical distance of source replicas from destinations. If segment (10, 20] can be provided by both node A and node B, but node B is in the same data centre or rack as the destination, then node B should provide the content, not node A.
  3. You should consider the load of the replicas and fairly distribute transfers among them. If a source node is too busy, maybe another node can serve the content instead.
  4. You may want to place a configurable maximum on the total amount of data being transferred for this purpose at any given moment, both per node and overall, in order not to overload the cluster. (via @justinsheehy)
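
Here is the toy selection sketch referred to above. It only illustrates how those four considerations could be combined into a score; the weights and inputs are made up, and a real implementation would use whatever load and topology information the cluster actually tracks:

```cpp
#include <cstdint>
#include <limits>
#include <string>
#include <vector>

// Candidate source replicas for one transfer in the plan.
struct Candidate {
    std::string node;
    bool sameRackAsDestination;   // consideration 2: physical distance
    uint32_t pendingTransfers;    // consideration 1: batch segments on one replica
    uint32_t activeStreams;       // considerations 3 and 4: load / caps
};

std::string pickSource(const std::vector<Candidate> &candidates,
                       uint32_t maxStreamsPerNode) {
    double bestScore = -std::numeric_limits<double>::infinity();
    std::string best;
    for (const auto &c : candidates) {
        if (c.activeStreams >= maxStreamsPerNode) // respect the per-node cap
            continue;
        double score = 0;
        score += c.sameRackAsDestination ? 10.0 : 0.0; // prefer nearby replicas
        score += 2.0 * c.pendingTransfers;             // prefer replicas we can batch on
        score -= 1.0 * c.activeStreams;                // prefer less loaded replicas
        if (score > bestScore) {
            bestScore = score;
            best = c.node;
        }
    }
    return best; // empty if every candidate is at its cap
}
```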

Whenever a node coordinates a WRITE operation, it should check whether there is an active transition plan. If there is one, it should check whether the row's token matches any of the plan's operations and, if so, forward the write to the destination as well. This is so that when the streaming operation is complete, the destination nodes will already be up to date (this is trivial for idempotent operations, e.g. LWW semantics, but may require more effort for different designs).
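
A minimal sketch of that coordinator-side check, assuming the active plan is available as a list of pending (segment, destination) pairs (illustrative only):

```cpp
#include <cstdint>
#include <string>
#include <vector>

struct RingSegment     { uint64_t left, right; };  // the segment (left, right]
struct PendingTransfer { RingSegment segment; std::string destination; };

static bool contains(const RingSegment &s, uint64_t token) {
    if (s.left < s.right)
        return token > s.left && token <= s.right;
    return token > s.left || token <= s.right; // wrap-around segment
}

// While a plan is active, the coordinator forwards a write to every pending
// destination whose segment covers the row's token, in addition to the
// replicas of the current ring.
std::vector<std::string> extraWriteTargets(const std::vector<PendingTransfer> &activePlan,
                                           uint64_t rowToken) {
    std::vector<std::string> targets;
    for (const auto &t : activePlan)
        if (contains(t.segment, rowToken))
            targets.push_back(t.destination);
    return targets;
}
```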

When the streaming operation is complete, you need to actually transition to the new ring and disseminate that information to the cluster.
You then need to ask the nodes in the cluster to forget the current plan — there is no need to forward data anymore.
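
Sketched out (illustrative only, assuming each node keeps its ring and the active plan behind a lock), the completion step on each node could look like this; disseminating the new ring and the "forget the plan" message to the cluster happens via whatever mechanism you already use, e.g. gossip:

```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct Ring;  // whatever your ring representation is

struct PendingTransfer {
    uint64_t left, right;       // the pending segment (left, right]
    std::string destination;
};

struct ClusterState {
    std::mutex mtx;
    std::shared_ptr<const Ring> currentRing;  // the topology coordinators route against
    std::vector<PendingTransfer> activePlan;  // non-empty while a transition is in flight

    // Once all streaming operations have completed: install the final ring,
    // then drop the plan so coordinators stop forwarding writes to pending destinations.
    void completeTransition(std::shared_ptr<const Ring> finalRing) {
        std::lock_guard<std::mutex> g(mtx);
        currentRing = std::move(finalRing);
        activePlan.clear();
    }
};
```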

You are done.

Considerations

  1. You should only allow for a single active plan at a time. You may want to queue plans to be executed after the current plan is complete. This is consistent with Riak’s semantics.
  2. Instead of disseminating the transition plan from the node that coordinates the transition, you can instead rely on the cluster nodes to also compute the plan themselves (which is what Cassandra does), as a reaction to gossip messages. This is not optimal, though.
  3. It is also worth having mechanisms you can invoke when you can't reach all nodes but still need to take action (Riak calls these "force-replace" and "force-remove"). (via @justinsheehy)

Some implementation specifics

Riak

Riak supports on-demand resizing by arbitrary factors, based on transition plans. Transition plans can include different types of operations. Multiple resize operations can be issued; they are queued and executed one after the other, and you can cancel any pending transition plans.
It makes a best effort to minimise data movement. Like Cassandra and CloudDS, it keeps track of all "pending segments", and the coordinator will forward writes to the pending segments' destinations if needed. See also https://billo.gitbooks.io/lfe-little-riak-book/content/ch4/5.html
Thanks to @justinsheehy and @tsantero for the insight.

Cassandra

Cassandra does not have the notion of a transition plan, but it does keep track of pending segments, which are computed on every cluster node, and coordinators forward writes to them if needed. Because it doesn't support transition plans, you potentially lose a lot of the benefits described above, and the time and resources required to transition can be far greater. It also relies on gossip dissemination time, but there doesn't appear to be any mechanism in place to verify dissemination success, so an arbitrary wait time is used instead.
Thanks to @rustyrazorblade for the insight.

Ideas and strategies

This Cassandra JIRA by @rustyrazorblade describes a potential problem with large nodes, and proposes a clever and pragmatic solution to it.

--

Building a transition plan with this library is trivial. You only need to use the Ring::transition() method.
