485 changes: 485 additions & 0 deletions migration/ram-compress.c

Large diffs are not rendered by default.

70 changes: 70 additions & 0 deletions migration/ram-compress.h
@@ -0,0 +1,70 @@
/*
* QEMU System Emulator
*
* Copyright (c) 2003-2008 Fabrice Bellard
* Copyright (c) 2011-2015 Red Hat Inc
*
* Authors:
* Juan Quintela <quintela@redhat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#ifndef QEMU_MIGRATION_COMPRESS_H
#define QEMU_MIGRATION_COMPRESS_H

#include "qemu-file.h"

enum CompressResult {
RES_NONE = 0,
RES_ZEROPAGE = 1,
RES_COMPRESS = 2
};
typedef enum CompressResult CompressResult;

struct CompressParam {
bool done;
bool quit;
bool trigger;
CompressResult result;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
RAMBlock *block;
ram_addr_t offset;

/* internally used fields */
z_stream stream;
uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;

void compress_threads_save_cleanup(void);
int compress_threads_save_setup(void);

void flush_compressed_data(int (send_queued_data(CompressParam *)));
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
int (send_queued_data(CompressParam *)));

int wait_for_decompress_done(void);
void compress_threads_load_cleanup(void);
int compress_threads_load_setup(QEMUFile *f);
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);

#endif
550 changes: 60 additions & 490 deletions migration/ram.c

Large diffs are not rendered by default.

24 changes: 0 additions & 24 deletions migration/ram.h
Expand Up @@ -32,30 +32,7 @@
#include "qapi/qapi-types-migration.h"
#include "exec/cpu-common.h"
#include "io/channel.h"
#include "qemu/stats64.h"

/*
* These are the ram migration statistic counters. It is loosely
* based on MigrationStats. We change to Stat64 any counter that
* needs to be updated using atomic ops (can be accessed by more than
* one thread).
*/
typedef struct {
Stat64 dirty_bytes_last_sync;
Stat64 dirty_pages_rate;
Stat64 dirty_sync_count;
Stat64 dirty_sync_missed_zero_copy;
Stat64 downtime_bytes;
Stat64 zero_pages;
Stat64 multifd_bytes;
Stat64 normal_pages;
Stat64 postcopy_bytes;
Stat64 postcopy_requests;
Stat64 precopy_bytes;
Stat64 transferred;
} RAMStats;

extern RAMStats ram_counters;
extern XBZRLECacheStats xbzrle_counters;
extern CompressionStats compression_counters;

Expand All @@ -76,7 +53,6 @@ void mig_throttle_counter_reset(void);

uint64_t ram_pagesize_summary(void);
int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
void acct_update_position(QEMUFile *f, size_t size, bool zero);
void ram_postcopy_migrated_memory_release(MigrationState *ms);
/* For outgoing discard bitmap */
void ram_postcopy_send_discard_bitmap(MigrationState *ms);
Expand Down
9 changes: 7 additions & 2 deletions migration/rdma.c
Expand Up @@ -17,8 +17,10 @@
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "exec/target_page.h"
#include "rdma.h"
#include "migration.h"
#include "migration-stats.h"
#include "qemu-file.h"
#include "ram.h"
#include "qemu/error-report.h"
Expand Down Expand Up @@ -2120,7 +2122,8 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
return -EIO;
}

acct_update_position(f, sge.length, true);
stat64_add(&mig_stats.zero_pages,
sge.length / qemu_target_page_size());

return 1;
}
Expand Down Expand Up @@ -2228,7 +2231,9 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
}

set_bit(chunk, block->transit_bitmap);
acct_update_position(f, sge.length, false);
stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
ram_transferred_add(sge.length);
qemu_file_credit_transfer(f, sge.length);
rdma->total_writes++;

return 0;
Expand Down
3 changes: 2 additions & 1 deletion migration/savevm.c
Expand Up @@ -31,6 +31,7 @@
#include "net/net.h"
#include "migration.h"
#include "migration/snapshot.h"
#include "migration-stats.h"
#include "migration/vmstate.h"
#include "migration/misc.h"
#include "migration/register.h"
Expand Down Expand Up @@ -1621,7 +1622,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp)
}

migrate_init(ms);
memset(&ram_counters, 0, sizeof(ram_counters));
memset(&mig_stats, 0, sizeof(mig_stats));
memset(&compression_counters, 0, sizeof(compression_counters));
ms->to_dst_file = f;

Expand Down
15 changes: 5 additions & 10 deletions migration/tls.c
Expand Up @@ -29,9 +29,7 @@
#include "trace.h"

static QCryptoTLSCreds *
migration_tls_get_creds(MigrationState *s,
QCryptoTLSCredsEndpoint endpoint,
Error **errp)
migration_tls_get_creds(QCryptoTLSCredsEndpoint endpoint, Error **errp)
{
Object *creds;
const char *tls_creds = migrate_tls_creds();
Expand Down Expand Up @@ -80,8 +78,7 @@ void migration_tls_channel_process_incoming(MigrationState *s,
QCryptoTLSCreds *creds;
QIOChannelTLS *tioc;

creds = migration_tls_get_creds(
s, QCRYPTO_TLS_CREDS_ENDPOINT_SERVER, errp);
creds = migration_tls_get_creds(QCRYPTO_TLS_CREDS_ENDPOINT_SERVER, errp);
if (!creds) {
return;
}
Expand Down Expand Up @@ -117,15 +114,13 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
object_unref(OBJECT(ioc));
}

QIOChannelTLS *migration_tls_client_create(MigrationState *s,
QIOChannel *ioc,
QIOChannelTLS *migration_tls_client_create(QIOChannel *ioc,
const char *hostname,
Error **errp)
{
QCryptoTLSCreds *creds;

creds = migration_tls_get_creds(
s, QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT, errp);
creds = migration_tls_get_creds(QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT, errp);
if (!creds) {
return NULL;
}
Expand All @@ -145,7 +140,7 @@ void migration_tls_channel_connect(MigrationState *s,
{
QIOChannelTLS *tioc;

tioc = migration_tls_client_create(s, ioc, hostname, errp);
tioc = migration_tls_client_create(ioc, hostname, errp);
if (!tioc) {
return;
}
Expand Down
3 changes: 1 addition & 2 deletions migration/tls.h
Expand Up @@ -28,8 +28,7 @@ void migration_tls_channel_process_incoming(MigrationState *s,
QIOChannel *ioc,
Error **errp);

QIOChannelTLS *migration_tls_client_create(MigrationState *s,
QIOChannel *ioc,
QIOChannelTLS *migration_tls_client_create(QIOChannel *ioc,
const char *hostname,
Error **errp);

Expand Down
126 changes: 126 additions & 0 deletions tests/qtest/migration-test.c
Expand Up @@ -406,6 +406,41 @@ static void migrate_set_parameter_str(QTestState *who, const char *parameter,
migrate_check_parameter_str(who, parameter, value);
}

static long long migrate_get_parameter_bool(QTestState *who,
const char *parameter)
{
QDict *rsp;
int result;

rsp = wait_command(who, "{ 'execute': 'query-migrate-parameters' }");
result = qdict_get_bool(rsp, parameter);
qobject_unref(rsp);
return !!result;
}

static void migrate_check_parameter_bool(QTestState *who, const char *parameter,
int value)
{
int result;

result = migrate_get_parameter_bool(who, parameter);
g_assert_cmpint(result, ==, value);
}

static void migrate_set_parameter_bool(QTestState *who, const char *parameter,
int value)
{
QDict *rsp;

rsp = qtest_qmp(who,
"{ 'execute': 'migrate-set-parameters',"
"'arguments': { %s: %i } }",
parameter, value);
g_assert(qdict_haskey(rsp, "return"));
qobject_unref(rsp);
migrate_check_parameter_bool(who, parameter, value);
}

static void migrate_ensure_non_converge(QTestState *who)
{
/* Can't converge with 1ms downtime + 3 mbs bandwidth limit */
Expand Down Expand Up @@ -1092,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
#endif /* CONFIG_TASN1 */
#endif /* CONFIG_GNUTLS */

static void *
test_migrate_compress_start(QTestState *from,
QTestState *to)
{
migrate_set_parameter_int(from, "compress-level", 1);
migrate_set_parameter_int(from, "compress-threads", 4);
migrate_set_parameter_bool(from, "compress-wait-thread", true);
migrate_set_parameter_int(to, "decompress-threads", 4);

migrate_set_capability(from, "compress", true);
migrate_set_capability(to, "compress", true);

return NULL;
}

static void *
test_migrate_compress_nowait_start(QTestState *from,
QTestState *to)
{
migrate_set_parameter_int(from, "compress-level", 9);
migrate_set_parameter_int(from, "compress-threads", 1);
migrate_set_parameter_bool(from, "compress-wait-thread", false);
migrate_set_parameter_int(to, "decompress-threads", 1);

migrate_set_capability(from, "compress", true);
migrate_set_capability(to, "compress", true);

return NULL;
}

static int migrate_postcopy_prepare(QTestState **from_ptr,
QTestState **to_ptr,
MigrateCommon *args)
Expand Down Expand Up @@ -1169,6 +1234,15 @@ static void test_postcopy(void)
test_postcopy_common(&args);
}

static void test_postcopy_compress(void)
{
MigrateCommon args = {
.start_hook = test_migrate_compress_start
};

test_postcopy_common(&args);
}

static void test_postcopy_preempt(void)
{
MigrateCommon args = {
Expand Down Expand Up @@ -1270,6 +1344,15 @@ static void test_postcopy_recovery(void)
test_postcopy_recovery_common(&args);
}

static void test_postcopy_recovery_compress(void)
{
MigrateCommon args = {
.start_hook = test_migrate_compress_start
};

test_postcopy_recovery_common(&args);
}

#ifdef CONFIG_GNUTLS
static void test_postcopy_recovery_tls_psk(void)
{
Expand Down Expand Up @@ -1303,6 +1386,7 @@ static void test_postcopy_preempt_all(void)

test_postcopy_recovery_common(&args);
}

#endif

static void test_baddest(void)
Expand Down Expand Up @@ -1524,6 +1608,40 @@ static void test_precopy_unix_xbzrle(void)
test_precopy_common(&args);
}

static void test_precopy_unix_compress(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
MigrateCommon args = {
.connect_uri = uri,
.listen_uri = uri,
.start_hook = test_migrate_compress_start,
/*
* Test that no invalid thread state is left over from
* the previous iteration.
*/
.iterations = 2,
};

test_precopy_common(&args);
}

static void test_precopy_unix_compress_nowait(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
MigrateCommon args = {
.connect_uri = uri,
.listen_uri = uri,
.start_hook = test_migrate_compress_nowait_start,
/*
* Test that no invalid thread state is left over from
* the previous iteration.
*/
.iterations = 2,
};

test_precopy_common(&args);
}

static void test_precopy_tcp_plain(void)
{
MigrateCommon args = {
Expand Down Expand Up @@ -2520,8 +2638,12 @@ int main(int argc, char **argv)

if (has_uffd) {
qtest_add_func("/migration/postcopy/plain", test_postcopy);
qtest_add_func("/migration/postcopy/compress/plain",
test_postcopy_compress);
qtest_add_func("/migration/postcopy/recovery/plain",
test_postcopy_recovery);
qtest_add_func("/migration/postcopy/recovery/compress/plain",
test_postcopy_recovery_compress);
qtest_add_func("/migration/postcopy/preempt/plain", test_postcopy_preempt);
qtest_add_func("/migration/postcopy/preempt/recovery/plain",
test_postcopy_preempt_recovery);
Expand All @@ -2530,6 +2652,10 @@ int main(int argc, char **argv)
qtest_add_func("/migration/bad_dest", test_baddest);
qtest_add_func("/migration/precopy/unix/plain", test_precopy_unix_plain);
qtest_add_func("/migration/precopy/unix/xbzrle", test_precopy_unix_xbzrle);
qtest_add_func("/migration/precopy/unix/compress/wait",
test_precopy_unix_compress);
qtest_add_func("/migration/precopy/unix/compress/nowait",
test_precopy_unix_compress_nowait);
#ifdef CONFIG_GNUTLS
qtest_add_func("/migration/precopy/unix/tls/psk",
test_precopy_unix_tls_psk);
Expand Down