Skip to content

Commit

Permalink
Revert "qmp: isolate responses into io thread"
Browse files Browse the repository at this point in the history
This reverts commit abe3cd0.

There is no need to add an additional queue to send the reply to the
IOThread, because QMP response is thread safe, and chardev write path
is thread safe. It will schedule the watcher in the associated
IOThread.

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
Reviewed-by: Markus Armbruster <armbru@redhat.com>
Message-Id: <20180829134043.31706-4-marcandre.lureau@redhat.com>
Signed-off-by: Markus Armbruster <armbru@redhat.com>
  • Loading branch information
elmarco authored and Markus Armbruster committed Aug 30, 2018
1 parent 2aa788f commit 2765601
Showing 1 changed file with 3 additions and 117 deletions.
120 changes: 3 additions & 117 deletions monitor.c
Expand Up @@ -182,8 +182,6 @@ typedef struct {
QemuMutex qmp_queue_lock;
/* Input queue that holds all the parsed QMP requests */
GQueue *qmp_requests;
/* Output queue contains all the QMP responses in order */
GQueue *qmp_responses;
} MonitorQMP;

/*
Expand Down Expand Up @@ -247,9 +245,6 @@ IOThread *mon_iothread;
/* Bottom half to dispatch the requests received from I/O thread */
QEMUBH *qmp_dispatcher_bh;

/* Bottom half to deliver the responses back to clients */
QEMUBH *qmp_respond_bh;

struct QMPRequest {
/* Owner of the request */
Monitor *mon;
Expand Down Expand Up @@ -375,19 +370,10 @@ static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
}
}

/* Caller must hold the mon->qmp.qmp_queue_lock */
static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
{
while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
qobject_unref((QDict *)g_queue_pop_head(mon->qmp.qmp_responses));
}
}

static void monitor_qmp_cleanup_queues(Monitor *mon)
{
qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
monitor_qmp_cleanup_req_queue_locked(mon);
monitor_qmp_cleanup_resp_queue_locked(mon);
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
}

Expand Down Expand Up @@ -518,85 +504,6 @@ static void qmp_send_response(Monitor *mon, const QDict *rsp)
qobject_unref(json);
}

static void qmp_queue_response(Monitor *mon, QDict *rsp)
{
if (mon->use_io_thread) {
/*
* Push a reference to the response queue. The I/O thread
* drains that queue and emits.
*/
qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
g_queue_push_tail(mon->qmp.qmp_responses, qobject_ref(rsp));
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
qemu_bh_schedule(qmp_respond_bh);
} else {
/*
* Not using monitor I/O thread, i.e. we are in the main thread.
* Emit right away.
*/
qmp_send_response(mon, rsp);
}
}

struct QMPResponse {
Monitor *mon;
QDict *data;
};
typedef struct QMPResponse QMPResponse;

static QDict *monitor_qmp_response_pop_one(Monitor *mon)
{
QDict *data;

qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
data = g_queue_pop_head(mon->qmp.qmp_responses);
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);

return data;
}

static void monitor_qmp_response_flush(Monitor *mon)
{
QDict *data;

while ((data = monitor_qmp_response_pop_one(mon))) {
qmp_send_response(mon, data);
qobject_unref(data);
}
}

/*
* Pop a QMPResponse from any monitor's response queue into @response.
* Return false if all the queues are empty; else true.
*/
static bool monitor_qmp_response_pop_any(QMPResponse *response)
{
Monitor *mon;
QDict *data = NULL;

qemu_mutex_lock(&monitor_lock);
QTAILQ_FOREACH(mon, &mon_list, entry) {
data = monitor_qmp_response_pop_one(mon);
if (data) {
response->mon = mon;
response->data = data;
break;
}
}
qemu_mutex_unlock(&monitor_lock);
return data != NULL;
}

static void monitor_qmp_bh_responder(void *opaque)
{
QMPResponse response;

while (monitor_qmp_response_pop_any(&response)) {
qmp_send_response(response.mon, response.data);
qobject_unref(response.data);
}
}

static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = {
/* Limit guest-triggerable events to 1 per second */
[QAPI_EVENT_RTC_CHANGE] = { 1000 * SCALE_MS },
Expand All @@ -620,7 +527,7 @@ static void monitor_qapi_event_emit(QAPIEvent event, QDict *qdict)
QTAILQ_FOREACH(mon, &mon_list, entry) {
if (monitor_is_qmp(mon)
&& mon->qmp.commands != &qmp_cap_negotiation_commands) {
qmp_queue_response(mon, qdict);
qmp_send_response(mon, qdict);
}
}
}
Expand Down Expand Up @@ -818,7 +725,6 @@ static void monitor_data_init(Monitor *mon, bool skip_flush,
mon->skip_flush = skip_flush;
mon->use_io_thread = use_io_thread;
mon->qmp.qmp_requests = g_queue_new();
mon->qmp.qmp_responses = g_queue_new();
}

static void monitor_data_destroy(Monitor *mon)
Expand All @@ -833,9 +739,7 @@ static void monitor_data_destroy(Monitor *mon)
qemu_mutex_destroy(&mon->mon_lock);
qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
monitor_qmp_cleanup_req_queue_locked(mon);
monitor_qmp_cleanup_resp_queue_locked(mon);
g_queue_free(mon->qmp.qmp_requests);
g_queue_free(mon->qmp.qmp_responses);
}

char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
Expand Down Expand Up @@ -4152,7 +4056,7 @@ static void monitor_qmp_respond(Monitor *mon, QDict *rsp, QObject *id)
qdict_put_obj(rsp, "id", qobject_ref(id));
}

qmp_queue_response(mon, rsp);
qmp_send_response(mon, rsp);
}
}

Expand Down Expand Up @@ -4445,7 +4349,7 @@ static void monitor_qmp_event(void *opaque, int event)
mon->qmp.commands = &qmp_cap_negotiation_commands;
monitor_qmp_caps_reset(mon);
data = qmp_greeting(mon);
qmp_queue_response(mon, data);
qmp_send_response(mon, data);
qobject_unref(data);
mon_refcount++;
break;
Expand All @@ -4456,7 +4360,6 @@ static void monitor_qmp_event(void *opaque, int event)
* stdio, it's possible that stdout is still open when stdin
* is closed.
*/
monitor_qmp_response_flush(mon);
monitor_qmp_cleanup_queues(mon);
json_message_parser_destroy(&mon->qmp.parser);
json_message_parser_init(&mon->qmp.parser, handle_qmp_command,
Expand Down Expand Up @@ -4559,15 +4462,6 @@ static void monitor_iothread_init(void)
qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
monitor_qmp_bh_dispatcher,
NULL);

/*
* The responder BH must be run in the monitor I/O thread, so that
* monitors that are using the I/O thread have their output
* written by the I/O thread.
*/
qmp_respond_bh = aio_bh_new(monitor_get_aio_context(),
monitor_qmp_bh_responder,
NULL);
}

void monitor_init_globals(void)
Expand Down Expand Up @@ -4714,12 +4608,6 @@ void monitor_cleanup(void)
*/
iothread_stop(mon_iothread);

/*
* Flush all response queues. Note that even after this flush,
* data may remain in output buffers.
*/
monitor_qmp_bh_responder(NULL);

/* Flush output buffers and destroy monitors */
qemu_mutex_lock(&monitor_lock);
QTAILQ_FOREACH_SAFE(mon, &mon_list, entry, next) {
Expand All @@ -4733,8 +4621,6 @@ void monitor_cleanup(void)
/* QEMUBHs needs to be deleted before destroying the I/O thread */
qemu_bh_delete(qmp_dispatcher_bh);
qmp_dispatcher_bh = NULL;
qemu_bh_delete(qmp_respond_bh);
qmp_respond_bh = NULL;

iothread_destroy(mon_iothread);
mon_iothread = NULL;
Expand Down

0 comments on commit 2765601

Please sign in to comment.