From 5f03dcc56cd77e6474aab10663860d34c2a8e462 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 18 Jul 2017 17:21:00 +0200 Subject: [PATCH 1/2] Remove hibernate from GM We don't want to use the backoff/hibernate feature because we have observed that the GM process is suspended half of the time. We really wanted to replace gen_server2 with gen_server, but it was more important to keep changes in 3.6 to a minimum. GM will eventually be replaced, so switching it from gen_server2 to gen_server will be soon redundant. We simply do not understand some of the gen_server2 trade-offs well enough to feel strongly about this change. [#148892851] Signed-off-by: Gerhard Lazu --- src/gm.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/gm.erl b/src/gm.erl index 7604b9e197b6..5679fc30e5aa 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -552,8 +552,7 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer_sz = 0, broadcast_timer = undefined, txn_executor = TxnFun, - shutting_down = false }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + shutting_down = false }}. handle_call({confirmed_broadcast, _Msg}, _From, @@ -888,7 +887,7 @@ noreply(State) -> reply(Reply, State) -> {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. ensure_broadcast_timer(State = #state { broadcast_buffer = [], From 7d0e49c6391fce19d651ad4658923d4b3d415aa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20Pedron?= Date: Thu, 20 Jul 2017 18:37:19 +0100 Subject: [PATCH 2/2] Run garbage collection in GM every 250ms In high throughput scenarios, e.g. `basic.reject` or `basic.nack`, messages which belong to a mirrored queue and are replicated within a GM group, are quickly promoted to the old heap. This means that garbage collection happens only when the Erlang VM is under memory pressure, which might be too late. When a process is under pressure, garbage collection slows it down even further, to the point of RabbitMQ nodes running out of memory and crashing. To avoid this scenario, We want the GM process to garbage collect binaries regularly, i.e. every 250ms. The variable queue does the same for a similar reason: rabbitmq/rabbitmq-server#289 Initially, we wanted to use the number of messages as the trigger for garbage collection, but we soon discovered that different workloads (e.g. small vs large messages) would result in unpredictable and sub-optimal GC schedules. Before setting `fullsweep_after` to 0, memory usage was 2x higher (400MB vs 200MB) and throughput was 0.1x lower (18k vs 20k). With this `spawn_opt` setting, the general collection algorithm is disabled, meaning that all live data is copied at every garbage collection: http://erlang.org/doc/man/erlang.html#spawn_opt-3 The RabbitMQ deployment used for testing this change: * AWS, c4.2xlarge, bosh-aws-xen-hvm-ubuntu-trusty-go_agent 3421.11 * 3 RabbitMQ nodes running OTP 20.0.1 * 3 durable & auto-delete queues with 3 replicas each * each queue master was defined on a different RabbitMQ node * every RabbitMQ node was running 1 queue master & 2 queue slaves * 1 consumer per queue with QOS 100 * 100 durable messages @ 1KiB each * `basic.reject` operations ``` | Node | Message throughput | Memory usage | | ------ | -------------------- | -------------- | | rmq0 | 12K - 20K msg/s | 400 - 900 MB | | rmq1 | 12K - 20K msg/s | 500 - 1000 MB | | rmq2 | 12K - 20K msg/s | 500 - 800 MB | ``` [#148892851] Signed-off-by: Gerhard Lazu --- src/gm.erl | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/gm.erl b/src/gm.erl index 5679fc30e5aa..cf3e2170101f 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -395,9 +395,8 @@ -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -416,6 +415,7 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, + force_gc_timer, txn_executor, shutting_down }). @@ -508,7 +508,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], + [{spawn_opt, [{fullsweep_after, 0}]}]). leave(Server) -> gen_server2:cast(Server, leave). @@ -551,6 +552,7 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, + force_gc_timer = undefined, txn_executor = TxnFun, shutting_down = false }}. @@ -707,6 +709,10 @@ handle_cast(leave, State) -> {stop, normal, State}. +handle_info(force_gc, State) -> + garbage_collect(), + noreply(State #state { force_gc_timer = undefined }); + handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); @@ -882,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. + {noreply, ensure_timers(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + {reply, Reply, ensure_timers(State), flush_timeout(State)}. + +ensure_timers(State) -> + ensure_force_gc_timer(ensure_broadcast_timer(State)). flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. +ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) + when is_reference(TRef) -> + State; +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), + State #state { force_gc_timer = TRef }. + ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; @@ -957,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], - broadcast_buffer_sz = 0}. - + broadcast_buffer_sz = 0 }. %% --------------------------------------------------------------------------- %% View construction and inspection