Skip to content

Commit

Permalink
Merge branch 'iocp-fixes'
Browse files Browse the repository at this point in the history
* iocp-fixes:
  regress: test for HTTP/HTTPS with IOCP enabled
  bev_async: trigger/run only deferred callbacks
  bev_async: do not initialize timeouts multiple times
  bev_async: set "ok" on setfd if fd>=0 (like we do during creation)
  bev_async: ignore ERROR_INVALID_PARAMETER on .setfd for iocp

Closes: #709
Refs: nmathewson/Libevent#160
  • Loading branch information
azat committed Nov 13, 2018
2 parents 5dc88b3 + 811c63f commit 3d815cf
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
55 changes: 37 additions & 18 deletions bufferevent_async.c
Expand Up @@ -100,6 +100,27 @@ const struct bufferevent_ops bufferevent_ops_async = {
be_async_ctrl,
};

static inline void
be_async_run_eventcb(struct bufferevent *bev, short what, int options)
{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

static inline void
be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

static inline int
fatal_error(int err)
{
switch (err) {
/* We may have already associated this fd with a port.
* Let's hope it's this port, and that the error code
* for doing this neer changes. */
case ERROR_INVALID_PARAMETER:
return 0;
}
return 1;
}

static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
Expand Down Expand Up @@ -217,7 +238,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
&beva->write_overlapped)) {
bufferevent_decref_(bev);
beva->ok = 0;
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
} else {
beva->write_in_progress = at_most;
bufferevent_decrement_write_buckets_(&beva->bev, at_most);
Expand Down Expand Up @@ -270,7 +291,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
bufferevent_incref_(bev);
if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
bufferevent_decref_(bev);
} else {
beva->read_in_progress = at_most;
Expand Down Expand Up @@ -428,8 +449,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
else
bev_async_set_wsa_error(bev, eo);

bufferevent_run_eventcb_(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

event_base_del_virtual_(bev->ev_base);

Expand Down Expand Up @@ -459,16 +479,16 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
bufferevent_trigger_nolock_(bev, EV_READ, 0);
be_async_trigger_nolock(bev, EV_READ, 0);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what, 0);
be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what, 0);
be_async_run_eventcb(bev, what, 0);
}
}

Expand Down Expand Up @@ -502,16 +522,16 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
be_async_trigger_nolock(bev, EV_WRITE, 0);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what, 0);
be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what, 0);
be_async_run_eventcb(bev, what, 0);
}
}

Expand All @@ -532,11 +552,7 @@ bufferevent_async_new_(struct event_base *base,
return NULL;

if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
int err = GetLastError();
/* We may have alrady associated this fd with a port.
* Let's hope it's this port, and that the error code
* for doing this neer changes. */
if (err != ERROR_INVALID_PARAMETER)
if (fatal_error(GetLastError()))
return NULL;
}

Expand Down Expand Up @@ -580,7 +596,6 @@ bufferevent_async_set_connected_(struct bufferevent *bev)
{
struct bufferevent_async *bev_async = upcast(bev);
bev_async->ok = 1;
bufferevent_init_generic_timeout_cbs_(bev);
/* Now's a good time to consider reading/writing */
be_async_enable(bev, bev->enabled);
}
Expand Down Expand Up @@ -654,16 +669,20 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
data->fd = evbuffer_overlapped_get_fd_(bev->input);
return 0;
case BEV_CTRL_SET_FD: {
struct bufferevent_async *bev_a = upcast(bev);
struct event_iocp_port *iocp;

if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
return 0;
if (!(iocp = event_base_get_iocp_(bev->ev_base)))
return -1;
if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
return -1;
if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
if (fatal_error(GetLastError()))
return -1;
}
evbuffer_overlapped_set_fd_(bev->input, data->fd);
evbuffer_overlapped_set_fd_(bev->output, data->fd);
bev_a->ok = data->fd >= 0;
return 0;
}
case BEV_CTRL_CANCEL_ALL: {
Expand Down
1 change: 1 addition & 0 deletions test/regress.h
Expand Up @@ -43,6 +43,7 @@ extern struct testcase_t bufferevent_iocp_testcases[];
extern struct testcase_t util_testcases[];
extern struct testcase_t signal_testcases[];
extern struct testcase_t http_testcases[];
extern struct testcase_t http_iocp_testcases[];
extern struct testcase_t dns_testcases[];
extern struct testcase_t rpc_testcases[];
extern struct testcase_t edgetriggered_testcases[];
Expand Down
5 changes: 5 additions & 0 deletions test/regress_http.c
Expand Up @@ -4968,3 +4968,8 @@ struct testcase_t http_testcases[] = {
END_OF_TESTCASES
};

struct testcase_t http_iocp_testcases[] = {
{ "simple", http_simple_test, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
{ "https_simple", https_simple_test, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
END_OF_TESTCASES
};
1 change: 1 addition & 0 deletions test/regress_main.c
Expand Up @@ -384,6 +384,7 @@ struct testgroup_t testgroups[] = {
{ "iocp/", iocp_testcases },
{ "iocp/bufferevent/", bufferevent_iocp_testcases },
{ "iocp/listener/", listener_iocp_testcases },
{ "iocp/http/", http_iocp_testcases },
#endif
#ifdef EVENT__HAVE_OPENSSL
{ "ssl/", ssl_testcases },
Expand Down

0 comments on commit 3d815cf

Please sign in to comment.