Skip to content

Commit

Permalink
io-wq: fix race between adding work and activating a free worker
Browse files Browse the repository at this point in the history
commit 94ffb0a upstream.

The attempt to find and activate a free worker for new work is currently
combined with creating a new one if we don't find one, but that opens
io-wq up to a race where the worker that is found and activated can
put itself to sleep without knowing that it has been selected to perform
this new work.

Fix this by moving the activation into where we add the new work item,
then we can retain it within the wqe->lock scope and eliminate the race
with the worker itself checking inside the lock, but sleeping outside of
it.

Cc: stable@vger.kernel.org
Reported-by: Andres Freund <andres@anarazel.de>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
  • Loading branch information
axboe authored and gregkh committed Sep 18, 2021
1 parent 08c87b2 commit b7a2335
Showing 1 changed file with 24 additions and 26 deletions.
50 changes: 24 additions & 26 deletions fs/io-wq.c
Expand Up @@ -237,9 +237,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, create one.
*/
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
bool ret;
bool do_create = false, first = false;

/*
* Most likely an attempt to queue unbounded work on an io_wq that
Expand All @@ -248,25 +248,18 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");

rcu_read_lock();
ret = io_wqe_activate_free_worker(wqe);
rcu_read_unlock();

if (!ret) {
bool do_create = false, first = false;

raw_spin_lock_irq(&wqe->lock);
if (acct->nr_workers < acct->max_workers) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
if (!acct->nr_workers)
first = true;
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock_irq(&wqe->lock);
if (do_create)
create_io_worker(wqe->wq, wqe, acct->index, first);
raw_spin_lock_irq(&wqe->lock);
if (acct->nr_workers < acct->max_workers) {
if (!acct->nr_workers)
first = true;
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock_irq(&wqe->lock);
if (do_create) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
create_io_worker(wqe->wq, wqe, acct->index, first);
}
}

Expand Down Expand Up @@ -798,7 +791,8 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
bool do_wake;
unsigned work_flags = work->flags;
bool do_create;
unsigned long flags;

/*
Expand All @@ -814,12 +808,16 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
raw_spin_lock_irqsave(&wqe->lock, flags);
io_wqe_insert_work(wqe, work);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running);

rcu_read_lock();
do_create = !io_wqe_activate_free_worker(wqe);
rcu_read_unlock();

raw_spin_unlock_irqrestore(&wqe->lock, flags);

if (do_wake)
io_wqe_wake_worker(wqe, acct);
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running)))
io_wqe_create_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
Expand Down

0 comments on commit b7a2335

Please sign in to comment.