Skip to content

Commit

Permalink
opt intrfd
Browse files Browse the repository at this point in the history
  • Loading branch information
bingbing committed Feb 22, 2022
1 parent 355155f commit 5db9e3a
Showing 1 changed file with 62 additions and 54 deletions.
116 changes: 62 additions & 54 deletions include/netp/poller_interruptable_by_fd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ namespace netp {
SOCKET fdw;
io_ctx* ctx;
std::atomic<bool> is_sigset;
fdinterrupt_monitor(SOCKET fdr_, SOCKET fdw_):
fdr(fdr_),
fdw(fdw_),
fdinterrupt_monitor():
fdr(NETP_INVALID_SOCKET),
fdw(NETP_INVALID_SOCKET),
ctx(0),
is_sigset(false)
{}
Expand All @@ -31,41 +31,40 @@ namespace netp {
if (status == netp::OK) {
#ifdef _NETP_DEBUG_INTERRUPT_
NETP_ASSERT(is_sigset.load(std::memory_order_acquire));
size_t nbytes = 0;
u32_t nbytes = 0;
#endif
byte_t tmp[4] = { 0 };
byte_t tmp[4];
int ec = netp::OK;

do {
#ifdef NETP_HAS_PIPE
//NOTE: error 88 if we do read|write on a pipe fd
__label_read:
ssize_t c = ::read(fdr, tmp, 4);
if (c == ssize_t(-1)) {
ec = netp_socket_get_last_errno();
_NETP_REFIX_EWOULDBLOCK(ec);
} else if (c == 0) {
//Is the following case possible ?
//c ==0 && ec == E_EINTR
if ( NETP_UNLIKELY(c <= 0) ) { //0,-1
ec = netp_socket_get_last_errno();
if (ec == 0) {
if (ec == netp::E_EINTR) {
goto __label_read;
} else if (ec == 0) {
ec = netp::E_PIPE_CLOSED;
}
_NETP_REFIX_EWOULDBLOCK(ec);
}
#ifdef _NETP_DEBUG_INTERRUPT_
else { nbytes+=c; }
#endif

#else
u32_t c = netp::recv(fdr, tmp, 4, ec, 0);
(void)c;
#endif

#ifdef _NETP_DEBUG_INTERRUPT_
#ifdef _NETP_DEBUG_INTERRUPT_
nbytes += c;
} while (ec == netp::OK);
#else
} while (ec == netp::E_EINTR);
#endif
#endif
} while (false/*as we have at most 1 bytes in buffer, we can use false to save one syscall*/ && (ec == netp::OK) );

is_sigset.store(false, std::memory_order_release);
#ifdef _NETP_DEBUG_INTERRUPT_
NETP_ASSERT(nbytes <= 1, "nbytes: %u", nbytes );
NETP_ASSERT(nbytes <= 1, "nbytes: %u", nbytes);
#endif
is_sigset.store(false, std::memory_order_release);
}
}

Expand All @@ -75,10 +74,9 @@ namespace netp {
//save one write
return;
}

int ec;
const char interrutp_i = 'i';
#ifdef NETP_HAS_PIPE
#ifdef NETP_HAS_PIPE
do {
int c = ::write(fdw, (const void*)&interrutp_i, 1);
if (c == 1) {
Expand All @@ -92,10 +90,43 @@ namespace netp {
u32_t c = netp::send(fdw, (byte_t const* const)&interrutp_i, 1, ec, 0);
(void)c;
if (NETP_UNLIKELY(ec != netp::OK)) {
NETP_WARN("[fdinterrupt_monitor][##u]interrupt send failed: %d", fdw, ec);
NETP_WARN("[fdinterrupt_monitor][##%u]interrupt send failed: %d", fdw, ec);
}
#endif
}

void init() {
int rt;
SOCKET fds[2] = { NETP_INVALID_SOCKET, NETP_INVALID_SOCKET };
#ifdef NETP_HAS_PIPE
while (pipe(fds) == -1) {
netp::this_thread::yield();
}
#else
rt = netp::socketpair(int(NETP_AF_INET), int(NETP_SOCK_STREAM), int(NETP_PROTOCOL_TCP), fds);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);
rt = netp::set_nodelay(fds[1], true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);
#endif
NETP_VERBOSE("[poller_interruptable_by_fd]init pipe done, fds[r]: %u, fds[w]: %u", fds[0], fds[1]);

rt = netp::set_nonblocking(fds[0], true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);

rt = netp::set_nonblocking(fds[1], true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);

fdr = fds[0];
fdw = fds[1];
}

void close() {
netp::close(fdr);
fdr = NETP_INVALID_SOCKET;

netp::close(fdw);
fdw = NETP_INVALID_SOCKET;
}
};

class poller_interruptable_by_fd :
Expand All @@ -122,45 +153,22 @@ namespace netp {
~poller_interruptable_by_fd() {}

void __init_interrupt_fd() {
int rt;
SOCKET fds[2] = {NETP_INVALID_SOCKET, NETP_INVALID_SOCKET};
#ifdef NETP_HAS_PIPE
while (pipe(fds) == -1) {
netp::this_thread::yield();
}
#else
rt = netp::socketpair(int(NETP_AF_INET), int(NETP_SOCK_STREAM), int(NETP_PROTOCOL_TCP), fds);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);
rt = netp::set_nodelay(fds[1], true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);
#endif
NETP_VERBOSE("[poller_interruptable_by_fd]init pipe done, fds[r]: %u, fds[w]: %u", fds[0], fds[1]);

rt = netp::set_nonblocking(fds[0],true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);

rt = netp::set_nonblocking(fds[1],true);
NETP_ASSERT(rt == netp::OK, "rt: %d", rt);

m_fdintr = netp::make_ref<fdinterrupt_monitor>(fds[0],fds[1]);
io_ctx* ctx = io_begin(fds[0], m_fdintr);
m_fdintr = netp::make_ref<fdinterrupt_monitor>();
m_fdintr->init();
io_ctx* ctx = io_begin(m_fdintr->fdr, m_fdintr);
NETP_ASSERT(ctx != 0);
rt = io_do(io_action::READ, ctx);
int rt = io_do(io_action::READ, ctx);
NETP_ASSERT(rt == netp::OK, "fd: %d, rt: %d, errno: %d", ctx->fd, rt, netp_socket_get_last_errno() );
m_fdintr->ctx = ctx;
NETP_VERBOSE("[poller_interruptable_by_fd]__init_interrupt_fd done, fd_r: %u, m_fd_w: %u", fds[0], fds[1]);
NETP_VERBOSE("[poller_interruptable_by_fd]__init_interrupt_fd done, fd_r: %u, m_fd_w: %u", m_fdintr->fdr, m_fdintr->fdw);
}

void __deinit_interrupt_fd() {
NETP_VERBOSE("[poller_interruptable_by_fd]__deinit_interrupt_fd begin, fd_r: %u, m_fd_w: %u", m_fdintr->fdr, m_fdintr->fdw);
io_do(io_action::END_READ, m_fdintr->ctx);
netp::close(m_fdintr->fdr);
netp::close(m_fdintr->fdw);
m_fdintr->fdr = NETP_INVALID_SOCKET;
m_fdintr->fdw = NETP_INVALID_SOCKET;
io_end(m_fdintr->ctx);
m_fdintr->ctx = nullptr;

m_fdintr->close();
NETP_VERBOSE("[poller_interruptable_by_fd]__deinit_interrupt_fd done");
NETP_ASSERT(NETP_LIST_IS_EMPTY(&m_io_ctx_list), "m_io_ctx_list not empty");
}
Expand Down

0 comments on commit 5db9e3a

Please sign in to comment.