Skip to content

Commit

Permalink
aio: more callback work
Browse files Browse the repository at this point in the history
  • Loading branch information
Reini Urban committed Jun 13, 2013
1 parent 5af47ed commit 72b203e
Showing 1 changed file with 98 additions and 63 deletions.
161 changes: 98 additions & 63 deletions lib/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ PN aio_vt, aio_tcp_vt, aio_udp_vt, aio_loop_vt, aio_tty_vt, aio_pipe_vt, aio_pol
int r; \
uv_##T##_t *handle = (uv_##T##_t*)PN_DATA(self); \
if (!loop) loop = (PN)uv_default_loop(); \
else loop = PN_DATA(loop); \
else loop = (PN)PN_DATA(loop); \
r = uv_##T##_init((uv_loop_t*)loop, handle); \
if (!r) return potion_io_error(P, _XSTR(t) "_init"); \
return self; \
Expand Down Expand Up @@ -64,16 +64,16 @@ static PN aio_tty_init(Potion *P, PN cl, PN self, PN loop, PN file, PN readable)
int r;
uv_tty_t *handle = (uv_tty_t*)PN_DATA(self);
if (!loop) loop = (PN)uv_default_loop();
else loop = PN_DATA(loop);
r = uv_tty_init((uv_loop_t*)loop, handle, PN_DATA(file), PN_NUM(readable));
else loop = (PN)PN_DATA(loop);
r = uv_tty_init((uv_loop_t*)loop, handle, PN_NUM(PN_DATA(file)), PN_NUM(readable));
if (!r) return potion_io_error(P, "tty_init");
return self;
}
static PN aio_pipe_init(Potion *P, PN cl, PN self, PN loop, PN ipc) {
int r;
uv_pipe_t *handle = (uv_pipe_t*)PN_DATA(self);
if (!loop) loop = (PN)uv_default_loop();
else loop = PN_DATA(loop);
else loop = (PN)PN_DATA(loop);
r = uv_pipe_init((uv_loop_t*)loop, handle, PN_NUM(ipc));
if (!r) return potion_io_error(P, "pipe_init");
return self;
Expand All @@ -82,7 +82,7 @@ static PN aio_poll_init(Potion *P, PN cl, PN self, PN loop, PN fd) {
int r;
uv_poll_t *handle = (uv_poll_t*)PN_DATA(self);
if (!loop) loop = (PN)uv_default_loop();
else loop = PN_DATA(loop);
else loop = (PN)PN_DATA(loop);
r = uv_poll_init((uv_loop_t*)loop, handle, PN_NUM(fd));
if (!r) return potion_io_error(P, "poll_init");
return self;
Expand All @@ -105,7 +105,7 @@ static PN aio_fs_event_init(Potion *P, PN cl, PN self, PN loop, PN filename, PN
int r;
uv_fs_event_t *handle = (uv_fs_event_t*)PN_DATA(self);
if (!loop) loop = (PN)uv_default_loop();
else loop = PN_DATA(loop);
else loop = (PN)PN_DATA(loop);
r = uv_fs_event_init((uv_loop_t*)loop, handle, PN_STR_PTR(filename), (uv_fs_event_cb)PN_DATA(cb), PN_NUM(flags));
if (!r) return potion_io_error(P, "fs_event_init");
return self;
Expand All @@ -114,33 +114,108 @@ static PN aio_async_init(Potion *P, PN cl, PN self, PN loop, PN cb) {
int r;
uv_async_t *handle = (uv_async_t*)PN_DATA(self);
if (!loop) loop = (PN)uv_default_loop();
else loop = PN_DATA(loop);
else loop = (PN)PN_DATA(loop);
r = uv_async_init((uv_loop_t*)loop, handle, (uv_async_cb)PN_DATA(cb));
if (!r) return potion_io_error(P, "async_init");
return self;
}
#define DEF_AIO_REQ_WRAP(T,H) \
struct aio_##T##_s { \
struct uv_##H##_s req; \
Potion *P; \
PN cl; \
uv_##T##_cb cb; \
} aio_##T##_t
DEF_AIO_REQ_WRAP(write,write);
DEF_AIO_REQ_WRAP(connect,connect);
DEF_AIO_REQ_WRAP(shutdown,shutdown);
DEF_AIO_REQ_WRAP(connection,stream);
DEF_AIO_REQ_WRAP(close,handle);
DEF_AIO_REQ_WRAP(poll,poll);
DEF_AIO_REQ_WRAP(timer,timer);
DEF_AIO_REQ_WRAP(async,async);
DEF_AIO_REQ_WRAP(prepare,prepare);
DEF_AIO_REQ_WRAP(check,check);
DEF_AIO_REQ_WRAP(idle,idle);
DEF_AIO_REQ_WRAP(exit,process);
DEF_AIO_REQ_WRAP(walk,handle);
DEF_AIO_REQ_WRAP(fs,fs);
DEF_AIO_REQ_WRAP(work,work);
DEF_AIO_REQ_WRAP(after_work,work);
DEF_AIO_REQ_WRAP(getaddrinfo,getaddrinfo);

#define DEF_AIO_NEW(T) \
static PN aio_##T##_new(Potion *P, PN cl, PN self) { \
struct PNData *data = potion_data_alloc(P, sizeof(uv_##T##_t)+sizeof(void*)); \
data->vt = aio_##T##_vt; \
return (PN)data; \
}
DEF_AIO_NEW(loop)
DEF_AIO_NEW(async)
DEF_AIO_NEW(check)
DEF_AIO_NEW(fs_event)
DEF_AIO_NEW(fs_poll)
DEF_AIO_NEW(handle)
DEF_AIO_NEW(idle)
DEF_AIO_NEW(pipe)
DEF_AIO_NEW(poll)
DEF_AIO_NEW(prepare)
DEF_AIO_NEW(process)
DEF_AIO_NEW(stream)
DEF_AIO_NEW(tcp)
DEF_AIO_NEW(timer)
DEF_AIO_NEW(tty)
DEF_AIO_NEW(udp)
DEF_AIO_NEW(signal)
DEF_AIO_NEW(req)
DEF_AIO_NEW(connect)
DEF_AIO_NEW(write)
DEF_AIO_NEW(shutdown)
DEF_AIO_NEW(udp_send)
DEF_AIO_NEW(fs)
DEF_AIO_NEW(work)
DEF_AIO_NEW(getaddrinfo)
DEF_AIO_NEW(cpu_info)
DEF_AIO_NEW(interface_address)
DEF_AIO_NEW(mutex)
DEF_AIO_NEW(rwlock)
DEF_AIO_NEW(sem)
DEF_AIO_NEW(cond)
DEF_AIO_NEW(barrier)
#undef DEF_AIO_NEW

//cb wrappers
#define DEF_AIO_CB(N,T) \
static void \
aio_##N##_cb(uv_##T##_t* req, int status) { \
struct aio_##T##_s* wrap = (struct aio_##T##_s*)req; \
vPN(Closure) cb = PN_CLOSURE(wrap->cb); \
PN data = aio_##T##_new(wrap->P, wrap->cl, 0); \
memcpy(PN_DATA(data), req, sizeof(uv_##T##_t)); \
if (cb) cb->method(wrap->P, wrap->cl, data, PN_NUM(status)); \
}
//DEF_AIO_CB(write,write)
DEF_AIO_CB(connect,connect)
//DEF_AIO_CB(shutdown,shutdown)
//DEF_AIO_CB(connection,stream)
//DEF_AIO_CB(timer,timer)
//DEF_AIO_CB(async,async)

static PN
aio_tcp_connect(Potion *P, PN cl, PN self, PN req, PN addr, PN port, PN cb) {
uv_tcp_t *handle = (uv_tcp_t*)PN_DATA(self);
uv_connect_t *request = (uv_connect_t*)PN_DATA(req);
uv_tcp_t *handle = (uv_tcp_t*)PN_DATA(potion_fwd(self));
uv_connect_t *request = (uv_connect_t*)PN_DATA(potion_fwd(req));
uv_connect_cb connect_cb;
struct sockaddr_in ip4 = uv_ip4_addr(PN_STR_PTR(addr), PN_INT(port));
if (PN_IS_CLOSURE(cb)) connect_cb = (uv_connect_cb)PN_CLOSURE_F(cb);
else if (cb) connect_cb = (uv_connect_cb)PN_DATA(cb);
else connect_cb = 0;
if (PN_IS_CLOSURE(cb)) { //register user-callback. set cb ptr in req
request->cb = (uv_connect_cb)PN_CLOSURE(cb);
connect_cb = (uv_connect_cb)aio_connect_cb;
}
else if (cb) connect_cb = (uv_connect_cb)PN_DATA(cb); //c-level
else connect_cb = 0; //none
return PN_NUM(uv_tcp_connect(request, handle, ip4, connect_cb));
}

#if 0
static void
aio_shutdown_cb(Potion *P, PN cl, pn_aio self, int status) {
uv_stream_t* stream = ((uv_shutdown_t*)req)->handle;
potion_send(stream, shutdown_cb);
free(req);
}
#endif

/**
allocate a req or handle struct and return the object
*/
Expand Down Expand Up @@ -188,46 +263,6 @@ aio_new(Potion *P, PN cl, PN self, PN type) {
{}
#undef DEF_AIO_OBJ
}
#define DEF_AIO_NEW(T) \
static PN aio_##T##_new(Potion *P, PN cl, PN self) { \
struct PNData *data = potion_data_alloc(P, sizeof(uv_##T##_t)); \
data->vt = aio_##T##_vt; \
return (PN)data; \
}
DEF_AIO_NEW(loop)
DEF_AIO_NEW(async)
DEF_AIO_NEW(check)
DEF_AIO_NEW(fs_event)
DEF_AIO_NEW(fs_poll)
DEF_AIO_NEW(handle)
DEF_AIO_NEW(idle)
DEF_AIO_NEW(pipe)
DEF_AIO_NEW(poll)
DEF_AIO_NEW(prepare)
DEF_AIO_NEW(process)
DEF_AIO_NEW(stream)
DEF_AIO_NEW(tcp)
DEF_AIO_NEW(timer)
DEF_AIO_NEW(tty)
DEF_AIO_NEW(udp)
DEF_AIO_NEW(signal)
DEF_AIO_NEW(req)
DEF_AIO_NEW(connect)
DEF_AIO_NEW(write)
DEF_AIO_NEW(shutdown)
DEF_AIO_NEW(udp_send)
DEF_AIO_NEW(fs)
DEF_AIO_NEW(work)
DEF_AIO_NEW(getaddrinfo)
DEF_AIO_NEW(cpu_info)
DEF_AIO_NEW(interface_address)
DEF_AIO_NEW(mutex)
DEF_AIO_NEW(rwlock)
DEF_AIO_NEW(sem)
DEF_AIO_NEW(cond)
DEF_AIO_NEW(barrier)
#undef DEF_AIO_NEW

/**\memberof aio
initialize aio types
\param loop PNAioLoop to uv_loop_t*, defaults to uv_default_loop()
Expand All @@ -237,8 +272,8 @@ static PN
aio_init_loop(Potion *P, PN cl, PN self, PN loop) {
PN type = potion_send(PN_VTABLE(self), PN_name);
#define DEF_AIO_INIT(T) \
if (type == PN_STR("Aio_tcp")) \
return aio_tcp_init(P, cl, self, loop); \
if (type == PN_STR("Aio_"#T)) \
return aio_##T##_init(P, cl, self, loop); \
else
DEF_AIO_INIT(tcp)
DEF_AIO_INIT(udp)
Expand Down

0 comments on commit 72b203e

Please sign in to comment.