Skip to content

Commit

Permalink
Optimize signal sending to processes with message_queue_data=off_heap
Browse files Browse the repository at this point in the history
Erlang guarantees that signals (i.e., message signals and non-message
signals) sent from a single process to another process are ordered in
send order. However, there are no ordering guarantees for signals sent
from different processes to a particular process. Therefore, several
processes can send signals in parallel to a specific process without
synchronizing with each other. However, such signal sending was
previously always serialized as the senders had to acquire the lock
for the outer signal queue of the receiving process. This commit makes
it possible for several processes to send signals to a process with
the message_queue_data=off_heap setting* activated in parallel and
without interfering with each other. This parallel signal sending
optimization yields much better scalability for signal sending than
what was previously possible**.

* Information about how to enable the message_queue_data=off_heap
  setting can be found in the documentation of the function
  erlang:process_flag/2.

** http://winsh.me/bench/erlang_sig_q/sigq_bench_result.html

Implementation
--------------

The parallel message sending optimization works only on processes with
the message_queue_data=off_heap setting enabled. For processes with
the message_queue_data=off_heap setting enabled, the new optimization
is activated and deactivated on demand based on heuristics, to keep the
overhead small when the optimization is unnecessary. The optimization
is activated when the contention on the lock for the outer message
queue is high. It is deactivated when the number of enqueued messages
per fetch operation (an operation that moves messages from the outer
message queue to the inner one) is low.

When the optimization is active, the outer message queue has an array
of signal buffers where sending processes enqueue signals. When the
receiving process needs to fetch messages from the outer message
queue, the contents of the non-empty buffers are appended to the outer
message queue. Each process is assigned a particular slot in the
buffer array (the process ID is used to hash to a particular
slot). That way, the system can preserve the send order between
messages coming from the same process.
  • Loading branch information
kjellwinblad committed Sep 6, 2021
1 parent 2ef0ca2 commit 658f864
Show file tree
Hide file tree
Showing 15 changed files with 1,552 additions and 53 deletions.
2 changes: 2 additions & 0 deletions erts/emulator/beam/erl_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop)
= ERTS_MAGIC_BIN_UNALIGNED_SIZE(sizeof(ErtsMagicIndirectionWord));
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_RECV_MARK_BLK)]
= sizeof(ErtsRecvMarkerBlock);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_SIGQ_BUFFERS)]
= sizeof(ErtsSignalInQueueBufferArray);

#ifdef HARD_DEBUG
hdbg_init();
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_alloc.types
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type BINARY_FIND SHORT_LIVED PROCESSES binary_find
type CRASH_DUMP STANDARD SYSTEM crash_dump
type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context
type RLA_BLOCK_CNTRS LONG_LIVED SYSTEM release_literal_area_block_counters
type SIGQ_BUFFERS FIXED_SIZE PROCESSES process_signal_queue_buffers

type THR_Q_EL STANDARD SYSTEM thr_q_element
type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element
Expand Down
4 changes: 2 additions & 2 deletions erts/emulator/beam/erl_bif_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
sreds = reds_left;

if (!local_only) {
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
}
Expand Down Expand Up @@ -1218,7 +1218,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
}
if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) {
ASSERT(locks & ERTS_PROC_LOCK_MAIN);
erts_proc_lock(rp, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_queue_lock(rp);
erts_proc_sig_fetch(rp);
if (c_p->sig_qs.cont) {
erts_proc_unlock(rp, locks|ERTS_PROC_LOCK_MSGQ);
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/beam/erl_lock_check.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "hard_dbg_mseg", NULL },
{ "perf", NULL },
{ "jit_debug_descriptor", NULL },
{ "erts_mmap", NULL }
{ "erts_mmap", NULL },
{ "proc_sig_queue_buffer", "address" }
};

#define ERTS_LOCK_ORDER_SIZE \
Expand Down
50 changes: 43 additions & 7 deletions erts/emulator/beam/erl_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ erts_queue_dist_message(Process *rcvr,

/* Add messages last in message queue */
static void
queue_messages(Process* receiver,
queue_messages(Process* sender, /* is NULL if the sender is not a local process */
Process* receiver,
ErtsProcLocks receiver_locks,
ErtsMessage* first,
ErtsMessage** last,
Expand All @@ -367,8 +368,17 @@ queue_messages(Process* receiver,
ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(receiver) & ERTS_PROC_LOCK_MSGQ)
== (receiver_locks & ERTS_PROC_LOCK_MSGQ));

/*
* Try to enqueue to an outer message queue buffer instead of
* directly to the outer message queue
*/
if (erts_proc_sig_queue_try_enqueue_to_buffer(sender, receiver, receiver_locks,
first, last, NULL, len, 0)) {
return;
}

if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_queue_lock(receiver);
locked_msgq = 1;
}

Expand All @@ -390,8 +400,23 @@ queue_messages(Process* receiver,
return;
}

/*
* Install buffers for the outer message queue if the heuristic
* indicates that this is beneficial. It is best to do this as
* soon as possible to avoid as much contention as possible.
*/
erts_proc_sig_queue_maybe_install_buffers(receiver, state);

if (last == &first->next) {
ASSERT(len == 1);
if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
/*
* Flush outer signal queue buffers, if such buffers are
* installed, to ensure that messages from the same
* process cannot be reordered.
*/
erts_proc_sig_queue_flush_buffers(receiver);
}
LINK_MESSAGE(receiver, first);
}
else {
Expand Down Expand Up @@ -446,7 +471,7 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks,
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = from;
ERL_MESSAGE_TOKEN(mp) = am_undefined;
queue_messages(receiver, receiver_locks, mp, &mp->next, 1);
queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1);
}

/**
Expand All @@ -463,7 +488,7 @@ erts_queue_message_token(Process* receiver, ErtsProcLocks receiver_locks,
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = from;
ERL_MESSAGE_TOKEN(mp) = token;
queue_messages(receiver, receiver_locks, mp, &mp->next, 1);
queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1);
}


Expand All @@ -484,7 +509,7 @@ erts_queue_proc_message(Process* sender,
{
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = sender->common.id;
queue_messages(receiver, receiver_locks,
queue_messages(sender, receiver, receiver_locks,
prepend_pending_sig_maybe(sender, receiver, mp),
&mp->next, 1);
}
Expand All @@ -495,7 +520,7 @@ erts_queue_proc_messages(Process* sender,
Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* first, ErtsMessage** last, Uint len)
{
queue_messages(receiver, receiver_locks,
queue_messages(sender, receiver, receiver_locks,
prepend_pending_sig_maybe(sender, receiver, first),
last, len);
}
Expand Down Expand Up @@ -988,8 +1013,19 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state)
case am_off_heap:
break;
case am_on_heap:
c_p->sig_qs.flags |= FS_ON_HEAP_MSGQ;
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
/*
* The flags are changed while holding the
* ERTS_PROC_LOCK_MSGQ lock so that it is guaranteed that
* there are no messages in buffers if (c_p->sig_qs.flags
* & FS_ON_HEAP_MSGQ) and the ERTS_PROC_LOCK_MSGQ is held.
*/
erts_proc_sig_queue_flush_and_deinstall_buffers(c_p);

c_p->sig_qs.flags |= FS_ON_HEAP_MSGQ;
c_p->sig_qs.flags &= ~FS_OFF_HEAP_MSGQ;

erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
/*
* We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
* if a off heap change is ongoing. It will be adjusted
Expand Down
62 changes: 62 additions & 0 deletions erts/emulator/beam/erl_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@
#define ERTS_MSG_COPY_WORDS_PER_REDUCTION 64
#endif

/*
 * Tuning parameters for the buffered outer signal queue.
 *
 * The number of buffers must be 64 or less, because a single word is
 * currently used as a bitset holding information about which buffers
 * are non-empty.
 */
#define ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS 64

/* Build-specific settings; DEBUG builds use a different set of values. */
#ifdef DEBUG
#  define ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT 250
#  define ERTS_PROC_SIG_INQ_BUFFERED_ALWAYS_TURN_ON 1
#  define ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE 2
#else
#  define ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT 50
#  define ERTS_PROC_SIG_INQ_BUFFERED_ALWAYS_TURN_ON 0
#  define ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE 8192
#endif

/*
 * At least 1.5 enqueues per flush all op. The expression is the same
 * for every build type; only the value it is derived from differs.
 */
#define ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP \
    (ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE + \
     ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE / 2)

struct proc_bin;
struct external_thing_;

Expand Down Expand Up @@ -340,6 +362,46 @@ typedef struct {
#endif
} ErtsSignalInQueue;

/*
* A single signal in-queue buffer, used as the element type of the
* slots array in ErtsSignalInQueueBufferArray.
*
* The union overlays the real fields (member b) with a byte array
* whose length is ERTS_ALC_CACHE_LINE_ALIGN_SIZE of the fields' size,
* so the size of the union is at least that value.
* NOTE(review): presumably this pads each buffer to whole cache lines
* to avoid false sharing between slots -- confirm against the
* definition of ERTS_ALC_CACHE_LINE_ALIGN_SIZE.
*/
typedef union {
struct ___ErtsSignalInQueueBufferFields {
/* NOTE(review): presumably guards the fields below -- confirm. */
erts_mtx_t lock;
/*
* Boolean value indicating if the buffer is alive. An
* enqueue attempt to a dead buffer has to be canceled
*/
int alive;
/*
* The number of enqueues that has been performed to this
* buffer. This value is used to decide if we should adapt
* back to an unbuffered state
*/
Uint nr_of_enqueues;
/* The signals that have been enqueued to this buffer. */
ErtsSignalInQueue queue;
} b;
/* Never accessed for its contents; only determines the union's size. */
byte align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(struct ___ErtsSignalInQueueBufferFields))];
} ErtsSignalInQueueBuffer;

/*
* Compile-time guard: the set of non-empty slots is kept in a single
* 64-bit atomic (nonempty_slots below), so at most 64 slots can be
* represented.
*/
#if ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS > 64
#error The data structure holding information about which slots that are non-empty (the nonempty_slots field in the struct below) needs to be changed (it currently only supports up to 64 slots)
#endif

/*
* Array of signal in-queue buffers together with its bookkeeping
* state. It holds ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS slots.
*/
typedef struct {
/* The buffers themselves, one per slot. */
ErtsSignalInQueueBuffer slots[ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS];
/*
* NOTE(review): presumably used to schedule deallocation of this
* structure through the thread progress system -- confirm.
*/
ErtsThrPrgrLaterOp free_item;
/* Bitset with information about which slots are non-empty. */
erts_atomic64_t nonempty_slots;
/*
* NOTE(review): presumably a bitset of the slots that contain
* non-message signals -- confirm against the code that updates it.
*/
erts_atomic64_t nonmsg_slots;
/*
* dirty_refc is incremented by dirty schedulers that access the
* buffer array to prevent deallocation while they are accessing
* the buffer array. This is needed since dirty schedulers are not
* part of the thread progress system.
*/
erts_atomic64_t dirty_refc;
/*
* NOTE(review): nr_of_rounds and nr_of_enqueues look like the
* statistics behind the decision to keep or remove the buffers
* (compare ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE
* and ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP) -- confirm.
*/
Uint nr_of_rounds;
Uint nr_of_enqueues;
/*
* NOTE(review): presumably a boolean telling whether the buffer
* array is still in use, like the per-buffer alive flag -- confirm.
*/
int alive;
} ErtsSignalInQueueBufferArray;

typedef struct erl_trace_message_queue__ {
struct erl_trace_message_queue__ *next; /* point to the next receiver */
Eterm receiver;
Expand Down
Loading

0 comments on commit 658f864

Please sign in to comment.