Skip to content

Commit

Permalink
cross-shard-barrier: Capture shared barrier in complete
Browse files Browse the repository at this point in the history
When cross-shard barrier is abort()-ed it spawns a background fiber
that will wake-up other shards (if they are sleeping) with exception.

This fiber is implicitly waited by the owning sharded service .stop,
because barrier usage is like this:

    sharded<service> s;
    co_await s.invoke_on_all([] {
        ...
        barrier.abort();
    });
    ...
    co_await s.stop();

If abort happens, the invoke_on_all() will only resolve _after_ it
queues up the waking lambdas into smp queues, thus the subseqent stop
will queue its stopping lambdas after barrier's ones.

However, in debug mode the queue can be shuffled, so the owning service
can suddenly be freed from under the barrier's feet causing use after
free. Fortunately, this can be easily fixed by capturing the shared
pointer on the shared barrier instead of a regular pointer on the
shard-local barrier.

fixes: scylladb#11303

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
  • Loading branch information
xemul committed Sep 15, 2022
1 parent 77467bc commit 2078b05
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
15 changes: 15 additions & 0 deletions test/unit/cross_shard_barrier_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class worker : public seastar::peering_sharded_service<worker> {
}
}

future<> error() {
_barrier.abort();
return make_ready_future<>();
}

unsigned get_phase() const noexcept { return _phase.load(); }
};

Expand Down Expand Up @@ -115,6 +120,16 @@ int main(int argc, char **argv) {
}
w.stop().get();
}

std::vector<int> count(64);
parallel_for_each(count, [] (auto& cnt) -> future<> {
std::vector<sharded<worker>> w(32);
co_await parallel_for_each(w, [] (auto &sw) -> future<> {
co_await sw.start(utils::cross_shard_barrier());
co_await sw.invoke_on_all(&worker::error);
co_await sw.stop();
});
}).get();
});
});
}
4 changes: 2 additions & 2 deletions utils/cross-shard-barrier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ private:
future<> complete() {
_b->counter.fetch_add(smp::count);
bool alive = _b->alive.load(std::memory_order_relaxed);
return smp::invoke_on_all([this, sid = this_shard_id(), alive] {
return smp::invoke_on_all([b = _b, sid = this_shard_id(), alive] {
if (this_shard_id() != sid) {
std::optional<promise<>>& w = _b->wakeup[this_shard_id()];
std::optional<promise<>>& w = b->wakeup[this_shard_id()];
if (alive) {
assert(w.has_value());
w->set_value();
Expand Down

0 comments on commit 2078b05

Please sign in to comment.