Skip to content

Commit

Permalink
job.c: enable job lock/unlock and remove AioContext locks
Browse files Browse the repository at this point in the history
Change the job_{lock/unlock} and macros to use job_mutex.

Now that they are no longer no-ops, remove the AioContext lock
to avoid deadlocks.

Therefore:
- when possible, remove completely the aiocontext lock/unlock pair
- if it is used by some other function too, reduce the locking
  section as much as possible, leaving the job API outside.
- change AIO_WAIT_WHILE into AIO_WAIT_WHILE_UNLOCKED, since we
  are no longer using the AioContext lock

The only functions that still need the aiocontext lock are:
- the JobDriver callbacks, already documented in job.h
- job_cancel_sync() in replication.c is called with aio_context_lock
  taken, but now job is using AIO_WAIT_WHILE_UNLOCKED so we need to
  release the lock.

Reduce the locking section to only cover the callback invocation
and document the functions that take the AioContext lock,
to avoid taking it twice.

Also remove real_job_{lock/unlock}, as they are replaced by the
public functions.

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Message-Id: <20220926093214.506243-19-eesposit@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
  • Loading branch information
esposem authored and kevmw committed Oct 7, 2022
1 parent 2fc3bdc commit 6f592e5
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 203 deletions.
2 changes: 2 additions & 0 deletions block/replication.c
Expand Up @@ -727,7 +727,9 @@ static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
* disk, secondary disk in backup_job_completed().
*/
if (s->backup_job) {
aio_context_release(aio_context);
job_cancel_sync(&s->backup_job->job, true);
aio_context_acquire(aio_context);
}

if (!failover) {
Expand Down
72 changes: 10 additions & 62 deletions blockdev.c
Expand Up @@ -155,12 +155,7 @@ void blockdev_mark_auto_del(BlockBackend *blk)
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
if (block_job_has_bdrv(job, blk_bs(blk))) {
AioContext *aio_context = job->job.aio_context;
aio_context_acquire(aio_context);

job_cancel_locked(&job->job, false);

aio_context_release(aio_context);
}
}

Expand Down Expand Up @@ -1847,14 +1842,7 @@ static void drive_backup_abort(BlkActionState *common)
DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);

if (state->job) {
AioContext *aio_context;

aio_context = bdrv_get_aio_context(state->bs);
aio_context_acquire(aio_context);

job_cancel_sync(&state->job->job, true);

aio_context_release(aio_context);
}
}

Expand Down Expand Up @@ -1948,14 +1936,7 @@ static void blockdev_backup_abort(BlkActionState *common)
BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);

if (state->job) {
AioContext *aio_context;

aio_context = bdrv_get_aio_context(state->bs);
aio_context_acquire(aio_context);

job_cancel_sync(&state->job->job, true);

aio_context_release(aio_context);
}
}

Expand Down Expand Up @@ -3317,19 +3298,14 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
}

/*
* Get a block job using its ID and acquire its AioContext.
* Called with job_mutex held.
* Get a block job using its ID. Called with job_mutex held.
*/
static BlockJob *find_block_job_locked(const char *id,
AioContext **aio_context,
Error **errp)
static BlockJob *find_block_job_locked(const char *id, Error **errp)
{
BlockJob *job;

assert(id != NULL);

*aio_context = NULL;

job = block_job_get_locked(id);

if (!job) {
Expand All @@ -3338,36 +3314,30 @@ static BlockJob *find_block_job_locked(const char *id,
return NULL;
}

*aio_context = block_job_get_aio_context(job);
aio_context_acquire(*aio_context);

return job;
}

/*
 * QMP handler: set the rate limit of the block job identified by @device.
 *
 * Holds job_mutex for the whole body (JOB_LOCK_GUARD), which is what the
 * _locked job API variants require.  No AioContext lock is taken; per this
 * change, the job API is protected by job_mutex alone.
 */
void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
{
    BlockJob *job;

    JOB_LOCK_GUARD();
    job = find_block_job_locked(device, errp);

    if (!job) {
        /* find_block_job_locked() has already set errp */
        return;
    }

    block_job_set_speed_locked(job, speed, errp);
}

void qmp_block_job_cancel(const char *device,
bool has_force, bool force, Error **errp)
{
AioContext *aio_context;
BlockJob *job;

JOB_LOCK_GUARD();
job = find_block_job_locked(device, &aio_context, errp);
job = find_block_job_locked(device, errp);

if (!job) {
return;
Expand All @@ -3380,73 +3350,64 @@ void qmp_block_job_cancel(const char *device,
if (job_user_paused_locked(&job->job) && !force) {
error_setg(errp, "The block job for device '%s' is currently paused",
device);
goto out;
return;
}

trace_qmp_block_job_cancel(job);
job_user_cancel_locked(&job->job, force, errp);
out:
aio_context_release(aio_context);
}

/*
 * QMP handler: pause the block job identified by @device.
 *
 * Runs entirely under job_mutex (JOB_LOCK_GUARD); the AioContext lock is
 * intentionally not taken, as job_user_pause_locked() only needs job_mutex.
 */
void qmp_block_job_pause(const char *device, Error **errp)
{
    BlockJob *job;

    JOB_LOCK_GUARD();
    job = find_block_job_locked(device, errp);

    if (!job) {
        /* find_block_job_locked() has already set errp */
        return;
    }

    trace_qmp_block_job_pause(job);
    job_user_pause_locked(&job->job, errp);
}

/*
 * QMP handler: resume the (user-paused) block job identified by @device.
 *
 * Runs entirely under job_mutex (JOB_LOCK_GUARD); no AioContext lock is
 * taken, matching the job API's job_mutex-only locking scheme.
 */
void qmp_block_job_resume(const char *device, Error **errp)
{
    BlockJob *job;

    JOB_LOCK_GUARD();
    job = find_block_job_locked(device, errp);

    if (!job) {
        /* find_block_job_locked() has already set errp */
        return;
    }

    trace_qmp_block_job_resume(job);
    job_user_resume_locked(&job->job, errp);
}

/*
 * QMP handler: ask the block job identified by @device to complete.
 *
 * Runs entirely under job_mutex (JOB_LOCK_GUARD); job_complete_locked()
 * requires job_mutex and no AioContext lock is needed here.
 */
void qmp_block_job_complete(const char *device, Error **errp)
{
    BlockJob *job;

    JOB_LOCK_GUARD();
    job = find_block_job_locked(device, errp);

    if (!job) {
        /* find_block_job_locked() has already set errp */
        return;
    }

    trace_qmp_block_job_complete(job);
    job_complete_locked(&job->job, errp);
}

void qmp_block_job_finalize(const char *id, Error **errp)
{
AioContext *aio_context;
BlockJob *job;

JOB_LOCK_GUARD();
job = find_block_job_locked(id, &aio_context, errp);
job = find_block_job_locked(id, errp);

if (!job) {
return;
Expand All @@ -3456,24 +3417,16 @@ void qmp_block_job_finalize(const char *id, Error **errp)
job_ref_locked(&job->job);
job_finalize_locked(&job->job, errp);

/*
* Job's context might have changed via job_finalize (and job_txn_apply
* automatically acquires the new one), so make sure we release the correct
* one.
*/
aio_context = block_job_get_aio_context(job);
job_unref_locked(&job->job);
aio_context_release(aio_context);
}

void qmp_block_job_dismiss(const char *id, Error **errp)
{
AioContext *aio_context;
BlockJob *bjob;
Job *job;

JOB_LOCK_GUARD();
bjob = find_block_job_locked(id, &aio_context, errp);
bjob = find_block_job_locked(id, errp);

if (!bjob) {
return;
Expand All @@ -3482,7 +3435,6 @@ void qmp_block_job_dismiss(const char *id, Error **errp)
trace_qmp_block_job_dismiss(bjob);
job = &bjob->job;
job_dismiss_locked(&job, errp);
aio_context_release(aio_context);
}

void qmp_change_backing_file(const char *device,
Expand Down Expand Up @@ -3764,15 +3716,11 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp)
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
BlockJobInfo *value;
AioContext *aio_context;

if (block_job_is_internal(job)) {
continue;
}
aio_context = block_job_get_aio_context(job);
aio_context_acquire(aio_context);
value = block_job_query_locked(job, errp);
aio_context_release(aio_context);
if (!value) {
qapi_free_BlockJobInfoList(head);
return NULL;
Expand Down
17 changes: 9 additions & 8 deletions include/qemu/job.h
Expand Up @@ -88,7 +88,7 @@ typedef struct Job {
AioContext *aio_context;


/** Protected by AioContext lock */
/** Protected by job_mutex */

/** Reference count of the block job */
int refcnt;
Expand All @@ -111,7 +111,7 @@ typedef struct Job {
/**
* Set to false by the job while the coroutine has yielded and may be
* re-entered by job_enter(). There may still be I/O or event loop activity
* pending. Accessed under block_job_mutex (in blockjob.c).
* pending. Accessed under job_mutex.
*
* When the job is deferred to the main loop, busy is true as long as the
* bottom half is still pending.
Expand Down Expand Up @@ -346,9 +346,9 @@ typedef enum JobCreateFlags {

extern QemuMutex job_mutex;

#define JOB_LOCK_GUARD() /* QEMU_LOCK_GUARD(&job_mutex) */
#define JOB_LOCK_GUARD() QEMU_LOCK_GUARD(&job_mutex)

#define WITH_JOB_LOCK_GUARD() /* WITH_QEMU_LOCK_GUARD(&job_mutex) */
#define WITH_JOB_LOCK_GUARD() WITH_QEMU_LOCK_GUARD(&job_mutex)

/**
* job_lock:
Expand Down Expand Up @@ -422,6 +422,8 @@ void job_ref_locked(Job *job);
/**
* Release a reference that was previously acquired with job_ref() or
* job_create(). If it's the last reference to the object, it will be freed.
*
* Takes AioContext lock internally to invoke a job->driver callback.
*/
void job_unref(Job *job);

Expand Down Expand Up @@ -696,7 +698,7 @@ void job_user_cancel_locked(Job *job, bool force, Error **errp);
* Returns the return value from the job if the job actually completed
* during the call, or -ECANCELED if it was canceled.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock *not* held.
*/
int job_cancel_sync(Job *job, bool force);

Expand All @@ -721,8 +723,7 @@ void job_cancel_sync_all(void);
* function).
*
* Returns the return value from the job.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock *not* held.
*/
int job_complete_sync(Job *job, Error **errp);

Expand Down Expand Up @@ -758,7 +759,7 @@ void job_dismiss_locked(Job **job, Error **errp);
* Returns 0 if the job is successfully completed, -ECANCELED if the job was
* cancelled before completing, and -errno in other error cases.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock *not* held.
*/
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
Error **errp);
Expand Down

0 comments on commit 6f592e5

Please sign in to comment.