Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/juanquintela/tags/migration/201…
Browse files Browse the repository at this point in the history
…80822-1' into staging

migration/next for 20180822

# gpg: Signature made Wed 22 Aug 2018 12:07:59 BST
# gpg:                using RSA key F487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>"
# gpg:                 aka "Juan Quintela <quintela@trasno.org>"
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03  4B82 F487 EF18 5872 D723

* remotes/juanquintela/tags/migration/20180822-1:
  migration: hold the lock only if it is really needed
  migration: move handle of zero page to the thread
  migration: drop the return value of do_compress_ram_page
  migration: introduce save_zero_page_to_file
  migration: fix counting normal page for compression
  migration: do not wait for free thread
  migration: poll the cm event for destination qemu
  tests/migration-test: Silence the kvm_hv message by default
  migration: implement the shutdown for RDMA QIOChannel
  migration: poll the cm event while wait RDMA work request completion
  migration: invoke qio_channel_yield only when qemu_in_coroutine()
  migration: implement io_set_aio_fd_handler function for RDMA QIOChannel
  migration: Stop rdma yielding during incoming postcopy
  migration: implement bi-directional RDMA QIOChannel
  migration: create a dedicated connection for rdma return path
  migration: disable RDMA WRITE after postcopy started
  migrate/cpu-throttle: Add max-cpu-throttle migration parameter
  docs/migration: Clarify pre_load in subsections
  migration: Correctly handle subsections with no 'needed' function
  qapi/migration.json: fix the description for "query-migrate" output

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
  • Loading branch information
pm215 committed Aug 24, 2018
2 parents 1dfb85a + ae526e3 commit 54906fe
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 126 deletions.
15 changes: 9 additions & 6 deletions docs/devel/migration.rst
Expand Up @@ -240,10 +240,13 @@ should succeed even with the data missing. To support this the
subsection can be connected to a device property and from there
to a versioned machine type.

One important note is that the post_load() function is called "after"
loading all subsections, because a newer subsection could change same
value that it uses. A flag, and the combination of pre_load and post_load
can be used to detect whether a subsection was loaded, and to
The 'pre_load' and 'post_load' functions on subsections are only
called if the subsection is loaded.

One important note is that the outer post_load() function is called "after"
loading all subsections, because a newer subsection could change the same
value that it uses. A flag, and the combination of outer pre_load and
post_load can be used to detect whether a subsection was loaded, and to
fall back on default behaviour when the subsection isn't present.

Example:
Expand Down Expand Up @@ -315,8 +318,8 @@ For example:
the property to false.
c) Add a static bool support_foo function that tests the property.
d) Add a subsection with a .needed set to the support_foo function
e) (potentially) Add a pre_load that sets up a default value for 'foo'
to be used if the subsection isn't loaded.
e) (potentially) Add an outer pre_load that sets up a default value
for 'foo' to be used if the subsection isn't loaded.

Now that subsection will not be generated when using an older
machine type and the migration stream will be accepted by older
Expand Down
16 changes: 16 additions & 0 deletions hmp.c
Expand Up @@ -327,6 +327,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_THREADS),
params->compress_threads);
assert(params->has_compress_wait_thread);
monitor_printf(mon, "%s: %s\n",
MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD),
params->compress_wait_thread ? "on" : "off");
assert(params->has_decompress_threads);
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_DECOMPRESS_THREADS),
Expand All @@ -339,6 +343,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT),
params->cpu_throttle_increment);
assert(params->has_max_cpu_throttle);
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_MAX_CPU_THROTTLE),
params->max_cpu_throttle);
assert(params->has_tls_creds);
monitor_printf(mon, "%s: '%s'\n",
MigrationParameter_str(MIGRATION_PARAMETER_TLS_CREDS),
Expand Down Expand Up @@ -1647,6 +1655,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_compress_threads = true;
visit_type_int(v, param, &p->compress_threads, &err);
break;
case MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD:
p->has_compress_wait_thread = true;
visit_type_bool(v, param, &p->compress_wait_thread, &err);
break;
case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
p->has_decompress_threads = true;
visit_type_int(v, param, &p->decompress_threads, &err);
Expand All @@ -1659,6 +1671,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_cpu_throttle_increment = true;
visit_type_int(v, param, &p->cpu_throttle_increment, &err);
break;
case MIGRATION_PARAMETER_MAX_CPU_THROTTLE:
p->has_max_cpu_throttle = true;
visit_type_int(v, param, &p->max_cpu_throttle, &err);
break;
case MIGRATION_PARAMETER_TLS_CREDS:
p->has_tls_creds = true;
p->tls_creds = g_new0(StrOrNull, 1);
Expand Down
1 change: 1 addition & 0 deletions include/qemu/queue.h
Expand Up @@ -341,6 +341,7 @@ struct { \
/*
* Simple queue access methods.
*/
#define QSIMPLEQ_EMPTY_ATOMIC(head) (atomic_read(&((head)->sqh_first)) == NULL)
#define QSIMPLEQ_EMPTY(head) ((head)->sqh_first == NULL)
#define QSIMPLEQ_FIRST(head) ((head)->sqh_first)
#define QSIMPLEQ_NEXT(elm, field) ((elm)->field.sqe_next)
Expand Down
2 changes: 2 additions & 0 deletions migration/colo.c
Expand Up @@ -534,6 +534,7 @@ void *colo_process_incoming_thread(void *opaque)
uint64_t value;
Error *local_err = NULL;

rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);

migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
Expand Down Expand Up @@ -666,5 +667,6 @@ void *colo_process_incoming_thread(void *opaque)
}
migration_incoming_exit_colo();

rcu_unregister_thread();
return NULL;
}
49 changes: 47 additions & 2 deletions migration/migration.c
Expand Up @@ -71,6 +71,7 @@
/* Define default autoconverge cpu throttle migration parameters */
#define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
#define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99

/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
Expand Down Expand Up @@ -389,6 +390,7 @@ static void process_incoming_migration_co(void *opaque)
int ret;

assert(mis->from_src_file);
mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
Expand Down Expand Up @@ -418,7 +420,6 @@ static void process_incoming_migration_co(void *opaque)

/* we get COLO info, and know if we are in COLO mode */
if (!ret && migration_incoming_enable_colo()) {
mis->migration_incoming_co = qemu_coroutine_self();
qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
mis->have_colo_incoming_thread = true;
Expand All @@ -442,6 +443,7 @@ static void process_incoming_migration_co(void *opaque)
}
mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
qemu_bh_schedule(mis->bh);
mis->migration_incoming_co = NULL;
}

static void migration_incoming_setup(QEMUFile *f)
Expand Down Expand Up @@ -671,6 +673,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->compress_level = s->parameters.compress_level;
params->has_compress_threads = true;
params->compress_threads = s->parameters.compress_threads;
params->has_compress_wait_thread = true;
params->compress_wait_thread = s->parameters.compress_wait_thread;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
Expand All @@ -697,6 +701,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
params->has_max_cpu_throttle = true;
params->max_cpu_throttle = s->parameters.max_cpu_throttle;

return params;
}
Expand Down Expand Up @@ -1043,6 +1049,15 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
return false;
}

if (params->has_max_cpu_throttle &&
(params->max_cpu_throttle < params->cpu_throttle_initial ||
params->max_cpu_throttle > 99)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"max_cpu_throttle",
"an integer in the range of cpu_throttle_initial to 99");
return false;
}

return true;
}

Expand All @@ -1061,6 +1076,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
dest->compress_threads = params->compress_threads;
}

if (params->has_compress_wait_thread) {
dest->compress_wait_thread = params->compress_wait_thread;
}

if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
Expand Down Expand Up @@ -1110,6 +1129,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_max_postcopy_bandwidth) {
dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
if (params->has_max_cpu_throttle) {
dest->max_cpu_throttle = params->max_cpu_throttle;
}
}

static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
Expand All @@ -1126,6 +1148,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_threads = params->compress_threads;
}

if (params->has_compress_wait_thread) {
s->parameters.compress_wait_thread = params->compress_wait_thread;
}

if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
Expand Down Expand Up @@ -1185,6 +1211,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_max_postcopy_bandwidth) {
s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
if (params->has_max_cpu_throttle) {
s->parameters.max_cpu_throttle = params->max_cpu_throttle;
}
}

void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
Expand Down Expand Up @@ -1871,6 +1900,15 @@ int migrate_compress_threads(void)
return s->parameters.compress_threads;
}

int migrate_compress_wait_thread(void)
{
MigrationState *s;

s = migrate_get_current();

return s->parameters.compress_wait_thread;
}

int migrate_decompress_threads(void)
{
MigrationState *s;
Expand Down Expand Up @@ -1962,7 +2000,6 @@ static int64_t migrate_max_postcopy_bandwidth(void)
return s->parameters.max_postcopy_bandwidth;
}


bool migrate_use_block(void)
{
MigrationState *s;
Expand Down Expand Up @@ -2104,6 +2141,7 @@ static void *source_return_path_thread(void *opaque)
int res;

trace_source_return_path_thread_entry();
rcu_register_thread();

retry:
while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
Expand Down Expand Up @@ -2243,6 +2281,7 @@ static void *source_return_path_thread(void *opaque)
trace_source_return_path_thread_end();
ms->rp_state.from_dst_file = NULL;
qemu_fclose(rp);
rcu_unregister_thread();
return NULL;
}

Expand Down Expand Up @@ -3131,6 +3170,8 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
parameters.compress_wait_thread, true),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
Expand Down Expand Up @@ -3160,6 +3201,9 @@ static Property migration_properties[] = {
DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState,
parameters.max_postcopy_bandwidth,
DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH),
DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState,
parameters.max_cpu_throttle,
DEFAULT_MIGRATE_MAX_CPU_THROTTLE),

/* Migration capabilities */
DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
Expand Down Expand Up @@ -3230,6 +3274,7 @@ static void migration_instance_init(Object *obj)
params->has_x_multifd_page_count = true;
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
params->has_max_cpu_throttle = true;

qemu_sem_init(&ms->postcopy_pause_sem, 0);
qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
Expand Down
2 changes: 2 additions & 0 deletions migration/migration.h
Expand Up @@ -266,11 +266,13 @@ bool migrate_colo_enabled(void);

bool migrate_use_block(void);
bool migrate_use_block_incremental(void);
int migrate_max_cpu_throttle(void);
bool migrate_use_return_path(void);

bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_compress_wait_thread(void);
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
Expand Down
2 changes: 2 additions & 0 deletions migration/postcopy-ram.c
Expand Up @@ -853,6 +853,7 @@ static void *postcopy_ram_fault_thread(void *opaque)
RAMBlock *rb = NULL;

trace_postcopy_ram_fault_thread_entry();
rcu_register_thread();
mis->last_rb = NULL; /* last RAMBlock we sent part of */
qemu_sem_post(&mis->fault_thread_sem);

Expand Down Expand Up @@ -1059,6 +1060,7 @@ static void *postcopy_ram_fault_thread(void *opaque)
}
}
}
rcu_unregister_thread();
trace_postcopy_ram_fault_thread_exit();
g_free(pfd);
return NULL;
Expand Down
12 changes: 10 additions & 2 deletions migration/qemu-file-channel.c
Expand Up @@ -49,7 +49,11 @@ static ssize_t channel_writev_buffer(void *opaque,
ssize_t len;
len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL);
if (len == QIO_CHANNEL_ERR_BLOCK) {
qio_channel_wait(ioc, G_IO_OUT);
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_OUT);
} else {
qio_channel_wait(ioc, G_IO_OUT);
}
continue;
}
if (len < 0) {
Expand Down Expand Up @@ -80,7 +84,11 @@ static ssize_t channel_get_buffer(void *opaque,
ret = qio_channel_read(ioc, (char *)buf, size, NULL);
if (ret < 0) {
if (ret == QIO_CHANNEL_ERR_BLOCK) {
qio_channel_yield(ioc, G_IO_IN);
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_IN);
} else {
qio_channel_wait(ioc, G_IO_IN);
}
} else {
/* XXX handle Error * object */
return -EIO;
Expand Down
8 changes: 6 additions & 2 deletions migration/qemu-file.c
Expand Up @@ -253,8 +253,12 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
if (f->hooks && f->hooks->save_page) {
int ret = f->hooks->save_page(f, f->opaque, block_offset,
offset, size, bytes_sent);
f->bytes_xfer += size;
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
f->bytes_xfer += size;
}

if (ret != RAM_SAVE_CONTROL_DELAYED &&
ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (bytes_sent && *bytes_sent > 0) {
qemu_update_position(f, *bytes_sent);
} else if (ret < 0) {
Expand Down

0 comments on commit 54906fe

Please sign in to comment.