Skip to content

Commit

Permalink
feat(socket): Add support send and recv in a socket concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <zhaojh329@gmail.com>
  • Loading branch information
zhaojh329 committed Apr 11, 2024
1 parent a575d8e commit d69cf08
Showing 1 changed file with 52 additions and 33 deletions.
85 changes: 52 additions & 33 deletions socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@

struct eco_socket {
struct eco_context *eco;
lua_State *L;
struct ev_timer tmr;
struct ev_io io;
struct {
uint8_t overtime:1;
uint8_t established:1;
uint8_t connecting:1;
} flag;
int domain;
int fd;
struct {
struct ev_io io;
lua_State *co;
size_t len;
size_t sent;
const void *data;
Expand All @@ -49,6 +50,8 @@ struct eco_socket {
};
} snd;
struct {
struct ev_io io;
lua_State *co;
double timeout;
bool from;
size_t len;
Expand All @@ -68,20 +71,36 @@ static void ev_timer_cb(struct ev_loop *loop, ev_timer *w, int revents)
{
struct eco_socket *sock = container_of(w, struct eco_socket, tmr);

ev_io_stop(loop, &sock->io);

sock->flag.overtime = 1;

eco_resume(sock->eco->L, sock->L, 0);
if (sock->flag.connecting) {
ev_io_stop(loop, &sock->snd.io);
eco_resume(sock->eco->L, sock->snd.co, 0);
} else {
ev_io_stop(loop, &sock->rcv.io);
eco_resume(sock->eco->L, sock->rcv.co, 0);
}
}

static void ev_io_cb(struct ev_loop *loop, ev_io *w, int revents)
static void ev_io_read_cb(struct ev_loop *loop, ev_io *w, int revents)
{
struct eco_socket *sock = container_of(w, struct eco_socket, io);
struct eco_socket *sock = container_of(w, struct eco_socket, rcv.io);

ev_io_stop(loop, w);
ev_timer_stop(loop, &sock->tmr);
eco_resume(sock->eco->L, sock->L, 0);
eco_resume(sock->eco->L, sock->rcv.co, 0);
}

static void ev_io_write_cb(struct ev_loop *loop, ev_io *w, int revents)
{
struct eco_socket *sock = container_of(w, struct eco_socket, snd.io);

ev_io_stop(loop, w);

if (sock->flag.connecting)
ev_timer_stop(loop, &sock->tmr);

eco_resume(sock->eco->L, sock->snd.co, 0);
}

static int lua_push_sockaddr(lua_State *L, struct sockaddr *addr, socklen_t len)
Expand Down Expand Up @@ -199,7 +218,9 @@ static int eco_socket_init(lua_State *L, int fd, int domain, bool established)
sock->fd = fd;

ev_timer_init(&sock->tmr, ev_timer_cb, 0.0, 0);
ev_io_init(&sock->io, ev_io_cb, fd, 0);

ev_io_init(&sock->rcv.io, ev_io_read_cb, fd, EV_READ);
ev_io_init(&sock->snd.io, ev_io_write_cb, fd, EV_WRITE);

return 1;
}
Expand Down Expand Up @@ -246,7 +267,7 @@ static int lua_acceptk(lua_State *L, int status, lua_KContext ctx)
socklen_t addrlen = sizeof(addr);
int fd;

sock->L = NULL;
sock->rcv.co = NULL;

again:
fd = accept4(sock->fd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
Expand All @@ -255,9 +276,8 @@ static int lua_acceptk(lua_State *L, int status, lua_KContext ctx)
goto again;

if (errno == EAGAIN) {
sock->L = L;
ev_io_modify(&sock->io, EV_READ);
ev_io_start(sock->eco->loop, &sock->io);
sock->rcv.co = L;
ev_io_start(sock->eco->loop, &sock->rcv.io);
return lua_yieldk(L, 0, ctx, lua_acceptk);
}

Expand Down Expand Up @@ -286,7 +306,8 @@ static int lua_connectk(lua_State *L, int status, lua_KContext ctx)
int narg = 1;
int err = 0;

sock->L = NULL;
sock->flag.connecting = false;
sock->snd.co = NULL;

if (sock->flag.overtime) {
sock->flag.overtime = 0;
Expand Down Expand Up @@ -321,10 +342,10 @@ static int lua_connect(lua_State *L)
ev_timer_set(&sock->tmr, 5.0, 0);
ev_timer_start(sock->eco->loop, &sock->tmr);

ev_io_modify(&sock->io, EV_WRITE);
ev_io_start(sock->eco->loop, &sock->io);
ev_io_start(sock->eco->loop, &sock->snd.io);

sock->L = L;
sock->flag.connecting = true;
sock->snd.co = L;

return lua_yieldk(L, 0, (lua_KContext)sock, lua_connectk);
}
Expand Down Expand Up @@ -357,7 +378,7 @@ static int lua_recvk(lua_State *L, int status, lua_KContext ctx)
int fd = sock->fd;
ssize_t ret;

sock->L = NULL;
sock->rcv.co = NULL;

if (sock->flag.overtime) {
sock->flag.overtime = 0;
Expand All @@ -377,15 +398,14 @@ static int lua_recvk(lua_State *L, int status, lua_KContext ctx)
goto again;

if (errno == EAGAIN) {
sock->L = L;
sock->rcv.co = L;

if (sock->rcv.timeout > 0) {
ev_timer_set(&sock->tmr, sock->rcv.timeout, 0);
ev_timer_start(sock->eco->loop, &sock->tmr);
}

ev_io_modify(&sock->io, EV_READ);
ev_io_start(sock->eco->loop, &sock->io);
ev_io_start(sock->eco->loop, &sock->rcv.io);
return lua_yieldk(L, 0, ctx, lua_recvk);
}

Expand All @@ -410,7 +430,7 @@ static int __lua_recv(lua_State *L, bool from)
{
struct eco_socket *sock = luaL_checkudata(L, 1, ECO_SOCKET_MT);

if (sock->L) {
if (sock->rcv.co) {
lua_pushnil(L);
lua_pushliteral(L, "busy reading");
return 2;
Expand Down Expand Up @@ -442,7 +462,7 @@ static int lua_recvfrom(lua_State *L)

static inline int lua_init_snd(struct eco_socket *sock, lua_State *L)
{
if (sock->L) {
if (sock->snd.co) {
lua_pushnil(L);
lua_pushliteral(L, "busy");
return -1;
Expand All @@ -464,7 +484,7 @@ static int lua_sendk(lua_State *L, int status, lua_KContext ctx)
size_t len = sock->snd.len;
int ret;

sock->L = NULL;
sock->snd.co = NULL;

again:
if (addrlen)
Expand All @@ -476,9 +496,8 @@ static int lua_sendk(lua_State *L, int status, lua_KContext ctx)
goto again;

if (errno == EAGAIN) {
sock->L = L;
ev_io_modify(&sock->io, EV_WRITE);
ev_io_start(sock->eco->loop, &sock->io);
sock->snd.co = L;
ev_io_start(sock->eco->loop, &sock->snd.io);
return lua_yieldk(L, 0, ctx, lua_sendk);
}

Expand Down Expand Up @@ -532,7 +551,7 @@ static int lua_sendfilek(lua_State *L, int status, lua_KContext ctx)
off_t offset = sock->snd.offset;
int ret;

sock->L = NULL;
sock->snd.co = NULL;

again:
if (offset < 0)
Expand All @@ -544,9 +563,8 @@ static int lua_sendfilek(lua_State *L, int status, lua_KContext ctx)
goto again;

if (errno == EAGAIN) {
sock->L = L;
ev_io_modify(&sock->io, EV_WRITE);
ev_io_start(sock->eco->loop, &sock->io);
sock->snd.co = L;
ev_io_start(sock->eco->loop, &sock->snd.io);
return lua_yieldk(L, 0, ctx, lua_sendfilek);
}

Expand Down Expand Up @@ -578,7 +596,7 @@ static int lua_sendfile(lua_State *L)
const char *path;
int fd;

if (sock->L) {
if (sock->snd.co) {
lua_pushnil(L);
lua_pushliteral(L, "busy");
return 2;
Expand Down Expand Up @@ -802,7 +820,8 @@ static int lua_sock_close(lua_State *L)
}

ev_timer_stop(loop, &sock->tmr);
ev_io_stop(loop, &sock->io);
ev_io_stop(loop, &sock->rcv.io);
ev_io_stop(loop, &sock->snd.io);

close(sock->fd);

Expand Down

0 comments on commit d69cf08

Please sign in to comment.