[WIP] Multiraft #859
This pull request adds support for tiered Raft (aka Multiraft). The idea is that multiple mini Raft clusters can be created to replicate sharded data while an InfluxDB cluster-level Raft group manages cluster configuration and schema changes.
This is a work-in-progress and is currently in the early planning stages.
Looks like an awesome start. I have some questions/comments.
Should we use an interface instead of the raftmdb? We'll want to have the InfluxShardDatastore be the actual log store. Or maybe that's the SnapshotStore?
Maybe the group log path should have the id in it? I can imagine a scenario where a server could potentially be a member of multiple groups. For instance, say we have a cluster of 5 machines, but a replication factor of 3. Each group would have to have 3 servers, so you'd need potentially 3 groups per server (or 15 total) to evenly spread things around.
I really like the idea of having the top level cluster replicate its log down to the group leaders. Nice and clean and solves our problem of replicating the cluster wide data to all servers without having a massive single consensus group.
I started doing interfaces at first but it was getting confusing this early on. I'll revert back to interfaces now that I'm getting a better sense of the whole structure. We'll probably want to optionally have the log entry storage in-memory since we're immediately persisting the state into the Influx backing store.
I think it'll start getting really confusing once you have overlapping groups. A group will automatically have a built-in replication factor depending on how many nodes are in it. Do users typically use a non-default replication factor? Do they use different replication factors for different shards? Personally, I would say that we deprecate replication factor since it's essentially going to be forced at the group level.
Yeah, it definitely makes reasoning about everything much easier if groups have a linear history. Also, since there's not much data going through the global Raft log, it shouldn't be a problem to stream its committed log out to 10s or 100s of group leaders.
Currently they can have different replication factors for different shard spaces. However, let's assume that the replication factor is fixed cluster wide. Even then you'd need to have multiple groups on a server to get data evenly distributed across the cluster.
Say you have 5 servers and RF is 3. If you have 1 group then you're only using three of the servers. If you have two groups then you have 1 server with two groups and the others with only 1. To ensure that you have data evenly distributed, you'd need to have the same number of groups on all servers.
In cases where the size of the cluster is evenly divisible by the RF this is easy. But as in the case of 5 servers and RF 3, the LCM is 15, so you'd need 15/3 = 5 groups to evenly distribute the data.
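The group arithmetic above can be sketched in Go (a hypothetical helper for illustration, not anything in the codebase):

```go
package main

import "fmt"

// gcd via Euclid's algorithm.
func gcd(a, b int) int {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// groupsNeeded returns how many Raft groups of size rf are required
// so that every one of n servers hosts the same number of groups.
// Per the reasoning above, this is lcm(n, rf) / rf.
func groupsNeeded(n, rf int) int {
	lcm := n / gcd(n, rf) * rf
	return lcm / rf
}

func main() {
	fmt.Println(groupsNeeded(5, 3)) // 5 servers, RF 3 -> 5 groups (15 memberships total)
	fmt.Println(groupsNeeded(6, 3)) // evenly divisible -> 2 groups
}
```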
I guess if they have multiple replication factors defined, like say 3 and 5, then you'd need to have groups across the cluster for each RF. So in the example of a 5 server cluster you'd have 5 RF 3 groups spread evenly across the machines and 1 RF 5 group.
Maybe this is what Cassandra does under the hood with multi-paxos since they let you define RF on the column family level? I think it's useful to be able to have different RFs in a cluster. It's not just for HA, but also for read scalability. If there's one shard space that is super hot for reads then they can up the RF to get more servers that can answer any given query.
Although I assume most people running a cluster would be using RF of 3. And we can always revisit this later and just make RF a cluster-wide thing for now since it would certainly make things easier.
Another example of why different replication factors would be nice: if replication slows down data ingest (which I assume it would) it would also be useful to allow replication factor to be zero for a shard space that is for once-off import of a large data-set (so the import could be as fast as possible). Then, once the data-set has been imported, increase the replication factor to 2 or more to increase its redundancy. (However, I'm not sure if InfluxDB supports this dynamic changing of RF, replicating the data in the process, so the question may be moot).
@otoolep dynamically changing the replication factor isn't something we'll support (at least not for a while). For the case of bulk loading data, we'll be supporting things like batching writes together to make it more efficient.
Also, RF of 2 won't really be worthwhile since it won't give you high availability. If one node goes down you won't be able to achieve majority on the replication group.
@pauldix I was trying to work out over the weekend how to make the replication factors work with multiraft but I'm not able to get it to fit. Raft clusters have a large communication overhead per cluster so trying to lump several clusters onto each box is going to be painful. I tried to come up with a logical sharding schema to layer over top-level Raft clusters but there's no way to adjust the replication factor at that point.
I did some more research into Kafka. There's Kyle's post and then Jay's rebuttal:
Here's the tl;dr: Kafka maintains two quorums: one for metadata stored in Zookeeper and one in Kafka for the actual log data. Zookeeper requires strong consistency and tolerates (N-1)/2 failures; Kafka's quorum, however, has a set of "active" replicas (called the "ISR" - In-Sync Replicas) that are streamed to. If a replica is not heard from then it's dropped from the ISR list. You can get into a situation where all your ISRs are dropped and the leader is only writing (and ACKing) to itself. If that single leader is then partitioned from Zookeeper, a new leader will be elected and 100% of the data written to the old leader will be lost on reconnect.
Kyle did have some suggestions for improving that. Basically, don't let your ISR get down too small.
Now that I know more about the InfluxDB sharding setup, I'm starting to lean toward a Kafkaesque approach.
Kafka shows 2M writes per second on a 3-node cluster in this benchmark:
Even if we're half that speed then we're still doing pretty well. Going with that approach would give us the throughput we're looking for and it would let us adjust RF on the fly.
I know you're AFK so no rush getting back to me. Hit me up when you get a chance.
FWIW, I've built a couple of big pipelines with Kafka, and found it to be a great system. Its performance really is superb. But one of the reasons it's so fast is because it doesn't actually do much with the data -- it just streams it to disk, is optimized for spinning disks, and relies on the Linux buffer cache to give it fast access to the disk. However, messages can be lost in certain circumstances, if the machines go down, due to its flushing nature.
By comparison, InfluxDB (presumably) does a bit more with every bit of data it receives (writing the time series data to its database, for example), so I'll be quite impressed if you can get numbers like those of Kafka. :-)
@otoolep I've built a couple systems on Kafka as well and it really is a great tool. Having a distributed WAL is a pretty awesome way to design a lot of types of systems.
The idea with using a Kafka style log instead of "multiraft" is that we write data points into this log inside InfluxDB and then individual machines within the cluster will stream some subset of that data. For example, say you have a 10 node cluster. 3 of those can be responsible for receiving events and writing to a log. These essentially function as a Kafka-like log. The remaining 7 nodes can consume from those 3 log nodes. InfluxDB shards would be analogous to Kafka topics.
The consumer nodes will do the real work of writing the series data to LevelDB/RocksDB/etc but we'll be able to scale those consumer nodes independently of the log nodes.
Does that make sense? I just want to make sure I'm not missing something.
@benbjohnson -- that is an interesting idea, and does make sense. In some ways the "3" machines (let's call them data-less nodes) you describe become simpler reception points, streaming the data to disk. They are almost like special nodes whose only job is to receive the data, and ensure it has been written safely to disk.
Then downstream you attach as many consumers as needed, to get the series-processing throughput you need (would this be pull or push?). I am interested if you have anything in mind like Kafka's partitions though, since the number of partitions for a given topic sets the parallelism.
Unless you're suggesting that there would be a "topic" on each of the data-less nodes for each downstream shard? I could see how that would work, but wouldn't clients then need to know which data-less node requests should be sent to, for a given shard? Or would every node in the cluster know about every other shard? Would the client's operation return immediately after being written to the log on the dataless node, or would it wait until the data has actually been inserted in the time-series? How does replication play into this?
Obviously there is a lot to discuss here, and I don't expect enormous detail on github. I may be missing part of your design. I'm happy to think about this more if it helps. I can't help comparing this design to elasticsearch. The comparison may be instructive (assuming you are not familiar with it). It uses its own custom cluster management system, and each node knows where all the other Lucene shards (indexes) are (across the multi-node cluster). This allows any node to service any index or search request, and if the node does not have the data locally, it communicates with the node that does, on behalf of the client. The client call blocks until the data has been indexed (if it's an index request) or the search results are returned. While all cluster management goes through the master, indexing and search requests do not. Of course, elasticsearch then has to deal with the problem of nodes getting out of sync with the master's state -- which has become infamous. But the comparison helps me think about your design.
For smaller clusters (e.g. 1 - 5 nodes), these data-less nodes would need to overlap with the "data" nodes. I don't have a specific plan for how or when that overlap should happen or when they should be split off. We'll probably just need to do some empirical testing.
I'm thinking it would be a pull model. Especially since we may want to have a node attach onto another data-node to daisy chain log replication. This would come in handy if we're talking about hundreds of nodes in a cluster.
I think we'll get our parallelism from having a "topic" per shard. I'm not sure that we'll need to further segment the data in a single shard into partitions. Having a single linear log for a shard would be really nice to have.
I'm thinking that there would be a "topic" per shard on the data-less nodes. There would also be a "main" topic that every node subscribes to that streams cluster configuration changes. That way all nodes would be able to know which node has which shard and can redirect requests appropriately.
The client would return after it's written to the data-less node. There would be a large communication overhead to ACKing back from all data nodes.
Incoming data would be replicated within the data-less nodes. From there it would be streamed down to different data nodes based on replication factor. The data nodes would each have their own LevelDB/RocksDB/etc database that they're inserting time series data into. If the replication factor for a shard is increased then the new data node will need to copy over the shard database from an existing data node and then connect to the data-less nodes to begin streaming from the offset that the shard database was copied from.
I've read ES's replication design a while back but I'll brush up on it again.
I'll have to read through and see how they get out of sync in ES. I can't think of how nodes would get out of sync with the data-less nodes in the design we're talking about. Everything should be linear and consistent per the log.
It sounds like you have a good understanding of the design. Let me know if you find any holes. :)
To be clear @benbjohnson, would these "configuration changes" be distributed to the nodes via Raft? I just want to be sure I understand your proposal.
@otoolep I think we can do a streaming or pipelined Raft that would allow us to push all data in the "data-less" nodes through Raft -- configuration changes included -- and still get the performance we need. That'll need empirical testing though.
From there the configuration changes would be streamed out via a topic to all other nodes in the cluster.
As long as your design can deal with these other nodes falling behind in configuration, @benbjohnson . Since they are pulling, there is no guarantee they will be "caught up to live" with config changes. Do you think this could be an issue?
@otoolep They're just dumb replicas so the only issue I could see is that a query could be redirected during a configuration change and hit the wrong node. We could either have that next node try to redirect to the correct node (and hoping it's up-to-date on configuration) or we can direct queries through the data-less nodes which can then redirect to the appropriate data nodes.
I read through the Kafka codebase to get a better understanding of what they're doing and how it can apply to what we need. It's 20,000 SLOC of Scala-y goodness so I'll give a summary. :)
For clarity, here's the glossary of top-level entities in Kafka:
Kafka is really overkill for what we need. It includes features like cleaning of old log segments (instead of dropping them), it splits topics into partitions for client-side parallelism, and it has a complex interaction with Zookeeper to maintain leadership and ISRs. The Zookeeper code alone is probably a third of the codebase.
My thinking at this point is that we replicate everything through a single sequential log in our "data-less" nodes (and those then fan out shards to the appropriate "data" nodes). InfluxDB generally has very small messages (e.g. <100 bytes) to replicate, and SSD sequential write speeds are typically 100-300MB/s (I'm seeing 250MB/s on my MBP and 108MB/s on my DigitalOcean droplet). So if we can take full advantage of the write speed then we can get 1M+ messages/sec for every 100MB/s of disk throughput.
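A back-of-the-envelope version of that throughput math (illustrative numbers only, assuming the log is bottlenecked purely by sequential disk writes):

```go
package main

import "fmt"

// messagesPerSec estimates replication throughput when the log is
// limited only by sequential disk write speed.
func messagesPerSec(diskBytesPerSec, msgSize int) int {
	return diskBytesPerSec / msgSize
}

func main() {
	// 100 MB/s of disk throughput with ~100-byte messages -> ~1M msgs/sec
	fmt.Println(messagesPerSec(100*1000*1000, 100))
	// 250 MB/s (SSD on a laptop) -> ~2.5M msgs/sec
	fmt.Println(messagesPerSec(250*1000*1000, 100))
}
```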
Existing Raft Implementations
I did a bunch of benchmarking and tweaking on hashicorp's Raft impl and I'm able to get up to 80K msgs/sec when using small messages (32 bytes) and using the in-memory store. I didn't try the raft-mdb because it was going to require a lot more work but I assume that would be slower. Here's the benchmark results with this test code:
$ go test -v -run=XXX -bench=. -benchmem -benchtime=5s -cpu 8
1000000    12388 ns/op    8951 B/op    56 allocs/op
There seems to be a lot of allocations happening. That benchmarking code doesn't do error checks. If I add them back in it goes up to 84 allocs/op. The pipelining that hashicorp's impl uses is definitely faster than go-raft's but it's still not nearly as fast as we need it to be.
My proposal is that we implement streaming Raft. It's actually fairly similar (architecturally) to Kafka but is mostly based on the original Raft paper. In the normal Raft model, the leader attaches log entries to its AppendEntries heartbeat messages and waits for a synchronous acknowledgement from a quorum before committing.
The idea behind streaming Raft is that the leader maintains its dominance over the followers by periodically heartbeating, but those heartbeats contain no log entries. This keeps a heartbeat from being delayed by large message sizes, which would otherwise cause a re-election. The follower responds to the heartbeats by telling the leader what its highest written log index is, and the leader uses those indexes to determine the commit index (once it has a quorum).
Separate from the heartbeat mechanism, the followers connect to the leader, receive a continuous stream of log entries, and write them to disk. Because there's no synchronous acknowledgement, this stream can go as fast as your disk or network will let you. If new leadership is established then the follower simply disconnects its stream from the old leader and goes through the normal Raft election process.
The benefits to streaming Raft are that we get significantly higher throughput and our leadership is more stable during high load because heartbeats don't rely on message sizes. The main downside to streaming Raft is that a non-fatal issue in the leader's log writing (e.g. hung disk write) won't trigger a re-election since the logging and heartbeating are separate. We can get around that issue by using heuristics. I feel like that's mostly an edge case.
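The commit-index bookkeeping described above could look something like this (a minimal sketch; the names are invented, not taken from any Raft library):

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndex returns the highest log index durably written on a quorum.
// writeIndexes holds the latest index each cluster member (leader
// included) has reported writing via the heartbeat responses.
func commitIndex(writeIndexes []int) int {
	sorted := append([]int(nil), writeIndexes...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	quorum := len(sorted)/2 + 1
	// The quorum-th highest index is present on a majority of members.
	return sorted[quorum-1]
}

func main() {
	// 3-node cluster: leader has written 100, followers report 98 and 90.
	// A majority (leader + first follower) has everything through 98.
	fmt.Println(commitIndex([]int{100, 98, 90}))
}
```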
I know this is a really long comment but I wanted to be thorough. Implementing streaming Raft would be significantly simpler than trying to do something like multi-raft and it gives us a pretty straightforward, linearizable architecture.
That's my proposal. What do you guys think?
(I'll presume you want some feedback from the community like myself, as opposed to just the contributors!)
Thanks for all the extra details @benbjohnson. I do find what you're proposing quite different from other existing systems. You are proposing that the data-less nodes maintain a big buffer on disk, and then let the data nodes pull data from it. This is a very different design compared to, say, Cassandra or elasticsearch. In those systems, while they do have a log, those logs are local to the data node and are to provide throughput and reliability. Your design seems to propose pulling that log away from those nodes, placing it on the data-less node, and having the data node pull from the data-less node.
Assuming your network and disk are not suffering problems.
I ask this because one big question I have about your design is that it seems possible for the data nodes to "fall behind" the data-less nodes (if I understand it). The data nodes may have hot CPUs, their storage could run slow (perhaps it's attached over the network), they may be serving heavy queries, GCing -- in fact, all manner of reasons could cause them to run slow. Systems that pull from queues have really nice characteristics, but they may fall behind. Your design is going to have to deal with data-nodes that fall behind, right? How will you detect it? How would Operations do so? How will the data-nodes keep track of where they are in processing the data if they reboot and come back up?
It seems to be an assumption in your design that the data nodes will always be able to keep up with data arriving at the data-less nodes. I'm not sure if that will be the case. If the data-less nodes are doing a lot less processing of the data -- basically writing it to disk -- it is almost certain that the data nodes may fall behind since they may be performing heavier processing (assuming all have the same CPU, RAM, and disk resources).
Definitely! Sorry, I didn't mean for the "cc" to come off as only directed at Paul and John. I just wanted to make sure they saw it. :)
Cassandra is a great tool but it's also quite complex. Designing and testing eventually consistent systems is notoriously hard and I was aiming for an approach that was much simpler. Cassandra's code base is ~110,000 SLOC of Java so it's pretty hefty. I think we could implement streaming Raft in ~1-2KLOC.
Elasticsearch has a different architecture but I've been hesitant to borrow from ES too much considering their data loss issues.
One of the main goals of multi-raft was to have better consistency while providing high throughput. It makes InfluxDB a CP system but I think that's reasonable considering the primary use cases and the fact that InfluxDB clients can hold metric data temporarily while a server is unavailable and submit everything as a batch when the server comes back online.
I meant to touch on this in my previous comment but forgot. Thanks for bringing it up. We'll need to apply back pressure to the data-less nodes if all the data nodes for a shard get too far out of sync. I'm ok with nodes getting behind for a bit but if they get too far behind then we'll need to start adding delays for new inserts into that shard until at least one of its replicas can catch up.
Since it's all going through a unified log, we'll know how far behind data nodes are based on the monotonically increasing log index. Each data node can persist where they are in the log by saving that index when they write data. If a node crashes then it can use that log index to restart their data stream.
Data nodes definitely won't be able to keep up with the log when there's a small number of nodes. We'll need to implement back pressure early on. The data-less nodes won't be the bottleneck until we reach a much larger cluster.
The biggest assumptions of this design (IMO) are:
Thanks again for all the feedback on this issue. I know it's a lot of nitty gritty stuff but it's good to hash it out early on. :)
OK, cool @benbjohnson -- you've obviously thought about this. I think you can get what you're proposing to work. Whether it'll be what you end up with is another question. :-) But you're probably at the stage you need to write more code to find out!
I generally follow your design. I'll look out for more proposed patches, and provide feedback as best I can.
I'm nowhere near an expert on this, and I didn't dive into all the details, but here's a few thoughts, for whatever they are worth.
The hard part is writing data to a distributed system in a consistent manner. Using an established protocol such as Raft lets us perform this safely without writing our own consensus protocol from scratch. If we only write to the nodes that need to store the data then we run into consistency issues and potential data loss in failure scenarios. How do we detect what was written and what was not, and to which nodes? How do we rebalance data across nodes in a consistent manner? We could use tools like vector clocks to do eventual consistency, but that makes everything significantly more complex.
The distributed WAL (aka "data-less" nodes) is much less resource intensive than writing and indexing the data to its final state (aka "data" nodes). I don't necessarily see this as a "double writing" setup when you look at it in terms of work performed. I would bet that there's a 10-20% disk overhead incurred by the distributed WAL since it's incredibly efficient to write. For people who don't want the disk overhead we could make the data-less nodes maintain the data in-memory and allow a small amount of data loss in the event of a full cluster crash.
My plan is to write streaming Raft as a self-contained subpackage of influxdb/influxdb. We may do InfluxDB-specific optimizations, but a generalized package could easily be extracted from there.
@benbjohnson -- just want to be clear on one point. You are proposing that every insertion of data (crudely speaking, every HTTP POST /db//series) goes through a Raft log on the data-less node? Is that correct? If so, what are the criteria for that operation to be acknowledged by the data-less node? Would a majority of data-less nodes (with one acting as leader) have to ack the operation? Something else?
Yes, an operation would be acknowledged once it's written by a majority of data-less nodes. That will ensure that it will be durable despite any number of node failures.
OK, now I see. Your design has echoes of 0.8 Kafka (0.7 didn't have replication) -- I see where you're going now. Interesting!
I think this makes a lot of sense if we can get the streaming raft implementation to perform at the level we need and ~2m writes per second is a good target for this initial one. Here's a bunch of random thoughts. Please correct me where I'm wrong.
Writes go into the data-less nodes. These writes go through a streaming raft log and all shards go into a single big log.
Data nodes have the indexed copies of shards. They pull writes from the data-less nodes. Basically they just ask for writes from a given log index for a given shard. The data-less node will give all the writes minus the ones to shards that the requesting data-node doesn't care about.
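That shard-filtered pull could be sketched like this (the types and field names are invented for illustration, not InfluxDB API):

```go
package main

import "fmt"

// Entry is one record in the unified log; ShardID identifies which
// shard the write belongs to.
type Entry struct {
	Index   uint64
	ShardID uint64
	Data    []byte
}

// filterForNode returns the entries after fromIndex that belong to
// shards the requesting data node owns, mirroring the pull described
// above: the data-less node streams everything past the given index
// minus the shards the requester doesn't care about.
func filterForNode(log []Entry, fromIndex uint64, owned map[uint64]bool) []Entry {
	var out []Entry
	for _, e := range log {
		if e.Index > fromIndex && owned[e.ShardID] {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	log := []Entry{
		{Index: 1, ShardID: 7},
		{Index: 2, ShardID: 9},
		{Index: 3, ShardID: 7},
	}
	// A data node that owns shard 7 and has already seen index 1
	// gets only the shard-7 write at index 3.
	got := filterForNode(log, 1, map[uint64]bool{7: true})
	fmt.Println(len(got), got[0].Index)
}
```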
I assume that the data-less nodes will have to keep track of which index each data-node has been updated to for each shard. That way it will be able to truncate the log. I further assume that the snapshots are essentially the indexes on the data-nodes so we need not keep snapshots on the data-less nodes.
For new servers joining the cluster, we can keep things simple and just have the new servers pick up shards when the next block of time gets created. For example if we have 4 data-nodes and an RF of 2. Then 2 shards get created for each time block. If we then add 2 new data-nodes, when the next block of time gets created we'd make 3 shards for that and those new servers would then start pulling data for those new shards.
For a server that replaces a downed data-node, we can just request the indexes from the data-nodes that have the shards it needs and then pull from the logs on the data-less nodes once the copies come over. It will have to tell the data-less nodes that it owns the given shards so that they don't truncate the logs.
Data-nodes pull from a master log for all the meta-data. What data-nodes exist, what data-less nodes exist, shards and where they are, databases, series, users, and continuous queries.
It seems that for writes this would be CP, while queries to the data nodes would be AP with consistency being eventual. Because writes are always streaming into the data-less nodes, there doesn't seem to be a way to give a guaranteed consistent read. Unless we implement something later that will do quorum reads on the data nodes that have a copy of each shard being queried.
Would it be possible to bypass the data-less portion for single server setups? Either that or maybe the in-memory log would be the thing to do. Basically, I want to avoid the write amplification and extra overhead for the case of single-server setups, which are quite common.
One concern I have is around failure scenarios. If there is a single log for all shards, and a few servers that own a single shard go down or get partitioned from the data-less nodes, then log truncation would halt until that gets fixed. At 2M writes per second, we could fill up a disk pretty quickly. We saw this problem before with the WAL around version 0.5.
Is it possible to have a separate log per shard? Assume that while there may be thousands of shards across the cluster, only a handful are hot for writes at any given time. For example if we have 3 data-less nodes and 30 data-nodes. And the user has RF 3 on all their shards. Then we'd only have 10 shards hot for writes (30 / 3). Or if RF 2 then 15 hot shards.
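The hot-shard arithmetic in that example, as a tiny helper (illustrative only):

```go
package main

import "fmt"

// hotShards estimates how many shards take writes at any given time:
// with each shard replicated to rf data nodes, the cluster has one
// hot shard per group of rf nodes, per the example above.
func hotShards(dataNodes, rf int) int {
	return dataNodes / rf
}

func main() {
	fmt.Println(hotShards(30, 3)) // 30 data nodes, RF 3 -> 10 hot shards
	fmt.Println(hotShards(30, 2)) // RF 2 -> 15 hot shards
}
```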
How does timeout work for the data-nodes and pulling their logs? For example, if a data-node goes down for 2 days, at what point does the data-less cluster assume that the server is down for good and removes it from the group so that it can continue truncating the log?
Ok, there's probably more I can think of later, but let me know if I'm understanding your design correctly.
Yes, we'll need to have the log index tracked for each data node but it doesn't need to be immediately up-to-date. We can probe it every few seconds.
Yep. Snapshots are held on the data nodes so they can be copied to other data nodes as needed (and with their current log index).
That's probably the easiest way to start. We could also copy over shard indexes to the new nodes and have those new nodes start from the log index. Then we could change RF on the fly.
Yes. Or we can add+remove nodes instead of replacing a node.
Yes. Data-less nodes will also read the log to maintain the cluster configuration.
We can't have conflicting writes in this setup so I would still consider the data nodes to be CP.
We could send a query through the WAL and have the query executed by one of the data nodes at the specific log index. We'd have to maintain something like Kafka's ISR (In-Sync Replicas) so we'd know what replicas are up-to-date. We'd also have to hand off the client from the data-less node to the data node that's handling the query. It's more complicated to do consistent reads but definitely doable in this setup.
Also, since we have a linear global log we could probably implement a consistent cluster-wide backup.
Yes, the Raft log is overkill for a single node setup.
We can make the truncator into a cleaner so that it'll write out a new, smaller log of any shards that are not caught up.
If we separate into multiple log files like Kafka then we lose linearizability between configuration changes and shard data, which introduces race conditions. I don't think multiple logs are helpful anyway: since multiple data nodes will be ingesting from multiple shards, it's easier to simply read from a single file descriptor and filter which messages go to which nodes. Otherwise we'd need to multiplex multiple file descriptors over multiple data nodes.
Right, but without having queries that do quorum on the data-nodes that have a copy of a shard, depending on which data-node you hit, you could get different results (if one of the nodes is a bit behind, for example). So that's what makes me think the reads are more AP.
Unless queries will have to run through the data-less nodes, which I don't like. That creates a bottleneck for query scalability.
Yes to this as long as it can be done without a major performance hit.
It would also be nice to be able to set a max-log size. So if any shards get partitioned off from the data-less nodes for a very long time, they don't end up bringing down the entire cluster.
Then we can pair this up with back-pressure. If a shard gets partitioned off for too long (or the data-nodes get too backed up) and the log gets too big, writes to that shard can just start failing, while writes to other shards can still be successful.
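That per-shard write gate might look something like this (the lag threshold and names are made up for the sketch):

```go
package main

import "fmt"

// acceptWrite decides whether a shard should keep taking writes, given
// the log head, the index its most caught-up replica has reached, and
// a configured maximum lag. If the slowest-acceptable replica falls
// too far behind (e.g. the shard's data nodes are partitioned off),
// writes to that shard start failing while other shards stay healthy.
func acceptWrite(logHead, replicaIndex, maxLag uint64) bool {
	return logHead-replicaIndex <= maxLag
}

func main() {
	fmt.Println(acceptWrite(1000, 900, 500))  // within the lag budget
	fmt.Println(acceptWrite(5000, 1000, 500)) // too far behind -> reject
}
```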
That way the data nodes don't even need to send back pressure info to the data-less nodes. The data-less nodes will know how far behind every data-node is by the requests coming in for the given log indexes.
Considering the use cases for InfluxDB, I don't think it makes sense to force consistent reads. The only time I could see doing consistent reads is where you need to read from more than one shard across two data nodes and you need the number exact. You'd have to go through the data-less node for that.
Most use cases can deal with stale reads -- especially where the staleness is less than a second. I'm assuming that's the target use case. We can do that by having queries run directly against the data nodes.
We'll get a performance hit if a significant percentage of the log is not caught up by data nodes. I think having a max size on the pruned log is a good idea. If it gets too large then segments start getting dropped off the end.
My thought for the streaming messages is that data nodes would simply connect once with a given log index and then keep a long-running connection open indefinitely. If anything happens to the data node or the data-less node it's connected to, it just reconnects using the last log index it saw and continues the stream from where it left off.
In this situation, we wouldn't have new requests from the data nodes so we won't know where they're at. We'll still need nodes to report their log index position periodically.