Skip to content

Commit

Permalink
job: Add job_drain()
Browse files Browse the repository at this point in the history
block_job_drain() contains a blk_drain() call which cannot be moved to
Job, so add a new JobDriver callback JobDriver.drain which has a common
implementation for all BlockJobs. In addition to this we keep the
existing BlockJobDriver.drain callback that is called by the common
drain implementation for all block jobs.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Max Reitz <mreitz@redhat.com>
  • Loading branch information
kevmw committed May 23, 2018
1 parent 004e95d commit b69f777
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 10 deletions.
1 change: 1 addition & 0 deletions block/backup.c
Expand Up @@ -529,6 +529,7 @@ static const BlockJobDriver backup_job_driver = {
.job_type = JOB_TYPE_BACKUP,
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = backup_run,
.commit = backup_commit,
.abort = backup_abort,
Expand Down
1 change: 1 addition & 0 deletions block/commit.c
Expand Up @@ -221,6 +221,7 @@ static const BlockJobDriver commit_job_driver = {
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = commit_run,
},
};
Expand Down
2 changes: 2 additions & 0 deletions block/mirror.c
Expand Up @@ -992,6 +992,7 @@ static const BlockJobDriver mirror_job_driver = {
.job_type = JOB_TYPE_MIRROR,
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = mirror_run,
.pause = mirror_pause,
},
Expand All @@ -1006,6 +1007,7 @@ static const BlockJobDriver commit_active_job_driver = {
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = mirror_run,
.pause = mirror_pause,
},
Expand Down
1 change: 1 addition & 0 deletions block/stream.c
Expand Up @@ -215,6 +215,7 @@ static const BlockJobDriver stream_job_driver = {
.free = block_job_free,
.start = stream_run,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
},
};

Expand Down
20 changes: 10 additions & 10 deletions blockjob.c
Expand Up @@ -169,14 +169,13 @@ static void block_job_attached_aio_context(AioContext *new_context,
job_resume(&job->job);
}

static void block_job_drain(BlockJob *job)
void block_job_drain(Job *job)
{
/* If job is !job->job.busy this kicks it into the next pause point. */
block_job_enter(job);
BlockJob *bjob = container_of(job, BlockJob, job);

blk_drain(job->blk);
if (job->driver->drain) {
job->driver->drain(job);
blk_drain(bjob->blk);
if (bjob->driver->drain) {
bjob->driver->drain(bjob);
}
}

Expand All @@ -190,7 +189,7 @@ static void block_job_detach_aio_context(void *opaque)
job_pause(&job->job);

while (!job->job.paused && !job_is_completed(&job->job)) {
block_job_drain(job);
job_drain(&job->job);
}

job->job.aio_context = NULL;
Expand Down Expand Up @@ -327,11 +326,11 @@ static int block_job_finish_sync(BlockJob *job,
job_unref(&job->job);
return -EBUSY;
}
/* block_job_drain calls block_job_enter, and it should be enough to
* induce progress until the job completes or moves to the main thread.
/* job_drain calls job_enter, and it should be enough to induce progress
* until the job completes or moves to the main thread.
*/
while (!job->job.deferred_to_main_loop && !job_is_completed(&job->job)) {
block_job_drain(job);
job_drain(&job->job);
}
while (!job_is_completed(&job->job)) {
aio_poll(qemu_get_aio_context(), true);
Expand Down Expand Up @@ -713,6 +712,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
assert(is_block_job(&job->job));
assert(job->job.driver->free == &block_job_free);
assert(job->job.driver->user_resume == &block_job_user_resume);
assert(job->job.driver->drain == &block_job_drain);

job->driver = driver;
job->blk = blk;
Expand Down
12 changes: 12 additions & 0 deletions include/block/blockjob_int.h
Expand Up @@ -65,6 +65,10 @@ struct BlockJobDriver {
* If the callback is not NULL, it will be invoked when the job has to be
* synchronously cancelled or completed; it should drain BlockDriverStates
* as required to ensure progress.
*
* Block jobs must use the default implementation for job_driver.drain,
* which will in turn call this callback after doing generic block job
* stuff.
*/
void (*drain)(BlockJob *job);
};
Expand Down Expand Up @@ -111,6 +115,14 @@ void block_job_free(Job *job);
*/
void block_job_user_resume(Job *job);

/**
* block_job_drain:
* Callback to be used for JobDriver.drain in all block jobs. Drains the main
* block node associated with the block jobs and calls BlockJobDriver.drain for
* job-specific actions.
*/
void block_job_drain(Job *job);

/**
* block_job_yield:
* @job: The job that calls the function.
Expand Down
13 changes: 13 additions & 0 deletions include/qemu/job.h
Expand Up @@ -167,6 +167,13 @@ struct JobDriver {
*/
void (*user_resume)(Job *job);

/*
* If the callback is not NULL, it will be invoked when the job has to be
* synchronously cancelled or completed; it should drain any activities
* as required to ensure progress.
*/
void (*drain)(Job *job);

/**
* If the callback is not NULL, it will be invoked when all the jobs
* belonging to the same transaction complete; or upon this job's
Expand Down Expand Up @@ -325,6 +332,12 @@ bool job_user_paused(Job *job);
*/
void job_user_resume(Job *job, Error **errp);

/*
* Drain any activities as required to ensure progress. This can be called in a
* loop to synchronously complete a job.
*/
void job_drain(Job *job);

/**
* Get the next element from the list of block jobs after @job, or the
* first one if @job is %NULL.
Expand Down
11 changes: 11 additions & 0 deletions job.c
Expand Up @@ -367,6 +367,17 @@ void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
job_pause_point(job);
}

void job_drain(Job *job)
{
/* If job is !busy this kicks it into the next pause point. */
job_enter(job);

if (job->driver->drain) {
job->driver->drain(job);
}
}


/**
* All jobs must allow a pause point before entering their job proper. This
* ensures that jobs can be paused prior to being started, then resumed later.
Expand Down
1 change: 1 addition & 0 deletions tests/test-bdrv-drain.c
Expand Up @@ -525,6 +525,7 @@ BlockJobDriver test_job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = test_job_start,
},
.complete = test_job_complete,
Expand Down
1 change: 1 addition & 0 deletions tests/test-blockjob-txn.c
Expand Up @@ -79,6 +79,7 @@ static const BlockJobDriver test_block_job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = test_block_job_run,
},
};
Expand Down
2 changes: 2 additions & 0 deletions tests/test-blockjob.c
Expand Up @@ -21,6 +21,7 @@ static const BlockJobDriver test_block_job_driver = {
.instance_size = sizeof(BlockJob),
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
},
};

Expand Down Expand Up @@ -201,6 +202,7 @@ static const BlockJobDriver test_cancel_driver = {
.instance_size = sizeof(CancelJob),
.free = block_job_free,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.start = cancel_job_start,
},
.complete = cancel_job_complete,
Expand Down

0 comments on commit b69f777

Please sign in to comment.