Skip to content

Commit

Permalink
io-wq: have manager wait for all workers to exit
Browse files Browse the repository at this point in the history
Instead of having to wait separately on workers and manager, just have
the manager wait on the workers. We use an atomic_t for the reference
here, as we need to start at 0 and allow increment from that. Since the
number of workers is naturally capped by the allowed number of processes,
and that uses an int, there is no risk of overflow.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
  • Loading branch information
axboe committed Mar 4, 2021
1 parent 65d4302 commit fb3a1f6
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions fs/io-wq.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ struct io_wq {
refcount_t refs;
struct completion done;

atomic_t worker_refs;
struct completion worker_done;

struct hlist_node cpuhp_node;

pid_t task_pid;
Expand Down Expand Up @@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_unlock_irq(&wqe->lock);

kfree_rcu(worker, rcu);
io_wq_put(wqe->wq);
if (atomic_dec_and_test(&wqe->wq->worker_refs))
complete(&wqe->wq->worker_done);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
Expand Down Expand Up @@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
init_completion(&worker->ref_done);
init_completion(&worker->started);

refcount_inc(&wq->refs);
atomic_inc(&wq->worker_refs);

if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker);
else
pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) {
io_wq_put(wq);
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
kfree(worker);
return false;
}
Expand Down Expand Up @@ -736,6 +741,7 @@ static int io_wq_manager(void *data)
{
struct io_wq *wq = data;
char buf[TASK_COMM_LEN];
int node;

sprintf(buf, "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf);
Expand All @@ -753,6 +759,15 @@ static int io_wq_manager(void *data)
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));

io_wq_check_workers(wq);

rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();

/* we might not ever have created any workers */
if (atomic_read(&wq->worker_refs))
wait_for_completion(&wq->worker_done);
wq->manager = NULL;
io_wq_put(wq);
do_exit(0);
Expand Down Expand Up @@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq)
if (wq->manager)
return 0;

reinit_completion(&wq->worker_done);
clear_bit(IO_WQ_BIT_EXIT, &wq->state);
refcount_inc(&wq->refs);
current->flags |= PF_IO_WORKER;
Expand Down Expand Up @@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
init_completion(&wq->done);
refcount_set(&wq->refs, 1);

init_completion(&wq->worker_done);
atomic_set(&wq->worker_refs, 0);

ret = io_wq_fork_manager(wq);
if (!ret)
return wq;
Expand Down Expand Up @@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq)
if (wq->manager)
wake_up_process(wq->manager);

rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();

spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
Expand Down

0 comments on commit fb3a1f6

Please sign in to comment.