Skip to content

Commit

Permalink
Merge pull request #1247 from Barenboim/master
Browse files Browse the repository at this point in the history
Add PD_OP_RECV_MESSAGE to support UDP server.
  • Loading branch information
Barenboim committed Apr 13, 2023
2 parents ff0abcf + 8d958c8 commit 61dca8e
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 27 deletions.
106 changes: 80 additions & 26 deletions src/kernel/poller.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,14 +573,15 @@ static void __poller_handle_listen(struct __poller_node *node,
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
socklen_t len;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
void *result;
int sockfd;
void *p;

while (1)
{
len = sizeof (struct sockaddr_storage);
sockfd = accept(node->data.fd, (struct sockaddr *)&ss, &len);
addrlen = sizeof (struct sockaddr_storage);
sockfd = accept(node->data.fd, addr, &addrlen);
if (sockfd < 0)
{
if (errno == EAGAIN || errno == EMFILE || errno == ENFILE)
Expand All @@ -591,13 +592,12 @@ static void __poller_handle_listen(struct __poller_node *node,
break;
}

p = node->data.accept((const struct sockaddr *)&ss, len,
sockfd, node->data.context);
if (!p)
result = node->data.accept(addr, addrlen, sockfd, node->data.context);
if (!result)
break;

res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
Expand Down Expand Up @@ -733,17 +733,17 @@ static void __poller_handle_event(struct __poller_node *node,
struct __poller_node *res = node->res;
unsigned long long cnt = 0;
unsigned long long value;
ssize_t ret;
void *p;
void *result;
ssize_t n;

while (1)
{
ret = read(node->data.fd, &value, sizeof (unsigned long long));
if (ret == sizeof (unsigned long long))
n = read(node->data.fd, &value, sizeof (unsigned long long));
if (n == sizeof (unsigned long long))
cnt += value;
else
{
if (ret >= 0)
if (n >= 0)
errno = EINVAL;
break;
}
Expand All @@ -757,12 +757,12 @@ static void __poller_handle_event(struct __poller_node *node,
return;

cnt--;
p = node->data.event(node->data.context);
if (!p)
result = node->data.event(node->data.context);
if (!result)
break;

res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
Expand Down Expand Up @@ -790,20 +790,20 @@ static void __poller_handle_notify(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
ssize_t ret;
void *p;
void *result;
ssize_t n;

while (1)
{
ret = read(node->data.fd, &p, sizeof (void *));
if (ret == sizeof (void *))
n = read(node->data.fd, &result, sizeof (void *));
if (n == sizeof (void *))
{
p = node->data.notify(p, node->data.context);
if (!p)
result = node->data.notify(result, node->data.context);
if (!result)
break;

res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
Expand All @@ -813,11 +813,11 @@ static void __poller_handle_notify(struct __poller_node *node,
if (!res)
break;
}
else if (ret < 0 && errno == EAGAIN)
else if (n < 0 && errno == EAGAIN)
return;
else
{
if (ret > 0)
if (n > 0)
errno = EINVAL;
break;
}
Expand All @@ -826,7 +826,7 @@ static void __poller_handle_notify(struct __poller_node *node,
if (__poller_remove_node(node, poller))
return;

if (ret == 0)
if (n == 0)
{
node->error = 0;
node->state = PR_ST_FINISHED;
Expand All @@ -841,6 +841,54 @@ static void __poller_handle_notify(struct __poller_node *node,
poller->cb((struct poller_result *)node, poller->ctx);
}

static void __poller_handle_recv_message(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
poller_message_t *msg;
void *p = poller->buf;
ssize_t n;

while (1)
{
addrlen = sizeof (struct sockaddr_storage);
n = recvfrom(node->data.fd, p, POLLER_BUFSIZE, 0, addr, &addrlen);
if (n < 0)
{
if (errno == EAGAIN)
return;
else
break;
}

msg = node->data.recv_message(addr, addrlen, p, n, node->data.context);
if (!msg)
break;

res->data = node->data;
res->data.message = msg;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);

res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
if (!res)
break;
}

if (__poller_remove_node(node, poller))
return;

node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}

static int __poller_handle_pipe(poller_t *poller)
{
struct __poller_node **node = (struct __poller_node **)poller->buf;
Expand Down Expand Up @@ -1006,6 +1054,9 @@ static void *__poller_thread_routine(void *arg)
case PD_OP_NOTIFY:
__poller_handle_notify(node, poller);
break;
case PD_OP_RECV_MESSAGE:
__poller_handle_recv_message(node, poller);
break;
}
}
else if (node == (struct __poller_node *)1)
Expand Down Expand Up @@ -1240,6 +1291,9 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
case PD_OP_NOTIFY:
*event = EPOLLIN | EPOLLET;
return 1;
case PD_OP_RECV_MESSAGE:
*event = EPOLLIN | EPOLLET;
return 1;
default:
errno = EINVAL;
return -1;
Expand Down
5 changes: 4 additions & 1 deletion src/kernel/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct __poller_message

struct poller_data
{
#define PD_OP_TIMER 0
#define PD_OP_READ 1
#define PD_OP_WRITE 2
#define PD_OP_LISTEN 3
Expand All @@ -46,7 +47,7 @@ struct poller_data
#define PD_OP_SSL_SHUTDOWN 7
#define PD_OP_EVENT 8
#define PD_OP_NOTIFY 9
#define PD_OP_TIMER 10
#define PD_OP_RECV_MESSAGE 10
short operation;
unsigned short iovcnt;
int fd;
Expand All @@ -58,6 +59,8 @@ struct poller_data
void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
poller_message_t *(*recv_message)(const struct sockaddr *, socklen_t,
const void *, size_t, void *);
};
void *context;
union
Expand Down

0 comments on commit 61dca8e

Please sign in to comment.