Skip to content

Commit

Permalink
colo-compare: Use IOThread to Check old packet regularly and Process …
Browse files Browse the repository at this point in the history
…pactkets of the primary

Remove the task which check old packet in the comparing thread,
then use IOthread context timer to handle it.

Process pactkets in the IOThread which arrived over the socket.
we use iothread_get_g_main_context to create a new g_main_loop in
the IOThread.then the packets from the primary and the secondary
are processed in the IOThread.

Finally remove the colo-compare thread using the IOThread instead.

Reviewed-by: Zhang Chen<zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Wang Yong <wang.yong155@zte.com.cn>
Signed-off-by: Wang Guang <wang.guang55@zte.com.cn>
Signed-off-by: Jason Wang <jasowang@redhat.com>
  • Loading branch information
Wang Yong authored and jasowang committed Sep 8, 2017
1 parent 329163c commit dd321ec
Showing 1 changed file with 45 additions and 38 deletions.
83 changes: 45 additions & 38 deletions net/colo-compare.c
Expand Up @@ -29,6 +29,7 @@
#include "qemu/sockets.h"
#include "qapi-visit.h"
#include "net/colo.h"
#include "sysemu/iothread.h"

#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
Expand Down Expand Up @@ -82,11 +83,10 @@ typedef struct CompareState {
GQueue conn_list;
/* Record the connection without repetition */
GHashTable *connection_track_table;
/* This thread just do packet compare job */
QemuThread thread;

IOThread *iothread;
GMainContext *worker_context;
GMainLoop *compare_loop;
QEMUTimer *packet_check_timer;
} CompareState;

typedef struct CompareClass {
Expand Down Expand Up @@ -615,22 +615,40 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
* Check old packet regularly so it can watch for any packets
* that the secondary hasn't produced equivalents of.
*/
static gboolean check_old_packet_regular(void *opaque)
static void check_old_packet_regular(void *opaque)
{
CompareState *s = opaque;

/* if have old packet we will notify checkpoint */
colo_old_packet_check(s);
timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
REGULAR_PACKET_CHECK_MS);
}

static void colo_compare_timer_init(CompareState *s)
{
AioContext *ctx = iothread_get_aio_context(s->iothread);

return TRUE;
s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
SCALE_MS, check_old_packet_regular,
s);
timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
REGULAR_PACKET_CHECK_MS);
}

static void *colo_compare_thread(void *opaque)
static void colo_compare_timer_del(CompareState *s)
{
CompareState *s = opaque;
GSource *timeout_source;
if (s->packet_check_timer) {
timer_del(s->packet_check_timer);
timer_free(s->packet_check_timer);
s->packet_check_timer = NULL;
}
}

s->worker_context = g_main_context_new();
static void colo_compare_iothread(CompareState *s)
{
object_ref(OBJECT(s->iothread));
s->worker_context = iothread_get_g_main_context(s->iothread);

qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
compare_pri_chr_in, NULL, NULL,
Expand All @@ -639,20 +657,7 @@ static void *colo_compare_thread(void *opaque)
compare_sec_chr_in, NULL, NULL,
s, s->worker_context, true);

s->compare_loop = g_main_loop_new(s->worker_context, FALSE);

/* To kick any packets that the secondary doesn't match */
timeout_source = g_timeout_source_new(REGULAR_PACKET_CHECK_MS);
g_source_set_callback(timeout_source,
(GSourceFunc)check_old_packet_regular, s, NULL);
g_source_attach(timeout_source, s->worker_context);

g_main_loop_run(s->compare_loop);

g_source_unref(timeout_source);
g_main_loop_unref(s->compare_loop);
g_main_context_unref(s->worker_context);
return NULL;
colo_compare_timer_init(s);
}

static char *compare_get_pri_indev(Object *obj, Error **errp)
Expand Down Expand Up @@ -777,12 +782,10 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
{
CompareState *s = COLO_COMPARE(uc);
Chardev *chr;
char thread_name[64];
static int compare_id;

if (!s->pri_indev || !s->sec_indev || !s->outdev) {
if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
error_setg(errp, "colo compare needs 'primary_in' ,"
"'secondary_in','outdev' property set");
"'secondary_in','outdev','iothread' property set");
return;
} else if (!strcmp(s->pri_indev, s->outdev) ||
!strcmp(s->sec_indev, s->outdev) ||
Expand Down Expand Up @@ -817,12 +820,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
g_free,
connection_destroy);

sprintf(thread_name, "colo-compare %d", compare_id);
qemu_thread_create(&s->thread, thread_name,
colo_compare_thread, s,
QEMU_THREAD_JOINABLE);
compare_id++;

colo_compare_iothread(s);
return;
}

Expand Down Expand Up @@ -866,6 +864,10 @@ static void colo_compare_init(Object *obj)
object_property_add_str(obj, "outdev",
compare_get_outdev, compare_set_outdev,
NULL);
object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
(Object **)&s->iothread,
object_property_allow_set_link,
OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL);

s->vnet_hdr = false;
object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
Expand All @@ -879,16 +881,21 @@ static void colo_compare_finalize(Object *obj)
qemu_chr_fe_deinit(&s->chr_pri_in, false);
qemu_chr_fe_deinit(&s->chr_sec_in, false);
qemu_chr_fe_deinit(&s->chr_out, false);

g_main_loop_quit(s->compare_loop);
qemu_thread_join(&s->thread);

if (s->iothread) {
colo_compare_timer_del(s);
}
/* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s);

g_queue_clear(&s->conn_list);

g_hash_table_destroy(s->connection_track_table);
if (s->connection_track_table) {
g_hash_table_destroy(s->connection_track_table);
}

if (s->iothread) {
object_unref(OBJECT(s->iothread));
}
g_free(s->pri_indev);
g_free(s->sec_indev);
g_free(s->outdev);
Expand Down

0 comments on commit dd321ec

Please sign in to comment.