Skip to content

Commit

Permalink
Merge tag 'migration-20231018-pull-request' of https://gitlab.com/jua…
Browse files Browse the repository at this point in the history
…n.quintela/qemu into staging

Migration Pull request (20231018)

In this pull request:
- RDMA cleanups
- compression cleanups

CI: https://gitlab.com/juan.quintela/qemu/-/pipelines/1040780020

Please apply.

PD.  I tried to get the deprecated bits integrated, but I broke
     qemu-iotests duer to blk warning.  Will resend it.

# -----BEGIN PGP SIGNATURE-----
#
# iQIzBAABCAAdFiEEGJn/jt6/WMzuA0uC9IfvGFhy1yMFAmUvrjQACgkQ9IfvGFhy
# 1yODnQ/+OKaOQMAEtJsJ1B67394VRjpGDd0K47U3uewJJ26XRMUy4uw0zeGYdiBc
# VFjrX1NJu4jRZBOdhRzZQLZU9wDEGY/8zIL/sJB55X/gv1EysDB3IrNCWosNL8SS
# weeYu9qkxsB5aJfM4Lp6XnPIplb7PIMSqX380sUGcK7uVLo3x3H8PgFxQszG5ZMs
# 8OqhOdxZ8jPc7gUOxPcA0n/L6pJcfnuK1/8Vlf5wbkdD+lyVCs0QDTSgX8AnS5hd
# DniV2nMFkVvNkOhDG3X8qr8FyjyQ9eyJxxw/2Nt+0201UIiCirz3U2T6EMejCYOv
# LtIaaNaNHmEw5OdfSBhEjXOy7gHavcn+/LlUexYJQSiB/CXrdgh3jpSFmlAzcAY2
# Si514BRty6WX43f+698PSAKF4XaRnPGtvVCv7ubrFb1qVrg8DTEnYXNO+LadqSBS
# bu1TpRK1iVgKnApQN1SQr26MRAzU+U+yqz/MB9QzIGwonM2TEWCF6c5Sqq6/RK8S
# IIDu4s/NTx0wVWrR2rAZv335ANGa7oa1z2LykxcXmBJozqWAOgo6wzZJJ3klcpjZ
# Li39m2jzx36oSKqudYANxj2Ds0MvXStnd2ZX9mQiEB++S2SY2Z8he57HESQwNzf0
# 0Z61eqdHr7th4zfLz/akiiZnRs66A+6/LFRFpoKedSAABE/sKY8=
# =gNbf
# -----END PGP SIGNATURE-----
# gpg: Signature made Wed 18 Oct 2023 03:06:44 PDT
# gpg:                using RSA key 1899FF8EDEBF58CCEE034B82F487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>" [full]
# gpg:                 aka "Juan Quintela <quintela@trasno.org>" [full]
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03  4B82 F487 EF18 5872 D723

* tag 'migration-20231018-pull-request' of https://gitlab.com/juan.quintela/qemu:
  migration: save_zero_page() can take block through pss
  migration: control_save_page() can take block through pss
  migration: save_compress_page() can take block through pss
  migration: Print block status when needed
  migration: Use "i" as an for index in ram-compress.c
  migration: Simplify decompress_data_with_multi_threads()
  migration: Move update_compress_threads_counts() to ram-compress.c
  migration: Create ram_compressed_pages()
  migration: Create populate_compress()
  migration: Move compression_counters cleanup ram-compress.c
  migration: RDMA is not compatible with anything else

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
  • Loading branch information
stefanhaRH committed Oct 19, 2023
2 parents deaca3f + e8e4e7a commit 57e32f0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 83 deletions.
5 changes: 2 additions & 3 deletions migration/migration-hmp-cmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "sysemu/runstate.h"
#include "ui/qemu-spice.h"
#include "sysemu/sysemu.h"
#include "options.h"
#include "migration.h"

static void migration_global_dump(Monitor *mon)
Expand Down Expand Up @@ -696,7 +697,6 @@ void hmp_x_colo_lost_heartbeat(Monitor *mon, const QDict *qdict)
typedef struct HMPMigrationStatus {
QEMUTimer *timer;
Monitor *mon;
bool is_block_migration;
} HMPMigrationStatus;

static void hmp_migrate_status_cb(void *opaque)
Expand All @@ -722,7 +722,7 @@ static void hmp_migrate_status_cb(void *opaque)

timer_mod(status->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + 1000);
} else {
if (status->is_block_migration) {
if (migrate_block()) {
monitor_printf(status->mon, "\n");
}
if (info->error_desc) {
Expand Down Expand Up @@ -762,7 +762,6 @@ void hmp_migrate(Monitor *mon, const QDict *qdict)

status = g_malloc0(sizeof(*status));
status->mon = mon;
status->is_block_migration = blk || inc;
status->timer = timer_new_ms(QEMU_CLOCK_REALTIME, hmp_migrate_status_cb,
status);
timer_mod(status->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME));
Expand Down
27 changes: 14 additions & 13 deletions migration/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,18 @@ static void qemu_start_incoming_migration(const char *uri, Error **errp)
socket_start_incoming_migration(p ? p : uri, errp);
#ifdef CONFIG_RDMA
} else if (strstart(uri, "rdma:", &p)) {
if (migrate_compress()) {
error_setg(errp, "RDMA and compression can't be used together");
return;
}
if (migrate_xbzrle()) {
error_setg(errp, "RDMA and XBZRLE can't be used together");
return;
}
if (migrate_multifd()) {
error_setg(errp, "RDMA and multifd can't be used together");
return;
}
rdma_start_incoming_migration(p, errp);
#endif
} else if (strstart(uri, "exec:", &p)) {
Expand Down Expand Up @@ -962,16 +974,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
info->xbzrle_cache->overflow = xbzrle_counters.overflow;
}

if (migrate_compress()) {
info->compression = g_malloc0(sizeof(*info->compression));
info->compression->pages = compression_counters.pages;
info->compression->busy = compression_counters.busy;
info->compression->busy_rate = compression_counters.busy_rate;
info->compression->compressed_size =
compression_counters.compressed_size;
info->compression->compression_rate =
compression_counters.compression_rate;
}
populate_compress(info);

if (cpu_throttle_active()) {
info->has_cpu_throttle_percentage = true;
Expand Down Expand Up @@ -1454,11 +1457,9 @@ int migrate_init(MigrationState *s, Error **errp)
s->switchover_acked = false;
s->rdma_migration = false;
/*
* set mig_stats compression_counters memory to zero for a
* new migration
* set mig_stats memory to zero for a new migration
*/
memset(&mig_stats, 0, sizeof(mig_stats));
memset(&compression_counters, 0, sizeof(compression_counters));
migration_reset_vfio_bytes_transferred();

return 0;
Expand Down
105 changes: 68 additions & 37 deletions migration/ram-compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
#include "ram-compress.h"

#include "qemu/error-report.h"
#include "qemu/stats64.h"
#include "migration.h"
#include "options.h"
#include "io/channel-null.h"
#include "exec/target_page.h"
#include "exec/ramblock.h"
#include "ram.h"
#include "migration-stats.h"

CompressionStats compression_counters;

Expand Down Expand Up @@ -227,27 +230,25 @@ static inline void compress_reset_result(CompressParam *param)

void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
int idx, thread_count;

thread_count = migrate_compress_threads();
int thread_count = migrate_compress_threads();

qemu_mutex_lock(&comp_done_lock);
for (idx = 0; idx < thread_count; idx++) {
while (!comp_param[idx].done) {
for (int i = 0; i < thread_count; i++) {
while (!comp_param[i].done) {
qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
qemu_mutex_unlock(&comp_done_lock);

for (idx = 0; idx < thread_count; idx++) {
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
CompressParam *param = &comp_param[idx];
for (int i = 0; i < thread_count; i++) {
qemu_mutex_lock(&comp_param[i].mutex);
if (!comp_param[i].quit) {
CompressParam *param = &comp_param[i];
send_queued_data(param);
assert(qemu_file_buffer_empty(param->file));
compress_reset_result(param);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
qemu_mutex_unlock(&comp_param[i].mutex);
}
}

Expand All @@ -262,15 +263,15 @@ static inline void set_compress_params(CompressParam *param, RAMBlock *block,
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
int (send_queued_data(CompressParam *)))
{
int idx, thread_count, pages = -1;
int thread_count, pages = -1;
bool wait = migrate_compress_wait_thread();

thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
retry:
for (idx = 0; idx < thread_count; idx++) {
if (comp_param[idx].done) {
CompressParam *param = &comp_param[idx];
for (int i = 0; i < thread_count; i++) {
if (comp_param[i].done) {
CompressParam *param = &comp_param[i];
qemu_mutex_lock(&param->mutex);
param->done = false;
send_queued_data(param);
Expand Down Expand Up @@ -364,16 +365,14 @@ static void *do_data_decompress(void *opaque)

int wait_for_decompress_done(void)
{
int idx, thread_count;

if (!migrate_compress()) {
return 0;
}

thread_count = migrate_decompress_threads();
int thread_count = migrate_decompress_threads();
qemu_mutex_lock(&decomp_done_lock);
for (idx = 0; idx < thread_count; idx++) {
while (!decomp_param[idx].done) {
for (int i = 0; i < thread_count; i++) {
while (!decomp_param[i].done) {
qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
}
}
Expand Down Expand Up @@ -430,6 +429,11 @@ int compress_threads_load_setup(QEMUFile *f)
return 0;
}

/*
* set compression_counters memory to zero for a new migration
*/
memset(&compression_counters, 0, sizeof(compression_counters));

thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
Expand Down Expand Up @@ -459,27 +463,54 @@ int compress_threads_load_setup(QEMUFile *f)

void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
int idx, thread_count;

thread_count = migrate_decompress_threads();
int thread_count = migrate_decompress_threads();
QEMU_LOCK_GUARD(&decomp_done_lock);
while (true) {
for (idx = 0; idx < thread_count; idx++) {
if (decomp_param[idx].done) {
decomp_param[idx].done = false;
qemu_mutex_lock(&decomp_param[idx].mutex);
qemu_get_buffer(f, decomp_param[idx].compbuf, len);
decomp_param[idx].des = host;
decomp_param[idx].len = len;
qemu_cond_signal(&decomp_param[idx].cond);
qemu_mutex_unlock(&decomp_param[idx].mutex);
break;
for (int i = 0; i < thread_count; i++) {
if (decomp_param[i].done) {
decomp_param[i].done = false;
qemu_mutex_lock(&decomp_param[i].mutex);
qemu_get_buffer(f, decomp_param[i].compbuf, len);
decomp_param[i].des = host;
decomp_param[i].len = len;
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
return;
}
}
if (idx < thread_count) {
break;
} else {
qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
}
qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
}
}

void populate_compress(MigrationInfo *info)
{
if (!migrate_compress()) {
return;
}
info->compression = g_malloc0(sizeof(*info->compression));
info->compression->pages = compression_counters.pages;
info->compression->busy = compression_counters.busy;
info->compression->busy_rate = compression_counters.busy_rate;
info->compression->compressed_size = compression_counters.compressed_size;
info->compression->compression_rate = compression_counters.compression_rate;
}

uint64_t ram_compressed_pages(void)
{
return compression_counters.pages;
}

void update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
ram_transferred_add(bytes_xmit);

if (param->result == RES_ZEROPAGE) {
stat64_add(&mig_stats.zero_pages, 1);
return;
}

/* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
compression_counters.compressed_size += bytes_xmit - 8;
compression_counters.pages++;
}

5 changes: 5 additions & 0 deletions migration/ram-compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define QEMU_MIGRATION_COMPRESS_H

#include "qemu-file.h"
#include "qapi/qapi-types-migration.h"

enum CompressResult {
RES_NONE = 0,
Expand Down Expand Up @@ -67,4 +68,8 @@ void compress_threads_load_cleanup(void);
int compress_threads_load_setup(QEMUFile *f);
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);

void populate_compress(MigrationInfo *info);
uint64_t ram_compressed_pages(void);
void update_compress_thread_counts(const CompressParam *param, int bytes_xmit);

#endif

0 comments on commit 57e32f0

Please sign in to comment.