Skip to content

Commit

Permalink
smp: allow more than 128 in-flight operations on core-to-core queue
Browse files Browse the repository at this point in the history
The 128-message limit on in-flight operations can lead to ABA deadlocks;
for example:

  A: send 128 requests to B
  B: while processing each of A's request, send a request back to A (processing
     A's request depends on completion of the B->A request).
  A: while processing the B->A requests, send a request back to B.

We deadlock. because A cannot send the last request to B.

See #1088 for a real-life example.

Fix by lifting the restriction; allow any number of requests to be
executed concurrently.  This requires the application to add its internal
limits, otherwise, if all shards simulataneously send many messages to a
single shard, that shard's memory can be exhausted.

Message-Id: <1461497600-20495-1-git-send-email-avi@scylladb.com>
  • Loading branch information
avikivity committed Apr 25, 2016
1 parent fa11cab commit 15a92cf
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions core/reactor.cc
Expand Up @@ -2047,14 +2047,13 @@ smp_message_queue::smp_message_queue(reactor* from, reactor* to)
}

void smp_message_queue::move_pending() {
auto queue_room = queue_length - _current_queue_length;
auto nr = std::min(queue_room, _tx.a.pending_fifo.size());
if (!nr) {
auto begin = _tx.a.pending_fifo.cbegin();
auto end = _tx.a.pending_fifo.cend();
end = _pending.push(begin, end);
if (begin == end) {
return;
}
auto begin = _tx.a.pending_fifo.begin();
auto end = begin + nr;
_pending.push(begin, end);
auto nr = end - begin;
_pending.maybe_wakeup();
_tx.a.pending_fifo.erase(begin, end);
_current_queue_length += nr;
Expand All @@ -2078,9 +2077,14 @@ void smp_message_queue::respond(work_item* item) {

void smp_message_queue::flush_response_batch() {
if (!_completed_fifo.empty()) {
_completed.push(_completed_fifo.begin(), _completed_fifo.end());
auto begin = _completed_fifo.cbegin();
auto end = _completed_fifo.cend();
end = _completed.push(begin, end);
if (begin == end) {
return;
}
_completed.maybe_wakeup();
_completed_fifo.clear();
_completed_fifo.erase(begin, end);
}
}

Expand Down

0 comments on commit 15a92cf

Please sign in to comment.