iohandlers: port posix implementation to glib
mdroth committed Aug 14, 2013
1 parent 49c881d · commit 9a749a2
Showing 3 changed files with 196 additions and 85 deletions.
2 changes: 2 additions & 0 deletions include/qemu/main-loop.h
@@ -171,6 +171,8 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque);
typedef void IOReadHandler(void *opaque, const uint8_t *buf, int size);
typedef int IOCanReadHandler(void *opaque);

void fd_source_attach(GMainContext *ctx);

/**
* qemu_set_fd_handler2: Register a file descriptor with the main loop
*
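
For orientation, here is a minimal sketch of how the reworked path is meant to be driven from caller code: fd_source_attach() is called once per GMainContext (the main-loop.c hunk below does this for the default context), and the existing qemu_set_fd_handler2()/qemu_set_fd_handler() calls then register descriptors with that GSource instead of the old io_handlers list. The handler and helper names below (my_read, example_register, example_unregister) are illustrative and not part of the commit.

#include <unistd.h>
#include <glib.h>
#include "qemu/main-loop.h"

/* Illustrative handler: read from the descriptor when the main loop
 * reports it readable. */
static void my_read(void *opaque)
{
    int fd = GPOINTER_TO_INT(opaque);
    char buf[256];
    ssize_t len = read(fd, buf, sizeof(buf));

    if (len > 0) {
        /* consume len bytes of data */
    }
}

static void example_register(int fd)
{
    /* qemu_init_main_loop() has already run fd_source_attach() on the
     * default context, so from here on the handler is polled and
     * dispatched by the GLib main context. */
    qemu_set_fd_handler2(fd, NULL, my_read, NULL, GINT_TO_POINTER(fd));
}

static void example_unregister(int fd)
{
    /* NULL read and write handlers mark the record deleted; it is
     * reaped in fd_source_prepare() on the next loop iteration. */
    qemu_set_fd_handler2(fd, NULL, NULL, NULL, NULL);
}
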
271 changes: 188 additions & 83 deletions iohandler.c
@@ -32,60 +32,227 @@
#include <sys/wait.h>
#endif

#ifndef _WIN32

typedef struct IOHandlerRecord {
IOCanReadHandler *fd_read_poll;
IOHandler *fd_read;
IOHandler *fd_write;
void *opaque;
QLIST_ENTRY(IOHandlerRecord) next;
int fd;
int pollfds_idx;
GPollFD pfd;
bool deleted;
} IOHandlerRecord;

static QLIST_HEAD(, IOHandlerRecord) io_handlers =
QLIST_HEAD_INITIALIZER(io_handlers);
typedef struct FDSource {
GSource source;
QemuMutex mutex;
QLIST_HEAD(, IOHandlerRecord) io_handlers;
bool dispatching;
QemuCond dispatching_complete;
} FDSource;

static gboolean fd_source_prepare(GSource *source, gint *timeout)
{
FDSource *fdsrc = (FDSource *)source;
IOHandlerRecord *pioh, *ioh;

#ifndef _WIN32
/* XXX: fd_read_poll should be suppressed, but an API change is
necessary in the character devices to suppress fd_can_read(). */
int qemu_set_fd_handler2(int fd,
IOCanReadHandler *fd_read_poll,
IOHandler *fd_read,
IOHandler *fd_write,
void *opaque)
qemu_mutex_lock(&fdsrc->mutex);

QLIST_FOREACH_SAFE(ioh, &fdsrc->io_handlers, next, pioh) {
int events = 0;

if (ioh->deleted) {
g_source_remove_poll(source, &ioh->pfd);
QLIST_REMOVE(ioh, next);
g_free(ioh);
continue;
}
if (ioh->fd_read &&
(!ioh->fd_read_poll ||
ioh->fd_read_poll(ioh->opaque) != 0)) {
events |= G_IO_IN | G_IO_HUP | G_IO_ERR;
}
if (ioh->fd_write) {
events |= G_IO_OUT | G_IO_ERR;
}
if (events) {
ioh->pfd.events = events;
} else {
ioh->pfd.events = 0;
}
}

qemu_mutex_unlock(&fdsrc->mutex);

return false;
}

static gboolean fd_source_check(GSource *source)
{
FDSource *fdsrc = (FDSource *)source;
IOHandlerRecord *ioh;
gboolean dispatch_needed = false;

qemu_mutex_lock(&fdsrc->mutex);

QLIST_FOREACH(ioh, &fdsrc->io_handlers, next) {
if (ioh->pfd.revents) {
dispatch_needed = true;
}
}

qemu_mutex_unlock(&fdsrc->mutex);

return dispatch_needed;
}

static gboolean fd_source_dispatch(GSource *source, GSourceFunc cb,
gpointer user_data)
{
FDSource *fdsrc = (FDSource *)source;
IOHandlerRecord *pioh, *ioh;

qemu_mutex_lock(&fdsrc->mutex);
fdsrc->dispatching = true;
qemu_mutex_unlock(&fdsrc->mutex);

/* Dispatch functions may modify io_handlers as we call them here, but
* no other thread can access the list concurrently: while we are
* dispatching, any function that touches FDSource members must wait on
* the dispatching_complete condition unless g_main_context_is_owner()
* is true for the calling thread. As a result we can walk the list here
* without holding the FDSource mutex; the context must have been
* acquired with g_main_context_acquire() before g_main_context_dispatch()
* is called, which guarantees these conditions hold.
*/
QLIST_FOREACH_SAFE(ioh, &fdsrc->io_handlers, next, pioh) {
int revents = 0;

if (!ioh->deleted) {
revents = ioh->pfd.revents;
ioh->pfd.revents = 0;
}

if (!ioh->deleted && ioh->fd_read &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR))) {
ioh->fd_read(ioh->opaque);
}
if (!ioh->deleted && ioh->fd_write &&
(revents & (G_IO_OUT | G_IO_ERR))) {
ioh->fd_write(ioh->opaque);
}
}

qemu_mutex_lock(&fdsrc->mutex);
fdsrc->dispatching = false;
qemu_cond_broadcast(&fdsrc->dispatching_complete);
qemu_mutex_unlock(&fdsrc->mutex);

return true;
}

static void fd_source_finalize(GSource *source)
{
}

static GSourceFuncs fd_source_funcs = {
fd_source_prepare,
fd_source_check,
fd_source_dispatch,
fd_source_finalize
};

static gboolean fd_source_cb(gpointer user_data)
{
return true;
}

void fd_source_attach(GMainContext *ctx)
{
GSource *src = g_source_new(&fd_source_funcs, sizeof(FDSource));
FDSource *fdsrc = (FDSource *)src;

QLIST_INIT(&fdsrc->io_handlers);
qemu_mutex_init(&fdsrc->mutex);
g_source_set_callback(src, fd_source_cb, NULL, NULL);
g_source_attach(src, ctx);
}

static int fd_source_set_handler(GMainContext *ctx,
int fd,
IOCanReadHandler *fd_read_poll,
IOHandler *fd_read,
IOHandler *fd_write,
void *opaque)
{
GSource *src;
FDSource *fdsrc;
IOHandlerRecord *ioh;
bool in_dispatch;

assert(fd >= 0);

if (!ctx) {
ctx = g_main_context_default();
}
/* FIXME: we need a more reliable way to find our GSource */
src = g_main_context_find_source_by_funcs_user_data(
ctx, &fd_source_funcs, NULL);
assert(src);
fdsrc = (FDSource *)src;

qemu_mutex_lock(&fdsrc->mutex);
in_dispatch = fdsrc->dispatching && g_main_context_is_owner(ctx);

if (!in_dispatch) {
while (fdsrc->dispatching) {
qemu_cond_wait(&fdsrc->dispatching_complete, &fdsrc->mutex);
}
}

if (!fd_read && !fd_write) {
QLIST_FOREACH(ioh, &io_handlers, next) {
if (ioh->fd == fd) {
QLIST_FOREACH(ioh, &fdsrc->io_handlers, next) {
if (ioh->pfd.fd == fd) {
ioh->deleted = 1;
break;
}
}
} else {
QLIST_FOREACH(ioh, &io_handlers, next) {
if (ioh->fd == fd)
QLIST_FOREACH(ioh, &fdsrc->io_handlers, next) {
if (ioh->pfd.fd == fd)
goto found;
}
ioh = g_malloc0(sizeof(IOHandlerRecord));
QLIST_INSERT_HEAD(&io_handlers, ioh, next);
found:
ioh->fd = fd;
QLIST_INSERT_HEAD(&fdsrc->io_handlers, ioh, next);
ioh->pfd.fd = fd;
g_source_add_poll(src, &ioh->pfd);
found:
ioh->fd_read_poll = fd_read_poll;
ioh->fd_read = fd_read;
ioh->fd_write = fd_write;
ioh->opaque = opaque;
ioh->pollfds_idx = -1;
ioh->deleted = 0;
qemu_notify_event();
g_main_context_wakeup(ctx);
}

qemu_mutex_unlock(&fdsrc->mutex);

return 0;
}

int qemu_set_fd_handler2(int fd,
IOCanReadHandler *fd_read_poll,
IOHandler *fd_read,
IOHandler *fd_write,
void *opaque)
{
return fd_source_set_handler(NULL, fd, fd_read_poll, fd_read, fd_write,
opaque);
}

#else

typedef struct SocketHandler {
@@ -279,68 +446,6 @@ int qemu_set_fd_handler(int fd,
return qemu_set_fd_handler2(fd, NULL, fd_read, fd_write, opaque);
}

void qemu_iohandler_fill(GArray *pollfds)
{
IOHandlerRecord *ioh;

QLIST_FOREACH(ioh, &io_handlers, next) {
int events = 0;

if (ioh->deleted)
continue;
if (ioh->fd_read &&
(!ioh->fd_read_poll ||
ioh->fd_read_poll(ioh->opaque) != 0)) {
events |= G_IO_IN | G_IO_HUP | G_IO_ERR;
}
if (ioh->fd_write) {
events |= G_IO_OUT | G_IO_ERR;
}
if (events) {
GPollFD pfd = {
.fd = ioh->fd,
.events = events,
};
ioh->pollfds_idx = pollfds->len;
g_array_append_val(pollfds, pfd);
} else {
ioh->pollfds_idx = -1;
}
}
}

void qemu_iohandler_poll(GArray *pollfds, int ret)
{
if (ret > 0) {
IOHandlerRecord *pioh, *ioh;

QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
int revents = 0;

if (!ioh->deleted && ioh->pollfds_idx != -1) {
GPollFD *pfd = &g_array_index(pollfds, GPollFD,
ioh->pollfds_idx);
revents = pfd->revents;
}

if (!ioh->deleted && ioh->fd_read &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR))) {
ioh->fd_read(ioh->opaque);
}
if (!ioh->deleted && ioh->fd_write &&
(revents & (G_IO_OUT | G_IO_ERR))) {
ioh->fd_write(ioh->opaque);
}

/* Do this last in case read/write handlers marked it for deletion */
if (ioh->deleted) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
}
}
}
}

/* reaping of zombies. right now we're not passing the status to
anyone, but it would be possible to add a callback. */
#ifndef _WIN32
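
Before moving on to main-loop.c, it may help to isolate the synchronization scheme described in the comment inside fd_source_dispatch() above. The sketch below models just that handshake with plain GLib primitives (GMutex/GCond standing in for QemuMutex/QemuCond); the Sync type and the model_* functions are illustrative, not part of the commit, and assume the struct was set up with g_mutex_init()/g_cond_init().

#include <glib.h>
#include <stdbool.h>

typedef struct {
    GMutex mutex;
    GCond  dispatching_complete;
    bool   dispatching;
} Sync;

/* Run by the thread that owns the GMainContext, around the handler
 * callbacks. The handlers run outside the mutex, which is the point:
 * they may re-enter and modify the handler list. */
static void model_dispatch(Sync *s, void (*run_handlers)(void))
{
    g_mutex_lock(&s->mutex);
    s->dispatching = true;          /* updaters must now wait (or be us) */
    g_mutex_unlock(&s->mutex);

    run_handlers();

    g_mutex_lock(&s->mutex);
    s->dispatching = false;
    g_cond_broadcast(&s->dispatching_complete);
    g_mutex_unlock(&s->mutex);
}

/* Run by any thread that wants to change the handler list. */
static void model_set_handler(Sync *s, GMainContext *ctx, void (*update)(void))
{
    g_mutex_lock(&s->mutex);
    /* A handler running inside dispatch already owns the context, so it
     * must not wait for itself; everyone else waits for dispatch to end. */
    if (!(s->dispatching && g_main_context_is_owner(ctx))) {
        while (s->dispatching) {
            g_cond_wait(&s->dispatching_complete, &s->mutex);
        }
    }
    update();                       /* safe: no dispatch is walking the list */
    g_main_context_wakeup(ctx);     /* the real code only does this when
                                     * adding or updating a handler */
    g_mutex_unlock(&s->mutex);
}
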
8 changes: 6 additions & 2 deletions main-loop.c
@@ -136,6 +136,8 @@ int qemu_init_main_loop(void)
exit(1);
}

fd_source_attach(g_main_context_default());

ret = qemu_signal_init();
if (ret) {
return ret;
@@ -161,6 +163,7 @@ static void glib_pollfds_fill(uint32_t *cur_timeout)
int timeout = 0;
int n;

g_assert(g_main_context_acquire(context));
g_main_context_prepare(context, &max_priority);

glib_pollfds_idx = gpollfds->len;
@@ -177,16 +180,19 @@ static void glib_pollfds_fill(uint32_t *cur_timeout)
if (timeout >= 0 && timeout < *cur_timeout) {
*cur_timeout = timeout;
}
g_main_context_release(context);
}

static void glib_pollfds_poll(void)
{
GMainContext *context = g_main_context_default();
GPollFD *pfds = &g_array_index(gpollfds, GPollFD, glib_pollfds_idx);

g_assert(g_main_context_acquire(context));
if (g_main_context_check(context, max_priority, pfds, glib_n_poll_fds)) {
g_main_context_dispatch(context);
}
g_main_context_release(context);
}

#define MAX_MAIN_LOOP_SPIN (1000)
@@ -397,9 +403,7 @@ int main_loop_wait(int nonblocking)
slirp_update_timeout(&timeout);
slirp_pollfds_fill(gpollfds);
#endif
qemu_iohandler_fill(gpollfds);
ret = os_host_main_loop_wait(timeout);
qemu_iohandler_poll(gpollfds, ret);
#ifdef CONFIG_SLIRP
slirp_pollfds_poll(gpollfds, (ret < 0));
#endif
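
The acquire/release calls added above are what make it legal for this thread to call g_main_context_dispatch(); glib_pollfds_fill() and glib_pollfds_poll() together form one GLib iteration split around QEMU's own poll of gpollfds. Below is a standalone sketch of the same sequence, polling the context's descriptors directly instead of merging them into QEMU's array; iterate_once() is illustrative, not QEMU code.

#include <glib.h>

static void iterate_once(GMainContext *ctx)
{
    GPollFD fds[32];
    gint max_priority, timeout, nfds;

    if (!g_main_context_acquire(ctx)) {    /* required before dispatch */
        return;                            /* another thread owns it */
    }

    g_main_context_prepare(ctx, &max_priority);
    nfds = g_main_context_query(ctx, max_priority, &timeout,
                                fds, G_N_ELEMENTS(fds));
    if (nfds > (gint)G_N_ELEMENTS(fds)) {
        nfds = G_N_ELEMENTS(fds);          /* a real loop would grow fds */
    }

    /* QEMU polls these together with its other descriptors; here we
     * poll them directly. */
    g_poll(fds, nfds, timeout);

    if (g_main_context_check(ctx, max_priority, fds, nfds)) {
        g_main_context_dispatch(ctx);      /* runs fd_source_dispatch() etc. */
    }

    g_main_context_release(ctx);
}
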
