ra performance in large data transfer scenarios #135

Closed
jinshuaiyu2 opened this issue Oct 24, 2019 · 8 comments

@jinshuaiyu2

Our system uses ranch together with ra. Ranch is responsible for receiving data from the client and handing it over to the ra cluster for storage, but we found that the performance is not good. We also found that the ra members lose heartbeats in large data transfer scenarios, even when the ra cluster itself is not doing anything. How can we optimize this?

@michaelklishin
Member

michaelklishin commented Oct 24, 2019

We cannot suggest anything with the amount of information provided. How large is "large"? Over what kind of link? What heartbeat was used? What did the various metrics, both Ra WAL and infrastructure, tell you? This is [RabbitMQ] mailing list material.

@michaelklishin
Member

michaelklishin commented Oct 24, 2019

We also have no information about how you use Ra (relevant parts of your code) or Erlang version information.

Generally, any Raft implementation will be highly sensitive to log entry size and to disk and network I/O throughput. Log entry payload compression can be a decent answer to all of those.

We have an Erlang distribution implementation that can use LZ4 or ZStandard compression (note: the repository is not currently public, and we don't know if it will be). That can help significantly reduce the amount of information transferred between nodes at the cost of 5-30% of additional CPU load.

Erlang 22 uses fragmented inter-node message transfers, which helps avoid the widely known phenomenon where transferring one large message (say, a gigabyte in size) blocks all other inter-node communication and suspends every process that tries to send messages to processes on other nodes. That naturally manifests itself as a missed net tick/heartbeat. Use prometheus.erl and our Erlang distribution Grafana dashboard to see if there is any evidence of "head of the distribution transfer buffer" blocking.
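If setting up Prometheus takes a while, one quick and purely illustrative way to spot that kind of blocking is the standard Erlang system monitor, which reports processes that get suspended on a busy distribution port. A minimal sketch (module and function names are made up):

%% Reports processes suspended on a busy distribution port for DurationMs,
%% i.e. the "head of line blocking" described above.
-module(dist_block_check).
-export([run/1]).

run(DurationMs) ->
    erlang:system_monitor(self(), [busy_dist_port]),
    Deadline = erlang:monotonic_time(millisecond) + DurationMs,
    loop(Deadline),
    erlang:system_monitor(undefined),
    ok.

loop(Deadline) ->
    Timeout = erlang:max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {monitor, SusPid, busy_dist_port, Port} ->
            io:format("~p suspended on busy dist port ~p~n", [SusPid, Port]),
            loop(Deadline)
    after Timeout ->
        ok
    end.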

Don't guess, collect and use metrics instead.

@michaelklishin michaelklishin changed the title ra performance defects in large data transfer scenarios ra performance in large data transfer scenarios Oct 24, 2019
@michaelklishin
Member

michaelklishin commented Oct 24, 2019

rabbitmq/inet_tcp_compress_dist is now open source under the same [double-]license as Ra. It's a very immature and highly experimental project, but it is sufficiently complete to run a real-world RabbitMQ Quorum Queue workload, so that our engineers can compare its CPU and I/O effects with the built-in (uncompressed) distribution carrier.

@jinshuaiyu2
Author

I am very sorry for my unclear statement. The following is our experimental environment:

  • There are 4 servers: 1 client server and 3 service servers for ra
    • CPU: 24 cores, Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz;
    • RAM: 64 GB;
    • disk storage: 10 hard disks (4 TB each) in RAID 50;
    • network bandwidth of each server: 10 Gb/s.
  • The Erlang version is Erlang/OTP 21 and the ranch version is 1.5.0

We start one ra cluster with ra:start_cluster(ClusterName, Machine, Members) on each service server, so there are 3 ra clusters.

The client server sends {FilePath, Offset, BinaryData} (BinaryData is 64 KB, sent via the gen_tcp module) to the 3 ranch service servers, and the service servers then execute ra:process_command(Server, {write, FilePath, Offset, BinaryData}, 60000). The ra_machine behaviour is:

apply(#{index := Idx}, {write, Path, Offset, Data}, State) ->
    case ets:lookup(?FDTAB, Path) of
        [] ->
            {State, {error, ebadf}, []};
        [{Path, Fd}] ->
            %% write the file data to storage as a side effect of apply/3
            ok = file:pwrite(Fd, Offset, Data),
            %% every 32k entries, emit a release_cursor effect so Ra can
            %% truncate the log up to this index
            Effect = case Idx rem (32 * 1024) of
                         0 ->
                             lager:info("return effect release_cursor Idx: ~p", [Idx]),
                             [{release_cursor, Idx, State}];
                         _ ->
                             []
                     end,
            {State, ok, Effect}
    end;
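For illustration, a simplified sketch of the ranch handler loop described above could look like this (placeholder module name, framing and error handling omitted, the local ra member id passed via protocol options):

%% Simplified sketch only: receives a packet, decodes it into
%% {FilePath, Offset, BinaryData} and submits it to the local ra member.
-module(upload_protocol).
-behaviour(ranch_protocol).
-export([start_link/4, init/4]).

start_link(Ref, Socket, Transport, Opts) ->
    {ok, spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts])}.

init(Ref, Socket, Transport, #{server_id := ServerId}) ->
    ok = ranch:accept_ack(Ref),
    loop(Socket, Transport, ServerId).

loop(Socket, Transport, ServerId) ->
    case Transport:recv(Socket, 0, 30000) of
        {ok, Packet} ->
            %% real code would decode its own wire format here
            {FilePath, Offset, BinaryData} = binary_to_term(Packet),
            {ok, _Reply, _Leader} =
                ra:process_command(ServerId, {write, FilePath, Offset, BinaryData}, 60000),
            loop(Socket, Transport, ServerId);
        {error, _Reason} ->
            ok = Transport:close(Socket)
    end.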

The client sends an unlimited number of messages to the servers. The measured upload rate is about 200 MB/s when the file is written to disk storage in apply. When we write the file to RAM instead, or skip the write in apply, the measured upload rate is about 350 MB/s. When we only use ranch to receive data, without ra, the upload rate is about 1000 MB/s (the network bandwidth). We also see leader transfers in the ra cluster, such as {ra_server1_71,ra_node@host72}: candidate -> leader in term: 2, even when the ra cluster is not doing anything.

I am very sorry for my English statement again :).

@michaelklishin
Member

michaelklishin commented Oct 24, 2019

Well, not writing data to the log (disk) obviously would always be significantly more efficient than doing so.

I suggest that you set up a small example app that

  • Uses Erlang 22, as you are very likely running into the two-decade-old "head of the distribution buffer/queue blocking" problem covered in Fragmented distribution messages
  • Does not even use Ranch, simply generates pseudo-random inputs
  • Simulates what your "Ranch server" would do with the data
  • Exposes metrics to Prometheus using prometheus.erl and the relevant Grafana dashboards from rabbitmq/rabbitmq-prometheus
  • Allows you to observe disk, network bandwidth, distribution buffers, Raft WAL metrics, and so on

Once you have some initial data,

  • Try the zstd compression carrier I've just open sourced (see above) to compare
  • Re-run the test with OTP 21 and compare the data. My guess is that even with compression enabled, you'd see a significant difference in behavior due to the "head of line blocking" problem when data is replicated between nodes

My guess is that you overload the distribution in OTP 21, which suspends Ra processes, which in turn leads their peers to detect heartbeat timeouts and trigger a new election.

We are aware of one workload where the WAL operations do not keep up with the Ra client. However, this doesn't seem to be the case here.
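As a starting point, the load-generating part of such an example app can be as small as the following (hypothetical sketch: module name, target path and the 64 KB payload size are placeholders mirroring the description above; metrics export and concurrency are left out):

%% Hypothetical load generator: submits N pseudo-random 64 KB writes to a Ra
%% member and prints the resulting throughput in MB/s.
-module(ra_write_bench).
-export([run/2]).

run(ServerId, N) ->
    Payload = crypto:strong_rand_bytes(64 * 1024),
    T0 = erlang:monotonic_time(millisecond),
    ok = submit(ServerId, Payload, N),
    ElapsedMs = erlang:max(1, erlang:monotonic_time(millisecond) - T0),
    MB = N * 64 / 1024,
    io:format("~p writes in ~p ms (~.1f MB/s)~n", [N, ElapsedMs, MB * 1000 / ElapsedMs]).

submit(_ServerId, _Payload, 0) ->
    ok;
submit(ServerId, Payload, N) ->
    Offset = (N - 1) * byte_size(Payload),
    {ok, _Reply, _Leader} =
        ra:process_command(ServerId, {write, "/tmp/bench.dat", Offset, Payload}, 60000),
    submit(ServerId, Payload, N - 1).

Running it with and without the file:pwrite/3 call in apply/3, and on OTP 21 versus OTP 22, would make the comparisons above concrete.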

@michaelklishin
Member

It's important to mention that fragmented distribution only kicks in when both peers run OTP 22. So running only some nodes on OTP 22 will not make any difference.

@michaelklishin
Member

Our team had to make the repo private again as we don't know if it will be open sourced yet. So instead of using zstd compression for distribution you have to

  • Switch to Erlang 22
  • Set up monitoring, including for Erlang distribution
  • Compress payloads using zstd (it's a remarkably efficient algorithm in our experience) or lz4 (also quite good) in your own code. This would help with local disk I/O anyway.
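If no zstd or lz4 binding is at hand, the zlib module that ships with OTP is enough to illustrate the idea (a sketch only, not a recommendation of zlib specifically): compress the blob before it goes into the Ra command and decompress it again inside apply/3.

%% Sketch: compress the payload before it enters the Ra log; with zstd or lz4
%% the calls would come from the corresponding NIF library instead of zlib.
write(ServerId, Path, Offset, Data) ->
    Compressed = zlib:compress(Data),
    ra:process_command(ServerId, {write, Path, Offset, Compressed}, 60000).

%% ...and in the state machine:
%% ok = file:pwrite(Fd, Offset, zlib:uncompress(Data)),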

@kjnilsson
Contributor

OK, so your performance issue most likely comes from using file:pwrite/3 inside the apply/3 function. Ra is designed primarily to support pure state machines, i.e. the code in apply/3 should not perform any side effects. You may want to watch my talk on Ra state machines from CodeBEAM STO last year, where I think I cover some of the state machine best practices:

https://www.youtube.com/watch?v=wHpNfCeX_Vk

Do you need to write the data again? It is already on disk in the Ra log. Could the state machine just keep a map of the blobs and the references? I'm not sure what you are trying to do in this case so more info would be needed.
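To illustrate the "map of blobs and references" idea, a side-effect-free apply/3 could look roughly like this (a sketch only, assuming the machine state is a map; reading a blob back would then mean consulting the Ra log or a separate store, and release_cursor handling would need more thought):

%% Rough sketch of a pure state machine: apply/3 only records where each blob
%% lives in the Ra log instead of performing file I/O as a side effect.
apply(#{index := Idx}, {write, Path, Offset, _Data}, State0) ->
    Blobs = maps:get(Path, State0, #{}),
    State = State0#{Path => Blobs#{Offset => Idx}},
    {State, ok, []}.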
