Skip to content

Commit

Permalink
Improve socket option support
Browse files Browse the repository at this point in the history
Add new `on_sock_init` lifecycle callback, so that applications are
given an opportunity to set socket options such as `SO_RCVBUF` and
`SO_SNDBUF`.

This change also includes refactoring to make callback naming
conventions clearer and more consistent.

Closes #35
  • Loading branch information
markaylett committed Aug 30, 2019
1 parent efe3fcd commit abbe574
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 64 deletions.
5 changes: 3 additions & 2 deletions example/EchoClnt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class EchoClnt : public StreamConnector<EchoClnt> {
}

private:
void do_connect(CyclTime now, IoSock&& sock, const Endpoint& ep)
void on_sock_init(CyclTime now, IoSock& sock) {}
void on_sock_connect(CyclTime now, IoSock&& sock, const Endpoint& ep)
{
TOOLBOX_INFO << "connection opened: " << ep;
inprogress_ = false;
Expand All @@ -132,7 +133,7 @@ class EchoClnt : public StreamConnector<EchoClnt> {
auto* const conn = new EchoConn{now, reactor_, move(sock), ep};
conn_list_.push_back(*conn);
}
void do_connect_error(CyclTime now, const std::exception& e)
void on_sock_connect_error(CyclTime now, const std::exception& e)
{
TOOLBOX_ERROR << "failed to connect: " << e.what();
aifuture_ = resolver_.resolve(uri_, SOCK_STREAM);
Expand Down
7 changes: 2 additions & 5 deletions example/EchoServ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,11 @@ class EchoServ : public StreamAcceptor<EchoServ> {
}

private:
void do_accept(CyclTime now, IoSock&& sock, const Endpoint& ep)
void on_sock_init(CyclTime now, IoSock& sock) {}
void on_sock_accept(CyclTime now, IoSock&& sock, const Endpoint& ep)
{
TOOLBOX_INFO << "connection opened: " << ep;

sock.set_non_block();
if (sock.is_ip_family()) {
set_tcp_no_delay(sock.get(), true);
}
// High performance TCP servers could use a custom allocator.
auto* const conn = new EchoConn{now, reactor_, move(sock), ep};
conn_list_.push_back(*conn);
Expand Down
18 changes: 9 additions & 9 deletions example/HttpServ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ class HttpApp : public HttpAppBase {
void bind(const std::string& path, Slot slot) { slot_map_[path] = slot; }

protected:
void do_on_connect(CyclTime now, const Endpoint& ep) noexcept override
void do_on_http_connect(CyclTime now, const Endpoint& ep) noexcept override
{
TOOLBOX_INFO << "http session connected: " << ep;
}
void do_on_disconnect(CyclTime now, const Endpoint& ep) noexcept override
void do_on_http_disconnect(CyclTime now, const Endpoint& ep) noexcept override
{
TOOLBOX_INFO << "http session disconnected: " << ep;
}
void do_on_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept override
void do_on_http_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept override
{
TOOLBOX_ERROR << "session error: " << ep << ": " << e.what();
TOOLBOX_ERROR << "http session error: " << ep << ": " << e.what();
}
void do_on_message(CyclTime now, const Endpoint& ep, const HttpRequest& req,
HttpStream& os) override
void do_on_http_message(CyclTime now, const Endpoint& ep, const HttpRequest& req,
HttpStream& os) override
{
const auto it = slot_map_.find(string{req.path()});
if (it != slot_map_.end()) {
Expand All @@ -68,9 +68,9 @@ class HttpApp : public HttpAppBase {
}
os.commit();
}
void do_on_timeout(CyclTime now, const Endpoint& ep) noexcept override
void do_on_http_timeout(CyclTime now, const Endpoint& ep) noexcept override
{
TOOLBOX_WARNING << "session timeout: " << ep;
TOOLBOX_WARNING << "http session timeout: " << ep;
}

private:
Expand Down
33 changes: 18 additions & 15 deletions toolbox/http/App.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,32 @@ class TOOLBOX_API HttpAppBase {
constexpr HttpAppBase(HttpAppBase&&) noexcept = default;
HttpAppBase& operator=(HttpAppBase&&) noexcept = default;

void on_connect(CyclTime now, const Endpoint& ep) { do_on_connect(now, ep); }
void on_disconnect(CyclTime now, const Endpoint& ep) noexcept { do_on_disconnect(now, ep); }
void on_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept
void on_http_connect(CyclTime now, const Endpoint& ep) { do_on_http_connect(now, ep); }
void on_http_disconnect(CyclTime now, const Endpoint& ep) noexcept
{
do_on_error(now, ep, e, os);
do_on_http_disconnect(now, ep);
}
void on_message(CyclTime now, const Endpoint& ep, const HttpRequest& req, HttpStream& os)
void on_http_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept
{
do_on_message(now, ep, req, os);
do_on_http_error(now, ep, e, os);
}
void on_timeout(CyclTime now, const Endpoint& ep) noexcept { do_on_timeout(now, ep); }
void on_http_message(CyclTime now, const Endpoint& ep, const HttpRequest& req, HttpStream& os)
{
do_on_http_message(now, ep, req, os);
}
void on_http_timeout(CyclTime now, const Endpoint& ep) noexcept { do_on_http_timeout(now, ep); }

protected:
virtual void do_on_connect(CyclTime now, const Endpoint& ep) = 0;
virtual void do_on_disconnect(CyclTime now, const Endpoint& ep) noexcept = 0;
virtual void do_on_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept
virtual void do_on_http_connect(CyclTime now, const Endpoint& ep) = 0;
virtual void do_on_http_disconnect(CyclTime now, const Endpoint& ep) noexcept = 0;
virtual void do_on_http_error(CyclTime now, const Endpoint& ep, const std::exception& e,
HttpStream& os) noexcept
= 0;
virtual void do_on_message(CyclTime now, const Endpoint& ep, const HttpRequest& req,
HttpStream& os)
virtual void do_on_http_message(CyclTime now, const Endpoint& ep, const HttpRequest& req,
HttpStream& os)
= 0;
virtual void do_on_timeout(CyclTime now, const Endpoint& ep) noexcept = 0;
virtual void do_on_http_timeout(CyclTime now, const Endpoint& ep) noexcept = 0;
};

} // namespace http
Expand Down
24 changes: 12 additions & 12 deletions toolbox/http/Conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BasicHttpConn
sub_ = r.subscribe(*sock_, EventIn, bind<&BasicHttpConn::on_io_event>(this));
tmr_ = r.timer(now.mono_time() + IdleTimeout, Priority::Low,
bind<&BasicHttpConn::on_timer>(this));
app.on_connect(now, ep_);
app.on_http_connect(now, ep_);
}

// Copy.
Expand All @@ -79,7 +79,7 @@ class BasicHttpConn
void clear() noexcept { req_.clear(); }
void dispose(CyclTime now) noexcept
{
app_.on_disconnect(now, ep_);
app_.on_http_disconnect(now, ep_);
delete this;
}

Expand All @@ -99,7 +99,7 @@ class BasicHttpConn
req_.append_url(sv);
ret = true;
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
return ret;
Expand All @@ -116,7 +116,7 @@ class BasicHttpConn
req_.append_header_field(sv, first);
ret = true;
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
return ret;
Expand All @@ -128,7 +128,7 @@ class BasicHttpConn
req_.append_header_value(sv, first);
ret = true;
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
return ret;
Expand All @@ -145,7 +145,7 @@ class BasicHttpConn
req_.append_body(sv);
ret = true;
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
return ret;
Expand All @@ -158,15 +158,15 @@ class BasicHttpConn
req_.flush(); // May throw.

const auto was_empty = out_.empty();
app_.on_message(now, ep_, req_, os_);
app_.on_http_message(now, ep_, req_, os_);

if (was_empty) {
// May throw.
sub_.set_events(EventIn | EventOut);
}
ret = true;
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
req_.clear();
Expand Down Expand Up @@ -204,16 +204,16 @@ class BasicHttpConn
}
}
} catch (const HttpException&) {
// Do not call on_error() here, because it will have already been called in one of the
// noexcept parser callback functions.
// Do not call on_http_error() here, because it will have already been called in one of
// the noexcept parser callback functions.
} catch (const std::exception& e) {
app_.on_error(now, ep_, e, os_);
app_.on_http_error(now, ep_, e, os_);
flush_and_dispose(now);
}
}
void on_timer(CyclTime now, Timer& tmr)
{
app_.on_timeout(now, ep_);
app_.on_http_timeout(now, ep_);
dispose(now);
}
void flush_and_dispose(CyclTime now) noexcept
Expand Down
3 changes: 2 additions & 1 deletion toolbox/http/Serv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class BasicHttpServ : public StreamAcceptor<BasicHttpServ<ConnT, AppT>> {
BasicHttpServ& operator=(BasicHttpServ&&) = delete;

private:
void do_accept(CyclTime now, IoSock&& sock, const Endpoint& ep)
void on_sock_init(CyclTime now, IoSock& sock) {}
void on_sock_accept(CyclTime now, IoSock&& sock, const Endpoint& ep)
{
auto* const conn = new Conn{now, reactor_, std::move(sock), ep, app_};
conn_list_.push_back(*conn);
Expand Down
72 changes: 58 additions & 14 deletions toolbox/net/Socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,16 +647,36 @@ inline std::error_code get_so_error(int sockfd)
return make_sys_error(optval);
}

inline void set_so_reuse_addr(int sockfd, bool enabled, std::error_code& ec) noexcept
inline int get_so_rcv_buf(int sockfd, std::error_code& ec) noexcept
{
int optval{enabled ? 1 : 0};
os::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval), ec);
int optval{};
socklen_t optlen{sizeof(optval)};
os::getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen, ec);
return optval;
}

inline void set_so_reuse_addr(int sockfd, bool enabled)
inline int get_so_rcv_buf(int sockfd)
{
int optval{enabled ? 1 : 0};
os::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
int optval{};
socklen_t optlen{sizeof(optval)};
os::getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
return optval;
}

inline int get_so_snd_buf(int sockfd, std::error_code& ec) noexcept
{
int optval{};
socklen_t optlen{sizeof(optval)};
os::getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen, ec);
return optval;
}

inline int get_so_snd_buf(int sockfd)
{
int optval{};
socklen_t optlen{sizeof(optval)};
os::getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
return optval;
}

inline void set_so_rcv_buf(int sockfd, int size, std::error_code& ec) noexcept
Expand All @@ -669,6 +689,18 @@ inline void set_so_rcv_buf(int sockfd, int size)
os::setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
}

inline void set_so_reuse_addr(int sockfd, bool enabled, std::error_code& ec) noexcept
{
int optval{enabled ? 1 : 0};
os::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval), ec);
}

inline void set_so_reuse_addr(int sockfd, bool enabled)
{
int optval{enabled ? 1 : 0};
os::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}

inline void set_so_snd_buf(int sockfd, int size, std::error_code& ec) noexcept
{
os::setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size), ec);
Expand Down Expand Up @@ -706,34 +738,46 @@ struct Sock {
bool is_ip_family() const noexcept { return family_ == AF_INET || family_ == AF_INET6; }

// Logically const.
std::error_code get_so_error(std::error_code& ec) const
std::error_code get_error(std::error_code& ec) const noexcept
{
return toolbox::get_so_error(*sock_, ec);
}
std::error_code get_so_error() const { return toolbox::get_so_error(*sock_); }
std::error_code get_error() const { return toolbox::get_so_error(*sock_); }

int get_rcv_buf(std::error_code& ec) const noexcept
{
return toolbox::get_so_rcv_buf(*sock_, ec);
}
int get_rcv_buf() const { return toolbox::get_so_rcv_buf(*sock_); }

int get_snd_buf(std::error_code& ec) const noexcept
{
return toolbox::get_so_snd_buf(*sock_, ec);
}
int get_snd_buf() const { return toolbox::get_so_snd_buf(*sock_); }

void close() { sock_.reset(); }

void set_non_block(std::error_code& ec) noexcept { toolbox::set_non_block(*sock_, ec); }
void set_non_block() { toolbox::set_non_block(*sock_); }

void set_so_rcv_buf(int size, std::error_code& ec) noexcept
void set_rcv_buf(int size, std::error_code& ec) noexcept
{
toolbox::set_so_rcv_buf(*sock_, size, ec);
}
void set_so_rcv_buf(int size) { toolbox::set_so_rcv_buf(*sock_, size); }
void set_rcv_buf(int size) { toolbox::set_so_rcv_buf(*sock_, size); }

void set_so_reuse_addr(bool enabled, std::error_code& ec) noexcept
void set_reuse_addr(bool enabled, std::error_code& ec) noexcept
{
toolbox::set_so_reuse_addr(*sock_, enabled, ec);
}
void set_so_reuse_addr(bool enabled) { toolbox::set_so_reuse_addr(*sock_, enabled); }
void set_reuse_addr(bool enabled) { toolbox::set_so_reuse_addr(*sock_, enabled); }

void set_so_snd_buf(int size, std::error_code& ec) noexcept
void set_snd_buf(int size, std::error_code& ec) noexcept
{
toolbox::set_so_snd_buf(*sock_, size, ec);
}
void set_so_snd_buf(int size) { toolbox::set_so_snd_buf(*sock_, size); }
void set_snd_buf(int size) { toolbox::set_so_snd_buf(*sock_, size); }

protected:
FileHandle sock_;
Expand Down
9 changes: 7 additions & 2 deletions toolbox/net/StreamAcceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class StreamAcceptor {
StreamAcceptor(Reactor& r, const Endpoint& ep)
: serv_{ep.protocol()}
{
serv_.set_so_reuse_addr(true);
serv_.set_reuse_addr(true);
serv_.bind(ep);
serv_.listen(SOMAXCONN);
sub_ = r.subscribe(*serv_, EventIn, bind<&StreamAcceptor::on_io_event>(this));
Expand All @@ -55,7 +55,12 @@ class StreamAcceptor {
{
Endpoint ep;
IoSock sock{os::accept(fd, ep), serv_.family()};
static_cast<DerivedT*>(this)->do_accept(now, std::move(sock), ep);
static_cast<DerivedT*>(this)->on_sock_init(now, sock);
sock.set_non_block();
if (sock.is_ip_family()) {
set_tcp_no_delay(sock.get(), true);
}
static_cast<DerivedT*>(this)->on_sock_accept(now, std::move(sock), ep);
}

StreamSockServ serv_;
Expand Down
Loading

0 comments on commit abbe574

Please sign in to comment.